• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3522

07 Nov 2024 05:59AM UTC coverage: 58.216% (+1.3%) from 56.943%
#3522

push

travis-ci

web-flow
Merge pull request #28663 from taosdata/fix/3_liaohj

fix(stream): stop the underlying scan operations for stream

111884 of 248391 branches covered (45.04%)

Branch coverage included in aggregate %.

3 of 4 new or added lines in 1 file covered. (75.0%)

1164 existing lines in 134 files now uncovered.

191720 of 273118 relevant lines covered (70.2%)

13088725.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.76
/source/libs/executor/src/aggregateoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "function.h"
18
#include "os.h"
19
#include "querynodes.h"
20
#include "tfill.h"
21
#include "tname.h"
22

23
#include "executorInt.h"
24
#include "index.h"
25
#include "operator.h"
26
#include "query.h"
27
#include "querytask.h"
28
#include "tcompare.h"
29
#include "tdatablock.h"
30
#include "tglobal.h"
31
#include "thash.h"
32
#include "ttypes.h"
33

34
/*
 * Snapshot of the input-related fields of a SqlFunctionCtx. It is filled by functionCtxSave() and written
 * back by functionCtxRestore() around the temporary overrides in applyAggFunctionOnPartialTuples().
 */
typedef struct {
  bool    hasAgg;       // saved value of input.colDataSMAIsSet
  int32_t numOfRows;    // saved value of input.numOfRows
  int32_t startOffset;  // saved value of input.startRowIndex
} SFunctionCtxStatus;
39

40
typedef struct SAggOperatorInfo {
41
  SOptrBasicInfo   binfo;
42
  SAggSupporter    aggSup;
43
  STableQueryInfo* current;
44
  uint64_t         groupId;
45
  SGroupResInfo    groupResInfo;
46
  SExprSupp        scalarExprSup;
47
  bool             groupKeyOptimized;
48
  bool             hasValidBlock;
49
  SSDataBlock*     pNewGroupBlock;
50
  bool             hasCountFunc;
51
  SOperatorInfo*   pOperator;
52
  bool             cleanGroupResInfo;
53
} SAggOperatorInfo;
54

55
static void destroyAggOperatorInfo(void* param);
56
static int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
57

58
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock);
59
static void    destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock);
60

61
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx);
62
static int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
63
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
64
                                const char* pKey);
65

66
static int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size);
67

68
static int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
69

70
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
71
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
72

73
/**
 * @brief Create the "TableAggregate" (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG) operator on top of `downstream`.
 *
 * @param downstream  input operator; appended as the only downstream of the new operator. When it is a table
 *                    scan operator, its push-down info is pointed at the new operator's expression/agg supporters.
 * @param pAggNode    physical plan node providing aggregate functions, group keys, scalar expressions,
 *                    conditions and the flags copied into the operator state
 * @param pTaskInfo   owning task; on failure its `code` field is set to the returned error
 * @param pOptrInfo   [out] receives the created operator on success
 * @return TSDB_CODE_SUCCESS on success; otherwise the error code of the failing step. On failure the operator
 *         state built so far is released and destroyOperatorAndDownstreams() is called with `downstream`.
 */
int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo,
                                    SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t    lino = 0;
  int32_t    code = 0;
  int32_t    num = 0;
  SExprInfo* pExprInfo = NULL;
  int32_t    numOfScalarExpr = 0;
  SExprInfo* pScalarExprInfo = NULL;

  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    lino = __LINE__;  // record the failure position so the error log does not report line 0
    goto _error;
  }

  pOperator->exprSupp.hasWindowOrGroup = false;

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
  initBasicInfo(&pInfo->binfo, pResBlock);

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  // aggregate functions and group keys
  code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num);
  TSDB_CHECK_CODE(code, lino, _error);

  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
  TSDB_CHECK_CODE(code, lino, _error);

  // optional scalar expressions that are evaluated before the aggregation
  if (pAggNode->pExprs != NULL) {
    code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
    TSDB_CHECK_CODE(code, lino, _error);
  }

  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
  TSDB_CHECK_CODE(code, lino, _error);

  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  TSDB_CHECK_CODE(code, lino, _error);

  pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
  pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized;
  pInfo->groupId = UINT64_MAX;  // no active group yet
  pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;
  pInfo->hasCountFunc = pAggNode->hasCountLikeFunc;
  pInfo->pOperator = pOperator;
  pInfo->cleanGroupResInfo = false;

  setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
                  !pAggNode->node.forceCreateNonBlockingOptr, OP_NOT_OPENED, pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getAggregateResultNext, NULL, destroyAggOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);

  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    STableScanInfo* pTableScanInfo = downstream->info;
    pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp;
    pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup;
  }

  code = appendDownstream(pOperator, &downstream, 1);
  TSDB_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pInfo != NULL) {
    destroyAggOperatorInfo(pInfo);
  }
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
155

156
void destroyAggOperatorInfo(void* param) {
335,940✔
157
  if (param == NULL) {
335,940!
158
    return;
×
159
  }
160
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
335,940✔
161
  cleanupBasicInfo(&pInfo->binfo);
335,940✔
162

163
  if (pInfo->pOperator) {
335,945✔
164
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
335,944✔
165
                      pInfo->cleanGroupResInfo);
335,944✔
166
    pInfo->pOperator = NULL;
335,929✔
167
  }
168
  cleanupAggSup(&pInfo->aggSup);
335,930✔
169
  cleanupExprSupp(&pInfo->scalarExprSup);
335,942✔
170
  cleanupGroupResInfo(&pInfo->groupResInfo);
335,937✔
171
  taosMemoryFreeClear(param);
335,942!
172
}
173

174
/**
175
 * @brief get blocks from downstream and fill results into groupedRes after aggragation
176
 * @retval false if no more groups
177
 * @retval true if there could have new groups coming
178
 * @note if pOperator.blocking is true, scan all blocks from downstream, all groups are handled
179
 *       if false, fill results of ONE GROUP
180
 * */
181
static bool nextGroupedResult(SOperatorInfo* pOperator) {
351,613✔
182
  int32_t           code = TSDB_CODE_SUCCESS;
351,613✔
183
  int32_t           lino = 0;
351,613✔
184
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
351,613✔
185
  SAggOperatorInfo* pAggInfo = pOperator->info;
351,613✔
186

187
  if (pOperator->blocking && pAggInfo->hasValidBlock) {
351,613✔
188
    return false;
2,609✔
189
  }
190

191
  SExprSupp*   pSup = &pOperator->exprSupp;
349,004✔
192
  int64_t      st = taosGetTimestampUs();
349,429✔
193
  int32_t      order = pAggInfo->binfo.inputTsOrder;
349,429✔
194
  SSDataBlock* pBlock = pAggInfo->pNewGroupBlock;
349,429✔
195

196
  pAggInfo->cleanGroupResInfo = false;
349,429✔
197
  if (pBlock) {
349,429✔
198
    pAggInfo->pNewGroupBlock = NULL;
13,538✔
199
    tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable);
13,538✔
200
    code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
13,538✔
201
    QUERY_CHECK_CODE(code, lino, _end);
13,538!
202
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
13,538✔
203
    QUERY_CHECK_CODE(code, lino, _end);
13,538!
204

205
    code = doAggregateImpl(pOperator, pSup->pCtx);
13,538✔
206
    QUERY_CHECK_CODE(code, lino, _end);
13,538!
207
  }
208
  while (1) {
655,970✔
209
    bool blockAllocated = false;
1,005,399✔
210
    pBlock = getNextBlockFromDownstream(pOperator, 0);
1,005,399✔
211
    if (pBlock == NULL) {
1,005,417✔
212
      if (!pAggInfo->hasValidBlock) {
342,049✔
213
        code = createDataBlockForEmptyInput(pOperator, &pBlock);
39,728✔
214
        QUERY_CHECK_CODE(code, lino, _end);
39,725!
215

216
        if (pBlock == NULL) {
39,725✔
217
          break;
33,582✔
218
        }
219
        blockAllocated = true;
6,143✔
220
      } else {
221
        break;
302,321✔
222
      }
223
    }
224
    pAggInfo->hasValidBlock = true;
669,511✔
225
    pAggInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
669,511✔
226

227
    // there is an scalar expression that needs to be calculated before apply the group aggregation.
228
    if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) {
669,511✔
229
      SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
5,241✔
230
      code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
5,241✔
231
      if (code != TSDB_CODE_SUCCESS) {
5,240!
232
        destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
233
        T_LONG_JMP(pTaskInfo->env, code);
×
234
      }
235
    }
236
    // if non-blocking mode and new group arrived, save the block and break
237
    if (!pOperator->blocking && pAggInfo->groupId != UINT64_MAX && pBlock->info.id.groupId != pAggInfo->groupId) {
669,510✔
238
      pAggInfo->pNewGroupBlock = pBlock;
13,568✔
239
      break;
13,568✔
240
    }
241
    // the pDataBlock are always the same one, no need to call this again
242
    code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
655,942✔
243
    if (code != TSDB_CODE_SUCCESS) {
655,971!
244
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
245
      T_LONG_JMP(pTaskInfo->env, code);
×
246
    }
247
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
655,971✔
248
    if (code != TSDB_CODE_SUCCESS) {
655,969!
249
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
250
      T_LONG_JMP(pTaskInfo->env, code);
×
251
    }
252

253
    code = doAggregateImpl(pOperator, pSup->pCtx);
655,969✔
254
    if (code != TSDB_CODE_SUCCESS) {
655,972!
255
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
256
      T_LONG_JMP(pTaskInfo->env, code);
×
257
    }
258

259
    destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
655,972✔
260
  }
261

262
  // the downstream operator may return with error code, so let's check the code before generating results.
263
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
349,471!
264
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
265
  }
266

267
  code = initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
349,471✔
268
  QUERY_CHECK_CODE(code, lino, _end);
349,471!
269
  pAggInfo->cleanGroupResInfo = true;
349,471✔
270

271
_end:
349,471✔
272
  if (code != TSDB_CODE_SUCCESS) {
349,471!
273
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
274
    pTaskInfo->code = code;
×
275
    T_LONG_JMP(pTaskInfo->env, code);
×
276
  }
277
  return pBlock != NULL;
349,471✔
278
}
279

280
/**
 * @brief "get next" entry of the aggregate operator: produce the next result block.
 *
 * @param pOperator  the aggregate operator
 * @param ppRes      [out] set to the operator's result block, or NULL when it holds no rows or the
 *                   operator status is already OP_EXEC_DONE
 * @return TSDB_CODE_SUCCESS when it returns; on failure the error is logged, stored in pTaskInfo->code
 *         and raised through T_LONG_JMP(pTaskInfo->env, code)
 */
int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           lino = 0;
  size_t            rows = 0;
  SAggOperatorInfo* pAggInfo = pOperator->info;
  SOptrBasicInfo*   pInfo = &pAggInfo->binfo;
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  bool moreGroups = false;
  do {
    moreGroups = nextGroupedResult(pOperator);
    code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
    QUERY_CHECK_CODE(code, lino, _end);

    // keep building result blocks until one survives the filter or the grouped results are exhausted
    for (;;) {
      doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
      code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      if (!hasRemainResults(&pAggInfo->groupResInfo)) {
        if (!moreGroups) {
          setOperatorCompleted(pOperator);
        }
        break;
      }

      if (pInfo->pRes->info.rows > 0) {
        break;
      }
    }
  } while (pInfo->pRes->info.rows == 0 && moreGroups);

  rows = blockDataGetNumOfRows(pInfo->pRes);
  pOperator->resultInfo.totalRows += rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  (*ppRes) = (rows == 0) ? NULL : pInfo->pRes;
  return code;
}
327

328
/**
 * @brief Convenience wrapper around getAggregateResultNext() that returns the block directly.
 * @return the result block, or NULL when there is none; the status code of the call is discarded
 */
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
  SSDataBlock* pResult = NULL;
  (void)getAggregateResultNext(pOperator, &pResult);
  return pResult;
}
333

334
/**
 * @brief Run the `process` callback of every aggregate function context on the currently set input.
 *
 * @param pOperator  operator whose exprSupp.numOfExprs gives the number of contexts
 * @param pCtx       array of function contexts
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when a function has no input data, or
 *         the error returned by a `process` callback. On the first failure the function's `cleanup`
 *         callback (if any) is invoked and the remaining functions are not executed.
 */
int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
  for (int32_t idx = 0; idx < pOperator->exprSupp.numOfExprs; ++idx) {
    SqlFunctionCtx* pFunc = &pCtx[idx];

    // todo add a dummy function to avoid process check
    if (!functionNeedToExecute(pFunc) || pFunc->fpSet.process == NULL) {
      continue;
    }

    int32_t code = TSDB_CODE_SUCCESS;
    if (pFunc->input.pData[0] != NULL) {
      code = pFunc->fpSet.process(pFunc);
    } else {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      qError("%s aggregate function error happens, input data is NULL.", GET_TASKID(pOperator->pTaskInfo));
    }

    if (code == TSDB_CODE_SUCCESS) {
      continue;
    }

    if (pFunc->fpSet.cleanup != NULL) {
      pFunc->fpSet.cleanup(pFunc);
    }
    qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
362

363
/**
 * @brief Build a one-row placeholder block for an aggregate that received no input.
 *
 * No block is created (success is returned and *ppBlock is left untouched) when any of the following holds:
 *   - tsCountAlwaysReturnValue is off;
 *   - the operator is group-key optimized;
 *   - the downstream is a partition or sort operator, or a table scan with hasGroupByTag set;
 *   - the operator has no count-like function.
 * Otherwise the block gets one NULL-typed column for every slot up to the largest slot id referenced by a
 * column parameter of the expressions, and row 0 of each column is set to NULL.
 *
 * @param pOperator  the aggregate operator
 * @param ppBlock    [out] receives the new block; it is released with destroyDataBlockForEmptyInput()
 * @return TSDB_CODE_SUCCESS, or the error code of the failing step (the partially built block is destroyed)
 */
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* pBlock = NULL;
  if (!tsCountAlwaysReturnValue) {
    return TSDB_CODE_SUCCESS;
  }

  SAggOperatorInfo* pAggInfo = pOperator->info;
  if (pAggInfo->groupKeyOptimized) {
    return TSDB_CODE_SUCCESS;
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
      downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_SORT ||
      (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
       ((STableScanInfo*)downstream->info)->hasGroupByTag == true)) {
    return TSDB_CODE_SUCCESS;
  }

  if (!pAggInfo->hasCountFunc) {
    return TSDB_CODE_SUCCESS;
  }

  code = createDataBlock(&pBlock);
  if (code) {
    return code;
  }

  pBlock->info.rows = 1;
  pBlock->info.capacity = 0;

  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    SColumnInfoData colInfo = {0};
    colInfo.hasNull = true;
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
    colInfo.info.bytes = 1;

    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
      if (pFuncParam->type != FUNC_PARAM_TYPE_COLUMN) {
        continue;  // only column parameters require a placeholder column
      }

      int32_t slotId = pFuncParam->pCol->slotId;
      int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
      if (slotId < numOfCols) {
        continue;  // the slot is already covered
      }

      code = taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
      QUERY_CHECK_CODE(code, lino, _end);

      // pad the column list up to and including the referenced slot
      for (int32_t k = numOfCols; k < slotId + 1; ++k) {
        void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo);
        QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
      }
    }
  }

  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
  QUERY_CHECK_CODE(code, lino, _end);

  for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
    colDataSetNULL(pColInfoData, 0);
  }
  *ppBlock = pBlock;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pBlock);
    // report the line recorded by the failing check, not the line of this log statement
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
442

443
/**
 * @brief Destroy the placeholder block and reset the caller's pointer.
 * @param blockAllocated  true when *ppBlock was produced by createDataBlockForEmptyInput(); when false
 *                        nothing is touched
 * @param ppBlock         address of the block pointer; set to NULL after the block is destroyed
 */
void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) {
  if (blockAllocated) {
    blockDataDestroy(*ppBlock);
    *ppBlock = NULL;
  }
}
451

452
/**
 * @brief Make `groupId` the active group of the aggregate operator.
 *
 * Nothing is done when a group has already been set and it equals `groupId`. Otherwise the output buffer
 * of the group is prepared through doSetTableGroupOutputBuf() and `groupId` is recorded as the active
 * group, regardless of the outcome of that call.
 *
 * @return TSDB_CODE_SUCCESS or the error of doSetTableGroupOutputBuf()
 */
int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  SAggOperatorInfo* pAgg = pOperator->info;

  const bool sameGroup = (pAgg->groupId != UINT64_MAX) && (pAgg->groupId == groupId);
  if (sameGroup) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);

  // record the current active group id
  pAgg->groupId = groupId;
  return code;
}
465

466
/**
 * @brief Locate (or create) the result row of `groupId` and bind the function contexts to it.
 *
 * @param pOperator    the aggregate operator
 * @param numOfOutput  number of function contexts in pOperator->exprSupp.pCtx
 * @param groupId      group whose result row is looked up; its raw bytes are used as the lookup key
 * @return TSDB_CODE_SUCCESS, or a non-zero error code when the result row could not be obtained or
 *         initialized. A missing result row never yields success.
 */
int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  // for simple group by query without interval, all the tables belong to one group result.
  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           lino = 0;
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
  SAggOperatorInfo* pAggInfo = pOperator->info;

  SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;

  SResultRow* pResultRow =
      doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, sizeof(groupId), true,
                             groupId, pTaskInfo, false, &pAggInfo->aggSup, true);
  if (pResultRow == NULL || pTaskInfo->code != 0) {
    code = pTaskInfo->code;
    if (code == TSDB_CODE_SUCCESS) {
      // a NULL row without a recorded task error must still fail; otherwise the caller would go on
      // aggregating without a bound result row
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    lino = __LINE__;
    goto _end;
  }

  /*
   * not assign result buffer yet, add new result buffer
   * all group belong to one result set, and each group result has different group id so set the id to be one
   */
  if (pResultRow->pageId == -1) {
    code = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  code = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
503

504
// a new buffer page for each table. Needs to opt this design
505
int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) {
×
506
  if (pWindowRes->pageId != -1) {
×
507
    return 0;
×
508
  }
509

510
  SFilePage* pData = NULL;
×
511

512
  // in the first scan, new space needed for results
513
  int32_t pageId = -1;
×
514
  SArray* list = getDataBufPagesIdList(pResultBuf);
×
515

516
  if (taosArrayGetSize(list) == 0) {
×
517
    pData = getNewBufPage(pResultBuf, &pageId);
×
518
    if (pData == NULL) {
×
519
      qError("failed to get buffer, code:%s", tstrerror(terrno));
×
520
      return terrno;
×
521
    }
522
    pData->num = sizeof(SFilePage);
×
523
  } else {
524
    SPageInfo* pi = getLastPageInfo(list);
×
525
    pData = getBufPage(pResultBuf, getPageId(pi));
×
526
    if (pData == NULL) {
×
527
      qError("failed to get buffer, code:%s", tstrerror(terrno));
×
528
      return terrno;
×
529
    }
530

531
    pageId = getPageId(pi);
×
532

533
    if (pData->num + size > getBufPageSize(pResultBuf)) {
×
534
      // release current page first, and prepare the next one
535
      releaseBufPageInfo(pResultBuf, pi);
×
536

537
      pData = getNewBufPage(pResultBuf, &pageId);
×
538
      if (pData == NULL) {
×
539
        qError("failed to get buffer, code:%s", tstrerror(terrno));
×
540
        return terrno;
×
541
      }
542
      pData->num = sizeof(SFilePage);
×
543
    }
544
  }
545

546
  if (pData == NULL) {
×
547
    return -1;
×
548
  }
549

550
  // set the number of rows in current disk page
551
  if (pWindowRes->pageId == -1) {  // not allocated yet, allocate new buffer
×
552
    pWindowRes->pageId = pageId;
×
553
    pWindowRes->offset = (int32_t)pData->num;
×
554

555
    pData->num += size;
×
556
  }
557

558
  return 0;
×
559
}
560

561
/**
 * @brief Initialize an SAggSupporter: key buffer, result row hash table and disk-based result buffer.
 *
 * @param pAggSup      supporter to initialize
 * @param pCtx         function contexts, used to compute the result row size
 * @param numOfOutput  number of entries in pCtx
 * @param keyBufSize   base size of the key buffer; POINTER_BYTES + sizeof(int64_t) are added on top
 * @param pKey         identifier passed to createDiskbasedBuf() and used in log messages
 * @return 0 on success; terrno on allocation failure, TSDB_CODE_NO_DISKSPACE when the temp space is not
 *         available, or the error of getBufferPgSize()/createDiskbasedBuf(). Members that were already
 *         set are not released here.
 */
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                         const char* pKey) {
  pAggSup->currentPageId = -1;
  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
  pAggSup->pResultRowHashTable = tSimpleHashInit(100, taosFastHash);
  if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
    return terrno;
  }

  uint32_t pageSize = 0;
  uint32_t bufSize = 0;
  int32_t  code = getBufferPgSize(pAggSup->resultRowSize, &pageSize, &bufSize);
  if (code != 0) {
    qError("failed to get buff page size, rowSize:%d", pAggSup->resultRowSize);
    return code;
  }

  if (!osTempSpaceAvailable()) {
    code = TSDB_CODE_NO_DISKSPACE;
    qError("Init stream agg supporter failed since %s, key:%s, tempDir:%s", tstrerror(code), pKey, tsTempDir);
    return code;
  }

  code = createDiskbasedBuf(&pAggSup->pResultBuf, pageSize, bufSize, pKey, tsTempDir);
  if (code != TSDB_CODE_SUCCESS) {
    qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
  }

  return code;
}
597

598
/**
 * @brief Invoke the `cleanup` callback of every function on each not yet consumed result row of a stream
 *        operator. Rows are fetched from the stream state through streamStateGetByPos().
 *
 * Returns immediately when no function context has `needCleanup` set. Rows whose state cannot be
 * fetched are logged and skipped.
 */
void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
  SStorageAPI*    pAPI = &pTaskInfo->storageAPI;
  SqlFunctionCtx* pCtx = pSup->pCtx;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t         numOfRows = getNumOfTotalRes(pGroupResInfo);

  bool anyCleanup = false;
  for (int32_t j = 0; j < numOfExprs && !anyCleanup; ++j) {
    anyCleanup = pCtx[j].needCleanup;
  }
  if (!anyCleanup) {
    return;
  }

  // start from the first row that has not been consumed yet
  for (int32_t i = pGroupResInfo->index; i < numOfRows; ++i) {
    SResultWindowInfo* pWinInfo = taosArrayGet(pGroupResInfo->pRows, i);
    SRowBuffPos*       pPos = pWinInfo->pStatePos;
    SResultRow*        pRow = NULL;

    int32_t code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to get state by pos, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo));
      continue;
    }

    for (int32_t j = 0; j < numOfExprs; ++j) {
      SqlFunctionCtx* pFn = &pCtx[j];
      pFn->resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pFn->fpSet.cleanup) {
        pFn->fpSet.cleanup(pFn);
      }
    }
  }
}
633

634
void cleanupResultInfoInGroupResInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
1,870,346✔
635
                                  SGroupResInfo* pGroupResInfo) {
636
  int32_t         numOfExprs = pSup->numOfExprs;
1,870,346✔
637
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
1,870,346✔
638
  SqlFunctionCtx* pCtx = pSup->pCtx;
1,870,346✔
639
  int32_t         numOfRows = getNumOfTotalRes(pGroupResInfo);
1,870,346✔
640
  bool            needCleanup = false;
1,870,303✔
641

642
  for (int32_t j = 0; j < numOfExprs; ++j) {
5,946,739✔
643
    needCleanup |= pCtx[j].needCleanup;
4,076,436✔
644
  }
645
  if (!needCleanup) {
1,870,303✔
646
    return;
1,855,003✔
647
  }
648

649
  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
15,300!
650
    SResultRow*        pRow = NULL;
×
651
    SResKeyPos*        pPos = taosArrayGetP(pGroupResInfo->pRows, i);
×
652
    SFilePage*         page = getBufPage(pBuf, pPos->pos.pageId);
×
653
    if (page == NULL) {
×
654
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
655
      continue;
×
656
    }
657
    pRow = (SResultRow*)((char*)page + pPos->pos.offset);
×
658

659

660
    for (int32_t j = 0; j < numOfExprs; ++j) {
×
661
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
×
662
      if (pCtx[j].fpSet.cleanup) {
×
663
        pCtx[j].fpSet.cleanup(&pCtx[j]);
×
664
      }
665
    }
666
    releaseBufPage(pBuf, page);
×
667
  }
668
}
669

670
void cleanupResultInfoInHashMap(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
585,052✔
671
                       SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap) {
672
  int32_t         numOfExprs = pSup->numOfExprs;
585,052✔
673
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
585,052✔
674
  SqlFunctionCtx* pCtx = pSup->pCtx;
585,052✔
675
  bool            needCleanup = false;
585,052✔
676
  for (int32_t j = 0; j < numOfExprs; ++j) {
1,895,389✔
677
    needCleanup |= pCtx[j].needCleanup;
1,310,337✔
678
  }
679
  if (!needCleanup) {
585,052✔
680
    return;
583,675✔
681
  }
682

683
  // begin from last iter
684
  void*   pData = pGroupResInfo->dataPos;
1,377✔
685
  int32_t iter = pGroupResInfo->iter;
1,377✔
686
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
1,377!
687
    SResultRowPosition* pos = pData;
×
688

689
    SFilePage* page = getBufPage(pBuf, pos->pageId);
×
690
    if (page == NULL) {
×
691
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
692
      continue;
×
693
    }
694

695
    SResultRow* pRow = (SResultRow*)((char*)page + pos->offset);
×
696

697
    for (int32_t j = 0; j < numOfExprs; ++j) {
×
698
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
×
699
      if (pCtx[j].fpSet.cleanup) {
×
700
        pCtx[j].fpSet.cleanup(&pCtx[j]);
×
701
      }
702
    }
703

704
    releaseBufPage(pBuf, page);
×
705
  }
706
}
707

708
void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SGroupResInfo* pGroupResInfo,
2,455,373✔
709
                       SAggSupporter *pAggSup, bool cleanGroupResInfo) {
710
  if (cleanGroupResInfo) {
2,455,373✔
711
    cleanupResultInfoInGroupResInfo(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo);
1,870,376✔
712
  } else {
713
    cleanupResultInfoInHashMap(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo, pAggSup->pResultRowHashTable);
584,997✔
714
  }
715
}
2,455,375✔
716
/**
 * @brief Release the key buffer, the result row hash table and the disk-based result buffer of an
 *        aggregate supporter.
 *
 * All three members are reset to NULL afterwards, so the supporter never keeps dangling pointers and a
 * repeated cleanup does not release the same objects twice.
 */
void cleanupAggSup(SAggSupporter* pAggSup) {
  taosMemoryFreeClear(pAggSup->keyBuf);

  tSimpleHashCleanup(pAggSup->pResultRowHashTable);
  pAggSup->pResultRowHashTable = NULL;

  destroyDiskbasedBuf(pAggSup->pResultBuf);
  pAggSup->pResultBuf = NULL;
}
721

722
/**
 * @brief Initialize the expression supporter and the aggregate supporter of an aggregate-like operator
 *        and wire every function context to its storage handle.
 *
 * @param pSup        expression supporter to initialize from pExprInfo/numOfCols
 * @param pAggSup     aggregate supporter to initialize
 * @param pExprInfo   expression descriptors
 * @param numOfCols   number of expressions
 * @param keyBufSize  key buffer size forwarded to doInitAggInfoSup()
 * @param pkey        identifier forwarded to doInitAggInfoSup()
 * @param pState      when non-NULL it is stored as the save handle state of each context and the buffer
 *                    handle is cleared; when NULL the contexts use pAggSup->pResultBuf
 * @param pStore      function state store forwarded to initExprSupp()
 * @return TSDB_CODE_SUCCESS or the error of initExprSupp()/doInitAggInfoSup()
 */
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
                   const char* pkey, void* pState, SFunctionStateStore* pStore) {
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols, pStore);
  if (code == TSDB_CODE_SUCCESS) {
    code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
  }
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SqlFunctionCtx* pFn = &pSup->pCtx[i];
    pFn->hasWindowOrGroup = pSup->hasWindowOrGroup;

    if (pState == NULL) {
      pFn->saveHandle.pBuf = pAggSup->pResultBuf;
      continue;
    }

    pFn->saveHandle.pBuf = NULL;
    pFn->saveHandle.pState = pState;
    pFn->exprIdx = i;
  }

  return TSDB_CODE_SUCCESS;
}
747

748
/**
 * @brief Apply every function context to a sub-range of the current input block.
 *
 * For each context the input range is temporarily set to [offset, offset + forwardStep). Pseudo
 * functions are evaluated through their scalar callback on pTimeWindowData; other functions run their
 * `process` callback when functionNeedToExecute() allows it.
 *
 * @param taskInfo         task; its `code` is set when a function fails
 * @param pCtx             array of function contexts
 * @param pTimeWindowData  column passed as the input of pseudo functions
 * @param offset           first row of the range
 * @param forwardStep      number of rows in the range
 * @param numOfTotal       total number of rows in the block; pre-computed block statistics are ignored
 *                         when the range is shorter than this
 * @param numOfOutput      number of entries in pCtx
 * @return TSDB_CODE_SUCCESS or the error of the first failing function (processing stops there)
 */
int32_t applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
                                        int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
  int32_t code = TSDB_CODE_SUCCESS;

  for (int32_t k = 0; k < numOfOutput; ++k) {
    SqlFunctionCtx* pFn = &pCtx[k];

    // keep it temporarily
    SFunctionCtxStatus saved = {0};
    functionCtxSave(pFn, &saved);

    pFn->input.startRowIndex = offset;
    pFn->input.numOfRows = forwardStep;

    // not a whole block involved in query processing, statistics data can not be used
    // NOTE: the original value of isSet have been changed here
    if (pFn->input.colDataSMAIsSet && forwardStep < numOfTotal) {
      pFn->input.colDataSMAIsSet = false;
    }

    if (pFn->isPseudoFunc) {
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pFn);
      char*                pOutBuf = GET_ROWCELL_INTERBUF(pEntryInfo);

      // the result is written as a BIGINT straight into the entry's intermediate buffer
      SColumnInfoData outCol = {0};
      outCol.info.type = TSDB_DATA_TYPE_BIGINT;
      outCol.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
      outCol.pData = pOutBuf;

      SScalarParam out = {.columnData = &outCol};
      // NOTE(review): numOfRows is hard-coded to 5; presumably the number of entries in
      // pTimeWindowData - confirm
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
      code = pFn->sfp.process(&tw, 1, &out);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        taskInfo->code = code;
        return code;
      }
      pEntryInfo->numOfRes = 1;

      // NOTE(review): the saved input state is not restored for pseudo functions
      continue;
    }

    if (functionNeedToExecute(pFn) && pFn->fpSet.process != NULL) {
      if (pFn->input.pData[0] != NULL) {
        code = pFn->fpSet.process(pFn);
      } else {
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        qError("%s apply functions error, input data is NULL.", GET_TASKID(taskInfo));
      }

      if (code != TSDB_CODE_SUCCESS) {
        if (pFn->fpSet.cleanup != NULL) {
          pFn->fpSet.cleanup(pFn);
        }
        qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
        taskInfo->code = code;
        return code;
      }
    }

    // restore it
    functionCtxRestore(pFn, &saved);
  }

  return code;
}
809

810
/**
 * @brief Capture the input-related fields of `pCtx` (SMA flag, row count, start row) into `pStatus`.
 */
void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  *pStatus = (SFunctionCtxStatus){
      .hasAgg = pCtx->input.colDataSMAIsSet,
      .numOfRows = pCtx->input.numOfRows,
      .startOffset = pCtx->input.startRowIndex,
  };
}
815

816
/**
 * @brief Write the fields captured by functionCtxSave() back into the input state of `pCtx`.
 */
void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  pCtx->input.startRowIndex = pStatus->startOffset;
  pCtx->input.numOfRows = pStatus->numOfRows;
  pCtx->input.colDataSMAIsSet = pStatus->hasAgg;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc