• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3529

14 Nov 2024 01:56PM UTC coverage: 60.888% (-0.02%) from 60.905%
#3529

push

travis-ci

web-flow
Merge pull request #28764 from taosdata/docs/TS-4937

doc(arch/last): new section for last/last_row cache

119990 of 252020 branches covered (47.61%)

Branch coverage included in aggregate %.

200800 of 274829 relevant lines covered (73.06%)

15624555.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.32
/source/libs/executor/src/aggregateoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "function.h"
18
#include "os.h"
19
#include "querynodes.h"
20
#include "tfill.h"
21
#include "tname.h"
22

23
#include "executorInt.h"
24
#include "index.h"
25
#include "operator.h"
26
#include "query.h"
27
#include "querytask.h"
28
#include "tcompare.h"
29
#include "tdatablock.h"
30
#include "tglobal.h"
31
#include "thash.h"
32
#include "ttypes.h"
33

34
typedef struct {
35
  bool    hasAgg;
36
  int32_t numOfRows;
37
  int32_t startOffset;
38
} SFunctionCtxStatus;
39

40
typedef struct SAggOperatorInfo {
41
  SOptrBasicInfo   binfo;
42
  SAggSupporter    aggSup;
43
  STableQueryInfo* current;
44
  uint64_t         groupId;
45
  SGroupResInfo    groupResInfo;
46
  SExprSupp        scalarExprSup;
47
  bool             groupKeyOptimized;
48
  bool             hasValidBlock;
49
  SSDataBlock*     pNewGroupBlock;
50
  bool             hasCountFunc;
51
  SOperatorInfo*   pOperator;
52
  bool             cleanGroupResInfo;
53
} SAggOperatorInfo;
54

55
static void destroyAggOperatorInfo(void* param);
56
static int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
57

58
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock);
59
static void    destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock);
60

61
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx);
62
static int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
63
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
64
                                const char* pKey);
65

66
static int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size);
67

68
static int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
69

70
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
71
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
72

73
int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo,
826,293✔
74
                                    SOperatorInfo** pOptrInfo) {
75
  QRY_PARAM_CHECK(pOptrInfo);
826,293!
76

77
  int32_t    lino = 0;
826,293✔
78
  int32_t    code = 0;
826,293✔
79
  int32_t    num = 0;
826,293✔
80
  SExprInfo* pExprInfo = NULL;
826,293✔
81
  int32_t    numOfScalarExpr = 0;
826,293✔
82
  SExprInfo* pScalarExprInfo = NULL;
826,293✔
83

84
  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
826,293✔
85
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
826,726✔
86
  if (pInfo == NULL || pOperator == NULL) {
826,723!
87
    code = terrno;
×
88
    goto _error;
×
89
  }
90

91
  pOperator->exprSupp.hasWindowOrGroup = false;
826,736✔
92

93
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
826,736✔
94
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
826,994!
95
  initBasicInfo(&pInfo->binfo, pResBlock);
826,994✔
96

97
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
826,874✔
98
  initResultSizeInfo(&pOperator->resultInfo, 4096);
826,874✔
99

100
  code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num);
826,870✔
101
  TSDB_CHECK_CODE(code, lino, _error);
826,694!
102

103
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
826,694✔
104
                               pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
826,694✔
105
  TSDB_CHECK_CODE(code, lino, _error);
826,786!
106

107
  if (pAggNode->pExprs != NULL) {
826,786✔
108
    code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
137,300✔
109
    TSDB_CHECK_CODE(code, lino, _error);
137,305!
110
  }
111

112
  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
826,791✔
113
  TSDB_CHECK_CODE(code, lino, _error);
826,687!
114

115
  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
826,687✔
116
  TSDB_CHECK_CODE(code, lino, _error);
826,467!
117

118
  pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
826,467✔
119
  pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized;
826,467✔
120
  pInfo->groupId = UINT64_MAX;
826,467✔
121
  pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder;
826,467✔
122
  pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;
826,467✔
123
  pInfo->hasCountFunc = pAggNode->hasCountLikeFunc;
826,467✔
124
  pInfo->pOperator = pOperator;
826,467✔
125
  pInfo->cleanGroupResInfo = false;
826,467✔
126

127
  setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
826,467✔
128
                  !pAggNode->node.forceCreateNonBlockingOptr, OP_NOT_OPENED, pInfo, pTaskInfo);
826,467✔
129
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getAggregateResultNext, NULL, destroyAggOperatorInfo,
826,544✔
130
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
131

132
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
826,305✔
133
    STableScanInfo* pTableScanInfo = downstream->info;
556,953✔
134
    pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp;
556,953✔
135
    pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup;
556,953✔
136
  }
137

138
  code = appendDownstream(pOperator, &downstream, 1);
826,305✔
139
  if (code != TSDB_CODE_SUCCESS) {
826,827!
140
    goto _error;
×
141
  }
142

143
  *pOptrInfo = pOperator;
826,827✔
144
  return TSDB_CODE_SUCCESS;
826,827✔
145

146
_error:
×
147
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
148
  if (pInfo != NULL) {
×
149
    destroyAggOperatorInfo(pInfo);
×
150
  }
151
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
152
  pTaskInfo->code = code;
×
153
  return code;
×
154
}
155

156
void destroyAggOperatorInfo(void* param) {
827,049✔
157
  if (param == NULL) {
827,049!
158
    return;
×
159
  }
160
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
827,049✔
161
  cleanupBasicInfo(&pInfo->binfo);
827,049✔
162

163
  if (pInfo->pOperator) {
827,095✔
164
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
827,094✔
165
                      pInfo->cleanGroupResInfo);
827,094✔
166
    pInfo->pOperator = NULL;
827,082✔
167
  }
168
  cleanupAggSup(&pInfo->aggSup);
827,083✔
169
  cleanupExprSupp(&pInfo->scalarExprSup);
827,097✔
170
  cleanupGroupResInfo(&pInfo->groupResInfo);
827,093✔
171
  taosMemoryFreeClear(param);
827,102✔
172
}
173

174
/**
175
 * @brief get blocks from downstream and fill results into groupedRes after aggragation
176
 * @retval false if no more groups
177
 * @retval true if there could have new groups coming
178
 * @note if pOperator.blocking is true, scan all blocks from downstream, all groups are handled
179
 *       if false, fill results of ONE GROUP
180
 * */
181
static bool nextGroupedResult(SOperatorInfo* pOperator) {
953,749✔
182
  int32_t           code = TSDB_CODE_SUCCESS;
953,749✔
183
  int32_t           lino = 0;
953,749✔
184
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
953,749✔
185
  SAggOperatorInfo* pAggInfo = pOperator->info;
953,749✔
186

187
  if (pOperator->blocking && pAggInfo->hasValidBlock) {
953,749✔
188
    return false;
78,282✔
189
  }
190

191
  SExprSupp*   pSup = &pOperator->exprSupp;
875,467✔
192
  int64_t      st = taosGetTimestampUs();
876,710✔
193
  int32_t      order = pAggInfo->binfo.inputTsOrder;
876,710✔
194
  SSDataBlock* pBlock = pAggInfo->pNewGroupBlock;
876,710✔
195

196
  pAggInfo->cleanGroupResInfo = false;
876,710✔
197
  if (pBlock) {
876,710✔
198
    pAggInfo->pNewGroupBlock = NULL;
49,888✔
199
    tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable);
49,888✔
200
    code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
49,888✔
201
    QUERY_CHECK_CODE(code, lino, _end);
49,888!
202
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
49,888✔
203
    QUERY_CHECK_CODE(code, lino, _end);
49,888!
204

205
    code = doAggregateImpl(pOperator, pSup->pCtx);
49,888✔
206
    QUERY_CHECK_CODE(code, lino, _end);
49,888!
207
  }
208
  while (1) {
2,392,366✔
209
    bool blockAllocated = false;
3,269,076✔
210
    pBlock = getNextBlockFromDownstream(pOperator, 0);
3,269,076✔
211
    if (pBlock == NULL) {
3,268,637✔
212
      if (!pAggInfo->hasValidBlock) {
846,802✔
213
        code = createDataBlockForEmptyInput(pOperator, &pBlock);
146,116✔
214
        QUERY_CHECK_CODE(code, lino, _end);
146,117!
215

216
        if (pBlock == NULL) {
146,117✔
217
          break;
126,135✔
218
        }
219
        blockAllocated = true;
19,982✔
220
      } else {
221
        break;
700,686✔
222
      }
223
    }
224
    pAggInfo->hasValidBlock = true;
2,441,817✔
225
    pAggInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
2,441,817✔
226

227
    // there is an scalar expression that needs to be calculated before apply the group aggregation.
228
    if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) {
2,441,817✔
229
      SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
639,612✔
230
      code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
639,612✔
231
      if (code != TSDB_CODE_SUCCESS) {
639,965!
232
        destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
233
        T_LONG_JMP(pTaskInfo->env, code);
×
234
      }
235
    }
236
    // if non-blocking mode and new group arrived, save the block and break
237
    if (!pOperator->blocking && pAggInfo->groupId != UINT64_MAX && pBlock->info.id.groupId != pAggInfo->groupId) {
2,442,170✔
238
      pAggInfo->pNewGroupBlock = pBlock;
49,940✔
239
      break;
49,940✔
240
    }
241
    // the pDataBlock are always the same one, no need to call this again
242
    code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
2,392,230✔
243
    if (code != TSDB_CODE_SUCCESS) {
2,392,540✔
244
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
26✔
245
      T_LONG_JMP(pTaskInfo->env, code);
26!
246
    }
247
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
2,392,514✔
248
    if (code != TSDB_CODE_SUCCESS) {
2,392,693!
249
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
250
      T_LONG_JMP(pTaskInfo->env, code);
×
251
    }
252

253
    code = doAggregateImpl(pOperator, pSup->pCtx);
2,392,693✔
254
    if (code != TSDB_CODE_SUCCESS) {
2,392,222!
255
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
256
      T_LONG_JMP(pTaskInfo->env, code);
×
257
    }
258

259
    destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
2,392,222✔
260
  }
261

262
  // the downstream operator may return with error code, so let's check the code before generating results.
263
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
876,761!
264
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
265
  }
266

267
  code = initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
876,761✔
268
  QUERY_CHECK_CODE(code, lino, _end);
876,850!
269
  pAggInfo->cleanGroupResInfo = true;
876,850✔
270

271
_end:
876,850✔
272
  if (code != TSDB_CODE_SUCCESS) {
876,850!
273
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
274
    pTaskInfo->code = code;
×
275
    T_LONG_JMP(pTaskInfo->env, code);
×
276
  }
277
  return pBlock != NULL;
876,850✔
278
}
279

280
int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,630,666✔
281
  int32_t           code = TSDB_CODE_SUCCESS;
1,630,666✔
282
  int32_t           lino = 0;
1,630,666✔
283
  SAggOperatorInfo* pAggInfo = pOperator->info;
1,630,666✔
284
  SOptrBasicInfo*   pInfo = &pAggInfo->binfo;
1,630,666✔
285

286
  if (pOperator->status == OP_EXEC_DONE) {
1,630,666✔
287
    (*ppRes) = NULL;
676,875✔
288
    return code;
676,875✔
289
  }
290

291
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
953,791✔
292
  bool           hasNewGroups = false;
953,791✔
293
  do {
294
    hasNewGroups = nextGroupedResult(pOperator);
953,819✔
295
    code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
955,018✔
296
    QUERY_CHECK_CODE(code, lino, _end);
955,175!
297

298
    while (1) {
299
      doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
955,175✔
300
      code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
954,985✔
301
      QUERY_CHECK_CODE(code, lino, _end);
954,916!
302

303
      if (!hasRemainResults(&pAggInfo->groupResInfo)) {
954,916✔
304
        if (!hasNewGroups) setOperatorCompleted(pOperator);
876,721✔
305
        break;
876,841✔
306
      }
307

308
      if (pInfo->pRes->info.rows > 0) {
78,221!
309
        break;
78,221✔
310
      }
311
    }
312
  } while (pInfo->pRes->info.rows == 0 && hasNewGroups);
955,062✔
313

314
  size_t rows = blockDataGetNumOfRows(pInfo->pRes);
955,034✔
315
  pOperator->resultInfo.totalRows += rows;
954,907✔
316

317
_end:
954,907✔
318
  if (code != TSDB_CODE_SUCCESS) {
954,907!
319
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
320
    pTaskInfo->code = code;
×
321
    T_LONG_JMP(pTaskInfo->env, code);
×
322
  }
323

324
  (*ppRes) = (rows == 0) ? NULL : pInfo->pRes;
954,907✔
325
  return code;
954,907✔
326
}
327

328
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
×
329
  SSDataBlock* pRes = NULL;
×
330
  int32_t code = getAggregateResultNext(pOperator, &pRes);
×
331
  return pRes;
×
332
}
333

334
int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
2,442,126✔
335
  int32_t code = TSDB_CODE_SUCCESS;
2,442,126✔
336
  for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
11,225,299✔
337
    if (functionNeedToExecute(&pCtx[k])) {
8,784,525✔
338
      // todo add a dummy function to avoid process check
339
      if (pCtx[k].fpSet.process == NULL) {
8,773,454✔
340
        continue;
209,798✔
341
      }
342

343
      if ((&pCtx[k])->input.pData[0] == NULL) {
8,563,656!
344
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
345
        qError("%s aggregate function error happens, input data is NULL.", GET_TASKID(pOperator->pTaskInfo));
×
346
      } else {
347
        code = pCtx[k].fpSet.process(&pCtx[k]);
8,563,656✔
348
      }
349

350
      if (code != TSDB_CODE_SUCCESS) {
8,564,409!
351
        if (pCtx[k].fpSet.cleanup != NULL) {
×
352
          pCtx[k].fpSet.cleanup(&pCtx[k]);
×
353
        }
354
        qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
×
355
        return code;
×
356
      }
357
    }
358
  }
359

360
  return TSDB_CODE_SUCCESS;
2,440,774✔
361
}
362

363
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
146,108✔
364
  int32_t code = TSDB_CODE_SUCCESS;
146,108✔
365
  int32_t lino = 0;
146,108✔
366
  SSDataBlock* pBlock = NULL;
146,108✔
367
  if (!tsCountAlwaysReturnValue) {
146,108✔
368
    return TSDB_CODE_SUCCESS;
48,436✔
369
  }
370

371
  SAggOperatorInfo* pAggInfo = pOperator->info;
97,672✔
372
  if (pAggInfo->groupKeyOptimized) {
97,672✔
373
    return TSDB_CODE_SUCCESS;
42,610✔
374
  }
375

376
  SOperatorInfo* downstream = pOperator->pDownstream[0];
55,062✔
377
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
55,062✔
378
      downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_SORT ||
54,751✔
379
      (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
54,525✔
380
       ((STableScanInfo*)downstream->info)->hasGroupByTag == true)) {
32,068✔
381
    return TSDB_CODE_SUCCESS;
549✔
382
  }
383

384
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
54,513✔
385

386
  if (!pAggInfo->hasCountFunc) {
54,513✔
387
    return TSDB_CODE_SUCCESS;
34,540✔
388
  }
389

390
  code = createDataBlock(&pBlock);
19,973✔
391
  if (code) {
19,985!
392
    return code;
×
393
  }
394

395
  pBlock->info.rows = 1;
19,985✔
396
  pBlock->info.capacity = 0;
19,985✔
397

398
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
50,213✔
399
    SColumnInfoData colInfo = {0};
30,229✔
400
    colInfo.hasNull = true;
30,229✔
401
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
30,229✔
402
    colInfo.info.bytes = 1;
30,229✔
403

404
    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
30,229✔
405
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
60,888✔
406
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
30,660✔
407
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
30,660✔
408
        int32_t slotId = pFuncParam->pCol->slotId;
30,615✔
409
        int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
30,615✔
410
        if (slotId >= numOfCols) {
30,616✔
411
          code = taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
20,794✔
412
          QUERY_CHECK_CODE(code, lino, _end);
20,792!
413

414
          for (int32_t k = numOfCols; k < slotId + 1; ++k) {
46,487✔
415
            void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo);
25,695✔
416
            QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
25,695!
417
          }
418
        }
419
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
45✔
420
        // do nothing
421
      }
422
    }
423
  }
424

425
  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
19,984✔
426
  QUERY_CHECK_CODE(code, lino, _end);
19,987!
427

428
  for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
45,682✔
429
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
25,692✔
430
    QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
25,695!
431
    colDataSetNULL(pColInfoData, 0);
432
  }
433
  *ppBlock = pBlock;
19,986✔
434

435
_end:
19,986✔
436
  if (code != TSDB_CODE_SUCCESS) {
19,986!
437
    blockDataDestroy(pBlock);
×
438
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
439
  }
440
  return code;
19,984✔
441
}
442

443
void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) {
2,392,196✔
444
  if (!blockAllocated) {
2,392,196✔
445
    return;
2,372,354✔
446
  }
447

448
  blockDataDestroy(*ppBlock);
19,842✔
449
  *ppBlock = NULL;
19,987✔
450
}
451

452
int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
2,442,299✔
453
  int32_t           code = TSDB_CODE_SUCCESS;
2,442,299✔
454
  SAggOperatorInfo* pAggInfo = pOperator->info;
2,442,299✔
455
  if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) {
2,442,299✔
456
    return code;
868,650✔
457
  }
458

459
  code = doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);
1,573,649✔
460

461
  // record the current active group id
462
  pAggInfo->groupId = groupId;
1,574,022✔
463
  return code;
1,574,022✔
464
}
465

466
int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
1,574,488✔
467
  // for simple group by query without interval, all the tables belong to one group result.
468
  int32_t           code = TSDB_CODE_SUCCESS;
1,574,488✔
469
  int32_t           lino = 0;
1,574,488✔
470
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
1,574,488✔
471
  SAggOperatorInfo* pAggInfo = pOperator->info;
1,574,488✔
472

473
  SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
1,574,488✔
474
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
1,574,488✔
475
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
1,574,488✔
476

477
  SResultRow* pResultRow =
478
      doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, sizeof(groupId), true,
1,574,488✔
479
                             groupId, pTaskInfo, false, &pAggInfo->aggSup, true);
480
  if (pResultRow == NULL || pTaskInfo->code != 0) {
1,574,527!
481
    code = pTaskInfo->code;
×
482
    lino = __LINE__;
×
483
    goto _end;
×
484
  }
485
  /*
486
   * not assign result buffer yet, add new result buffer
487
   * all group belong to one result set, and each group result has different group id so set the id to be one
488
   */
489
  if (pResultRow->pageId == -1) {
1,574,551!
490
    code = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
×
491
    QUERY_CHECK_CODE(code, lino, _end);
×
492
  }
493

494
  code = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
1,574,551✔
495
  QUERY_CHECK_CODE(code, lino, _end);
1,573,943✔
496

497
_end:
1,573,917✔
498
  if (code != TSDB_CODE_SUCCESS) {
1,573,919✔
499
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
26!
500
  }
501
  return code;
1,573,959✔
502
}
503

504
// a new buffer page for each table. Needs to opt this design
505
int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) {
×
506
  if (pWindowRes->pageId != -1) {
×
507
    return 0;
×
508
  }
509

510
  SFilePage* pData = NULL;
×
511

512
  // in the first scan, new space needed for results
513
  int32_t pageId = -1;
×
514
  SArray* list = getDataBufPagesIdList(pResultBuf);
×
515

516
  if (taosArrayGetSize(list) == 0) {
×
517
    pData = getNewBufPage(pResultBuf, &pageId);
×
518
    if (pData == NULL) {
×
519
      qError("failed to get buffer, code:%s", tstrerror(terrno));
×
520
      return terrno;
×
521
    }
522
    pData->num = sizeof(SFilePage);
×
523
  } else {
524
    SPageInfo* pi = getLastPageInfo(list);
×
525
    pData = getBufPage(pResultBuf, getPageId(pi));
×
526
    if (pData == NULL) {
×
527
      qError("failed to get buffer, code:%s", tstrerror(terrno));
×
528
      return terrno;
×
529
    }
530

531
    pageId = getPageId(pi);
×
532

533
    if (pData->num + size > getBufPageSize(pResultBuf)) {
×
534
      // release current page first, and prepare the next one
535
      releaseBufPageInfo(pResultBuf, pi);
×
536

537
      pData = getNewBufPage(pResultBuf, &pageId);
×
538
      if (pData == NULL) {
×
539
        qError("failed to get buffer, code:%s", tstrerror(terrno));
×
540
        return terrno;
×
541
      }
542
      pData->num = sizeof(SFilePage);
×
543
    }
544
  }
545

546
  if (pData == NULL) {
×
547
    return -1;
×
548
  }
549

550
  // set the number of rows in current disk page
551
  if (pWindowRes->pageId == -1) {  // not allocated yet, allocate new buffer
×
552
    pWindowRes->pageId = pageId;
×
553
    pWindowRes->offset = (int32_t)pData->num;
×
554

555
    pData->num += size;
×
556
  }
557

558
  return 0;
×
559
}
560

561
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
5,094,373✔
562
                         const char* pKey) {
563
  int32_t code = 0;
5,094,373✔
564
  //  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
565

566
  pAggSup->currentPageId = -1;
5,094,373✔
567
  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
5,094,373✔
568
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
5,099,316✔
569
  pAggSup->pResultRowHashTable = tSimpleHashInit(100, taosFastHash);
5,100,893✔
570

571
  if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
5,096,494!
572
    return terrno;
×
573
  }
574

575
  uint32_t defaultPgsz = 0;
5,097,039✔
576
  uint32_t defaultBufsz = 0;
5,097,039✔
577
  code = getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);
5,097,039✔
578
  if (code) {
5,098,972!
579
    qError("failed to get buff page size, rowSize:%d", pAggSup->resultRowSize);
×
580
    return code;
×
581
  }
582

583
  if (!osTempSpaceAvailable()) {
5,098,972!
584
    code = TSDB_CODE_NO_DISKSPACE;
×
585
    qError("Init stream agg supporter failed since %s, key:%s, tempDir:%s", tstrerror(code), pKey, tsTempDir);
×
586
    return code;
×
587
  }
588

589
  code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
5,097,320✔
590
  if (code != TSDB_CODE_SUCCESS) {
5,100,848✔
591
    qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
458!
592
    return code;
×
593
  }
594

595
  return code;
5,100,390✔
596
}
597

598
void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
1,712✔
599
  int32_t         code = TSDB_CODE_SUCCESS;
1,712✔
600
  SStorageAPI*    pAPI = &pTaskInfo->storageAPI;
1,712✔
601
  int32_t         numOfExprs = pSup->numOfExprs;
1,712✔
602
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
1,712✔
603
  SqlFunctionCtx* pCtx = pSup->pCtx;
1,712✔
604
  int32_t         numOfRows = getNumOfTotalRes(pGroupResInfo);
1,712✔
605
  bool            needCleanup = false;
1,712✔
606

607
  for (int32_t j = 0; j < numOfExprs; ++j) {
28,166✔
608
    needCleanup |= pCtx[j].needCleanup;
26,454✔
609
  }
610
  if (!needCleanup) {
1,712!
611
    return;
1,712✔
612
  }
613
  
614
  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
×
615
    SResultWindowInfo* pWinInfo = taosArrayGet(pGroupResInfo->pRows, i);
×
616
    SRowBuffPos*       pPos = pWinInfo->pStatePos;
×
617
    SResultRow*        pRow = NULL;
×
618

619
    code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow);
×
620
    if (TSDB_CODE_SUCCESS != code) {
×
621
      qError("failed to get state by pos, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo));
×
622
      continue;
×
623
    }
624

625
    for (int32_t j = 0; j < numOfExprs; ++j) {
×
626
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
×
627
      if (pCtx[j].fpSet.cleanup) {
×
628
        pCtx[j].fpSet.cleanup(&pCtx[j]);
×
629
      }
630
    }
631
  }
632
}
633

634
void cleanupResultInfoInGroupResInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
2,327,699✔
635
                                  SGroupResInfo* pGroupResInfo) {
636
  int32_t         numOfExprs = pSup->numOfExprs;
2,327,699✔
637
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
2,327,699✔
638
  SqlFunctionCtx* pCtx = pSup->pCtx;
2,327,699✔
639
  int32_t         numOfRows = getNumOfTotalRes(pGroupResInfo);
2,327,699✔
640
  bool            needCleanup = false;
2,327,709✔
641

642
  for (int32_t j = 0; j < numOfExprs; ++j) {
7,990,141✔
643
    needCleanup |= pCtx[j].needCleanup;
5,662,432✔
644
  }
645
  if (!needCleanup) {
2,327,709✔
646
    return;
2,308,519✔
647
  }
648

649
  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
19,190!
650
    SResultRow*        pRow = NULL;
×
651
    SResKeyPos*        pPos = taosArrayGetP(pGroupResInfo->pRows, i);
×
652
    SFilePage*         page = getBufPage(pBuf, pPos->pos.pageId);
×
653
    if (page == NULL) {
×
654
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
655
      continue;
×
656
    }
657
    pRow = (SResultRow*)((char*)page + pPos->pos.offset);
×
658

659

660
    for (int32_t j = 0; j < numOfExprs; ++j) {
×
661
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
×
662
      if (pCtx[j].fpSet.cleanup) {
×
663
        pCtx[j].fpSet.cleanup(&pCtx[j]);
×
664
      }
665
    }
666
    releaseBufPage(pBuf, page);
×
667
  }
668
}
669

670
void cleanupResultInfoInHashMap(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
795,623✔
671
                       SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap) {
672
  int32_t         numOfExprs = pSup->numOfExprs;
795,623✔
673
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
795,623✔
674
  SqlFunctionCtx* pCtx = pSup->pCtx;
795,623✔
675
  bool            needCleanup = false;
795,623✔
676
  for (int32_t j = 0; j < numOfExprs; ++j) {
2,734,657✔
677
    needCleanup |= pCtx[j].needCleanup;
1,939,034✔
678
  }
679
  if (!needCleanup) {
795,623✔
680
    return;
794,179✔
681
  }
682

683
  // begin from last iter
684
  void*   pData = pGroupResInfo->dataPos;
1,444✔
685
  int32_t iter = pGroupResInfo->iter;
1,444✔
686
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
1,444!
687
    SResultRowPosition* pos = pData;
×
688

689
    SFilePage* page = getBufPage(pBuf, pos->pageId);
×
690
    if (page == NULL) {
×
691
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
692
      continue;
×
693
    }
694

695
    SResultRow* pRow = (SResultRow*)((char*)page + pos->offset);
×
696

697
    for (int32_t j = 0; j < numOfExprs; ++j) {
×
698
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
×
699
      if (pCtx[j].fpSet.cleanup) {
×
700
        pCtx[j].fpSet.cleanup(&pCtx[j]);
×
701
      }
702
    }
703

704
    releaseBufPage(pBuf, page);
×
705
  }
706
}
707

708
void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SGroupResInfo* pGroupResInfo,
3,123,332✔
709
                       SAggSupporter *pAggSup, bool cleanGroupResInfo) {
710
  if (cleanGroupResInfo) {
3,123,332✔
711
    cleanupResultInfoInGroupResInfo(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo);
2,327,761✔
712
  } else {
713
    cleanupResultInfoInHashMap(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo, pAggSup->pResultRowHashTable);
795,571✔
714
  }
715
}
3,123,364✔
716
void cleanupAggSup(SAggSupporter* pAggSup) {
5,101,124✔
717
  taosMemoryFreeClear(pAggSup->keyBuf);
5,101,124!
718
  tSimpleHashCleanup(pAggSup->pResultRowHashTable);
5,101,344✔
719
  destroyDiskbasedBuf(pAggSup->pResultBuf);
5,101,464✔
720
}
5,101,464✔
721

722
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
5,097,698✔
723
                   const char* pkey, void* pState, SFunctionStateStore* pStore) {
724
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols, pStore);
5,097,698✔
725
  if (code != TSDB_CODE_SUCCESS) {
5,098,182!
726
    return code;
×
727
  }
728

729
  code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
5,098,182✔
730
  if (code != TSDB_CODE_SUCCESS) {
5,100,316!
731
    return code;
×
732
  }
733

734
  for (int32_t i = 0; i < numOfCols; ++i) {
24,593,313✔
735
    pSup->pCtx[i].hasWindowOrGroup = pSup->hasWindowOrGroup;
19,492,997✔
736
    if (pState) {
19,492,997✔
737
      pSup->pCtx[i].saveHandle.pBuf = NULL;
253,172✔
738
      pSup->pCtx[i].saveHandle.pState = pState;
253,172✔
739
      pSup->pCtx[i].exprIdx = i;
253,172✔
740
    } else {
741
      pSup->pCtx[i].saveHandle.pBuf = pAggSup->pResultBuf;
19,239,825✔
742
    }
743
  }
744

745
  return TSDB_CODE_SUCCESS;
5,100,316✔
746
}
747

748
int32_t applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
417,094,622✔
749
                                        int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
750
  int32_t code = TSDB_CODE_SUCCESS;
417,094,622✔
751
  for (int32_t k = 0; k < numOfOutput; ++k) {
1,584,496,955✔
752
    // keep it temporarily
753
    SFunctionCtxStatus status = {0};
1,168,135,829✔
754
    functionCtxSave(&pCtx[k], &status);
1,168,135,829✔
755

756
    pCtx[k].input.startRowIndex = offset;
1,169,365,098✔
757
    pCtx[k].input.numOfRows = forwardStep;
1,169,365,098✔
758

759
    // not a whole block involved in query processing, statistics data can not be used
760
    // NOTE: the original value of isSet have been changed here
761
    if (pCtx[k].input.colDataSMAIsSet && forwardStep < numOfTotal) {
1,169,365,098!
762
      pCtx[k].input.colDataSMAIsSet = false;
×
763
    }
764

765
    if (pCtx[k].isPseudoFunc) {
1,169,365,098✔
766
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]);
286,340,277✔
767

768
      char* p = GET_ROWCELL_INTERBUF(pEntryInfo);
286,340,277✔
769

770
      SColumnInfoData idata = {0};
286,340,277✔
771
      idata.info.type = TSDB_DATA_TYPE_BIGINT;
286,340,277✔
772
      idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
286,340,277✔
773
      idata.pData = p;
286,340,277✔
774

775
      SScalarParam out = {.columnData = &idata};
286,340,277✔
776
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
286,340,277✔
777
      code = pCtx[k].sfp.process(&tw, 1, &out);
286,340,277✔
778
      if (code != TSDB_CODE_SUCCESS) {
288,987,081!
779
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
780
        taskInfo->code = code;
×
781
        return code;
×
782
      }
783
      pEntryInfo->numOfRes = 1;
289,087,724✔
784
    } else {
785
      if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) {
883,024,821✔
786
        if ((&pCtx[k])->input.pData[0] == NULL) {
713,833,282!
787
          code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
788
          qError("%s apply functions error, input data is NULL.", GET_TASKID(taskInfo));
×
789
        } else {
790
          code = pCtx[k].fpSet.process(&pCtx[k]);
713,833,282✔
791
        }
792

793
        if (code != TSDB_CODE_SUCCESS) {
714,272,172!
794
          if (pCtx[k].fpSet.cleanup != NULL) {
×
795
            pCtx[k].fpSet.cleanup(&pCtx[k]);
×
796
          }
797
          qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
×
798
          taskInfo->code = code;
×
799
          return code;
×
800
        }
801
      }
802

803
      // restore it
804
      functionCtxRestore(&pCtx[k], &status);
892,950,745✔
805
    }
806
  }
807
  return code;
416,361,126✔
808
}
809

810
void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
1,168,153,640✔
811
  pStatus->hasAgg = pCtx->input.colDataSMAIsSet;
1,168,153,640✔
812
  pStatus->numOfRows = pCtx->input.numOfRows;
1,168,153,640✔
813
  pStatus->startOffset = pCtx->input.startRowIndex;
1,168,153,640✔
814
}
1,168,153,640✔
815

816
void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
891,572,135✔
817
  pCtx->input.colDataSMAIsSet = pStatus->hasAgg;
891,572,135✔
818
  pCtx->input.numOfRows = pStatus->numOfRows;
891,572,135✔
819
  pCtx->input.startRowIndex = pStatus->startOffset;
891,572,135✔
820
}
891,572,135✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc