• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3535

23 Nov 2024 02:07AM UTC coverage: 60.85% (+0.03%) from 60.825%
#3535

push

travis-ci

web-flow
Merge pull request #28893 from taosdata/doc/internal

refact: rename taos lib name

120252 of 252737 branches covered (47.58%)

Branch coverage included in aggregate %.

201187 of 275508 relevant lines covered (73.02%)

15886166.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.96
/source/libs/executor/src/aggregateoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "function.h"
18
#include "os.h"
19
#include "querynodes.h"
20
#include "tfill.h"
21
#include "tname.h"
22

23
#include "executorInt.h"
24
#include "index.h"
25
#include "operator.h"
26
#include "query.h"
27
#include "querytask.h"
28
#include "tcompare.h"
29
#include "tdatablock.h"
30
#include "tglobal.h"
31
#include "thash.h"
32
#include "ttypes.h"
33

34
typedef struct {
35
  bool    hasAgg;
36
  int32_t numOfRows;
37
  int32_t startOffset;
38
} SFunctionCtxStatus;
39

40
typedef struct SAggOperatorInfo {
41
  SOptrBasicInfo   binfo;
42
  SAggSupporter    aggSup;
43
  STableQueryInfo* current;
44
  uint64_t         groupId;
45
  SGroupResInfo    groupResInfo;
46
  SExprSupp        scalarExprSup;
47
  bool             groupKeyOptimized;
48
  bool             hasValidBlock;
49
  SSDataBlock*     pNewGroupBlock;
50
  bool             hasCountFunc;
51
  SOperatorInfo*   pOperator;
52
  bool             cleanGroupResInfo;
53
} SAggOperatorInfo;
54

55
static void destroyAggOperatorInfo(void* param);
56
static int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
57

58
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock);
59
static void    destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock);
60

61
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx);
62
static int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
63
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
64
                                const char* pKey);
65

66
static int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size);
67

68
static int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
69

70
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
71
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
72

73
int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo,
821,239✔
74
                                    SOperatorInfo** pOptrInfo) {
75
  QRY_PARAM_CHECK(pOptrInfo);
821,239!
76

77
  int32_t    lino = 0;
821,239✔
78
  int32_t    code = 0;
821,239✔
79
  int32_t    num = 0;
821,239✔
80
  SExprInfo* pExprInfo = NULL;
821,239✔
81
  int32_t    numOfScalarExpr = 0;
821,239✔
82
  SExprInfo* pScalarExprInfo = NULL;
821,239✔
83

84
  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
821,239✔
85
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
821,626✔
86
  if (pInfo == NULL || pOperator == NULL) {
821,652!
87
    code = terrno;
×
88
    goto _error;
×
89
  }
90

91
  pOperator->exprSupp.hasWindowOrGroup = false;
821,681✔
92

93
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
821,681✔
94
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
821,935!
95
  initBasicInfo(&pInfo->binfo, pResBlock);
821,935✔
96

97
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
821,788✔
98
  initResultSizeInfo(&pOperator->resultInfo, 4096);
821,788✔
99

100
  code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num);
821,804✔
101
  TSDB_CHECK_CODE(code, lino, _error);
821,750!
102

103
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
821,750✔
104
                               pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
821,750✔
105
  TSDB_CHECK_CODE(code, lino, _error);
821,787!
106

107
  if (pAggNode->pExprs != NULL) {
821,787✔
108
    code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
134,230✔
109
    TSDB_CHECK_CODE(code, lino, _error);
134,321!
110
  }
111

112
  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
821,878✔
113
  TSDB_CHECK_CODE(code, lino, _error);
821,583!
114

115
  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
821,583✔
116
  TSDB_CHECK_CODE(code, lino, _error);
821,495!
117

118
  pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
821,495✔
119
  pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized;
821,495✔
120
  pInfo->groupId = UINT64_MAX;
821,495✔
121
  pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder;
821,495✔
122
  pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;
821,495✔
123
  pInfo->hasCountFunc = pAggNode->hasCountLikeFunc;
821,495✔
124
  pInfo->pOperator = pOperator;
821,495✔
125
  pInfo->cleanGroupResInfo = false;
821,495✔
126

127
  setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
821,495✔
128
                  !pAggNode->node.forceCreateNonBlockingOptr, OP_NOT_OPENED, pInfo, pTaskInfo);
821,495✔
129
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getAggregateResultNext, NULL, destroyAggOperatorInfo,
821,369✔
130
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
131

132
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
821,178✔
133
    STableScanInfo* pTableScanInfo = downstream->info;
553,871✔
134
    pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp;
553,871✔
135
    pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup;
553,871✔
136
  }
137

138
  code = appendDownstream(pOperator, &downstream, 1);
821,178✔
139
  if (code != TSDB_CODE_SUCCESS) {
821,822!
140
    goto _error;
×
141
  }
142

143
  *pOptrInfo = pOperator;
821,822✔
144
  return TSDB_CODE_SUCCESS;
821,822✔
145

146
_error:
×
147
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
148
  if (pInfo != NULL) {
×
149
    destroyAggOperatorInfo(pInfo);
×
150
  }
151
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
152
  pTaskInfo->code = code;
×
153
  return code;
×
154
}
155

156
void destroyAggOperatorInfo(void* param) {
821,978✔
157
  if (param == NULL) {
821,978!
158
    return;
×
159
  }
160
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
821,978✔
161
  cleanupBasicInfo(&pInfo->binfo);
821,978✔
162

163
  if (pInfo->pOperator) {
822,023!
164
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
822,024✔
165
                      pInfo->cleanGroupResInfo);
822,024✔
166
    pInfo->pOperator = NULL;
821,997✔
167
  }
168
  cleanupAggSup(&pInfo->aggSup);
821,996✔
169
  cleanupExprSupp(&pInfo->scalarExprSup);
822,026✔
170
  cleanupGroupResInfo(&pInfo->groupResInfo);
822,023✔
171
  taosMemoryFreeClear(param);
822,029!
172
}
173

174
/**
175
 * @brief get blocks from downstream and fill results into groupedRes after aggragation
176
 * @retval false if no more groups
177
 * @retval true if there could have new groups coming
178
 * @note if pOperator.blocking is true, scan all blocks from downstream, all groups are handled
179
 *       if false, fill results of ONE GROUP
180
 * */
181
static bool nextGroupedResult(SOperatorInfo* pOperator) {
948,360✔
182
  int32_t           code = TSDB_CODE_SUCCESS;
948,360✔
183
  int32_t           lino = 0;
948,360✔
184
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
948,360✔
185
  SAggOperatorInfo* pAggInfo = pOperator->info;
948,360✔
186

187
  if (pOperator->blocking && pAggInfo->hasValidBlock) {
948,360✔
188
    return false;
77,906✔
189
  }
190

191
  SExprSupp*   pSup = &pOperator->exprSupp;
870,454✔
192
  int64_t      st = taosGetTimestampUs();
871,697✔
193
  int32_t      order = pAggInfo->binfo.inputTsOrder;
871,697✔
194
  SSDataBlock* pBlock = pAggInfo->pNewGroupBlock;
871,697✔
195

196
  pAggInfo->cleanGroupResInfo = false;
871,697✔
197
  if (pBlock) {
871,697✔
198
    pAggInfo->pNewGroupBlock = NULL;
49,921✔
199
    tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable);
49,921✔
200
    code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
49,921✔
201
    QUERY_CHECK_CODE(code, lino, _end);
49,921!
202
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
49,921✔
203
    QUERY_CHECK_CODE(code, lino, _end);
49,921!
204

205
    code = doAggregateImpl(pOperator, pSup->pCtx);
49,921✔
206
    QUERY_CHECK_CODE(code, lino, _end);
49,921!
207
  }
208
  while (1) {
2,515,121✔
209
    bool blockAllocated = false;
3,386,818✔
210
    pBlock = getNextBlockFromDownstream(pOperator, 0);
3,386,818✔
211
    if (pBlock == NULL) {
3,386,527✔
212
      if (!pAggInfo->hasValidBlock) {
841,685✔
213
        code = createDataBlockForEmptyInput(pOperator, &pBlock);
145,898✔
214
        QUERY_CHECK_CODE(code, lino, _end);
145,903!
215

216
        if (pBlock == NULL) {
145,903✔
217
          break;
125,942✔
218
        }
219
        blockAllocated = true;
19,961✔
220
      } else {
221
        break;
695,787✔
222
      }
223
    }
224
    pAggInfo->hasValidBlock = true;
2,564,803✔
225
    pAggInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
2,564,803✔
226

227
    // there is an scalar expression that needs to be calculated before apply the group aggregation.
228
    if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) {
2,564,803✔
229
      SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
579,249✔
230
      code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
579,249✔
231
      if (code != TSDB_CODE_SUCCESS) {
579,741!
232
        destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
233
        T_LONG_JMP(pTaskInfo->env, code);
×
234
      }
235
    }
236
    // if non-blocking mode and new group arrived, save the block and break
237
    if (!pOperator->blocking && pAggInfo->groupId != UINT64_MAX && pBlock->info.id.groupId != pAggInfo->groupId) {
2,565,295✔
238
      pAggInfo->pNewGroupBlock = pBlock;
49,973✔
239
      break;
49,973✔
240
    }
241
    // the pDataBlock are always the same one, no need to call this again
242
    code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
2,515,322✔
243
    if (code != TSDB_CODE_SUCCESS) {
2,515,393✔
244
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
26✔
245
      T_LONG_JMP(pTaskInfo->env, code);
26!
246
    }
247
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
2,515,367✔
248
    if (code != TSDB_CODE_SUCCESS) {
2,515,922!
249
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
250
      T_LONG_JMP(pTaskInfo->env, code);
×
251
    }
252

253
    code = doAggregateImpl(pOperator, pSup->pCtx);
2,515,922✔
254
    if (code != TSDB_CODE_SUCCESS) {
2,515,494!
255
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
256
      T_LONG_JMP(pTaskInfo->env, code);
×
257
    }
258

259
    destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
2,515,494✔
260
  }
261

262
  // the downstream operator may return with error code, so let's check the code before generating results.
263
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
871,702!
264
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
265
  }
266

267
  code = initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
871,702✔
268
  QUERY_CHECK_CODE(code, lino, _end);
871,806!
269
  pAggInfo->cleanGroupResInfo = true;
871,806✔
270

271
_end:
871,806✔
272
  if (code != TSDB_CODE_SUCCESS) {
871,806!
273
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
274
    pTaskInfo->code = code;
×
275
    T_LONG_JMP(pTaskInfo->env, code);
×
276
  }
277
  return pBlock != NULL;
871,806✔
278
}
279

280
int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,620,494✔
281
  int32_t           code = TSDB_CODE_SUCCESS;
1,620,494✔
282
  int32_t           lino = 0;
1,620,494✔
283
  SAggOperatorInfo* pAggInfo = pOperator->info;
1,620,494✔
284
  SOptrBasicInfo*   pInfo = &pAggInfo->binfo;
1,620,494✔
285

286
  if (pOperator->status == OP_EXEC_DONE) {
1,620,494✔
287
    (*ppRes) = NULL;
672,165✔
288
    return code;
672,165✔
289
  }
290

291
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
948,329✔
292
  bool           hasNewGroups = false;
948,329✔
293
  do {
294
    hasNewGroups = nextGroupedResult(pOperator);
948,357✔
295
    code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
949,675✔
296
    QUERY_CHECK_CODE(code, lino, _end);
949,739!
297

298
    while (1) {
299
      doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
949,739✔
300
      code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
949,643✔
301
      QUERY_CHECK_CODE(code, lino, _end);
949,546!
302

303
      if (!hasRemainResults(&pAggInfo->groupResInfo)) {
949,546✔
304
        if (!hasNewGroups) setOperatorCompleted(pOperator);
871,775✔
305
        break;
871,806✔
306
      }
307

308
      if (pInfo->pRes->info.rows > 0) {
77,845!
309
        break;
77,845✔
310
      }
311
    }
312
  } while (pInfo->pRes->info.rows == 0 && hasNewGroups);
949,651✔
313

314
  size_t rows = blockDataGetNumOfRows(pInfo->pRes);
949,623✔
315
  pOperator->resultInfo.totalRows += rows;
949,550✔
316

317
_end:
949,550✔
318
  if (code != TSDB_CODE_SUCCESS) {
949,550!
319
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
320
    pTaskInfo->code = code;
×
321
    T_LONG_JMP(pTaskInfo->env, code);
×
322
  }
323

324
  (*ppRes) = (rows == 0) ? NULL : pInfo->pRes;
949,550✔
325
  return code;
949,550✔
326
}
327

328
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
×
329
  SSDataBlock* pRes = NULL;
×
330
  int32_t code = getAggregateResultNext(pOperator, &pRes);
×
331
  return pRes;
×
332
}
333

334
int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
2,565,519✔
335
  int32_t code = TSDB_CODE_SUCCESS;
2,565,519✔
336
  for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
10,866,383✔
337
    if (functionNeedToExecute(&pCtx[k])) {
8,301,067✔
338
      // todo add a dummy function to avoid process check
339
      if (pCtx[k].fpSet.process == NULL) {
8,292,134✔
340
        continue;
310,341✔
341
      }
342

343
      if ((&pCtx[k])->input.pData[0] == NULL) {
7,981,793!
344
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
345
        qError("%s aggregate function error happens, input data is NULL.", GET_TASKID(pOperator->pTaskInfo));
×
346
      } else {
347
        code = pCtx[k].fpSet.process(&pCtx[k]);
7,981,793✔
348
      }
349

350
      if (code != TSDB_CODE_SUCCESS) {
7,982,899!
351
        if (pCtx[k].fpSet.cleanup != NULL) {
×
352
          pCtx[k].fpSet.cleanup(&pCtx[k]);
×
353
        }
354
        qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
×
355
        return code;
×
356
      }
357
    }
358
  }
359

360
  return TSDB_CODE_SUCCESS;
2,565,316✔
361
}
362

363
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
145,883✔
364
  int32_t code = TSDB_CODE_SUCCESS;
145,883✔
365
  int32_t lino = 0;
145,883✔
366
  SSDataBlock* pBlock = NULL;
145,883✔
367
  if (!tsCountAlwaysReturnValue) {
145,883✔
368
    return TSDB_CODE_SUCCESS;
48,436✔
369
  }
370

371
  SAggOperatorInfo* pAggInfo = pOperator->info;
97,447✔
372
  if (pAggInfo->groupKeyOptimized) {
97,447✔
373
    return TSDB_CODE_SUCCESS;
42,420✔
374
  }
375

376
  SOperatorInfo* downstream = pOperator->pDownstream[0];
55,027✔
377
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
55,027✔
378
      downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_SORT ||
54,719✔
379
      (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
54,495✔
380
       ((STableScanInfo*)downstream->info)->hasGroupByTag == true)) {
31,979✔
381
    return TSDB_CODE_SUCCESS;
551✔
382
  }
383

384
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
54,476✔
385

386
  if (!pAggInfo->hasCountFunc) {
54,476✔
387
    return TSDB_CODE_SUCCESS;
34,537✔
388
  }
389

390
  code = createDataBlock(&pBlock);
19,939✔
391
  if (code) {
19,961!
392
    return code;
×
393
  }
394

395
  pBlock->info.rows = 1;
19,961✔
396
  pBlock->info.capacity = 0;
19,961✔
397

398
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
51,441✔
399
    SColumnInfoData colInfo = {0};
31,483✔
400
    colInfo.hasNull = true;
31,483✔
401
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
31,483✔
402
    colInfo.info.bytes = 1;
31,483✔
403

404
    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
31,483✔
405
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
63,377✔
406
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
31,897✔
407
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
31,897✔
408
        int32_t slotId = pFuncParam->pCol->slotId;
31,872✔
409
        int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
31,872✔
410
        if (slotId >= numOfCols) {
31,870✔
411
          code = taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
20,976✔
412
          QUERY_CHECK_CODE(code, lino, _end);
20,974!
413

414
          for (int32_t k = numOfCols; k < slotId + 1; ++k) {
47,291✔
415
            void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo);
26,316✔
416
            QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
26,317!
417
          }
418
        }
419
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
25✔
420
        // do nothing
421
      }
422
    }
423
  }
424

425
  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
19,958✔
426
  QUERY_CHECK_CODE(code, lino, _end);
19,963!
427

428
  for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
46,287✔
429
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
26,319✔
430
    QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
26,322!
431
    colDataSetNULL(pColInfoData, 0);
432
  }
433
  *ppBlock = pBlock;
19,961✔
434

435
_end:
19,961✔
436
  if (code != TSDB_CODE_SUCCESS) {
19,961!
437
    blockDataDestroy(pBlock);
×
438
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
439
  }
440
  return code;
19,962✔
441
}
442

443
void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) {
2,515,284✔
444
  if (!blockAllocated) {
2,515,284✔
445
    return;
2,495,464✔
446
  }
447

448
  blockDataDestroy(*ppBlock);
19,820✔
449
  *ppBlock = NULL;
19,962✔
450
}
451

452
int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
2,565,482✔
453
  int32_t           code = TSDB_CODE_SUCCESS;
2,565,482✔
454
  SAggOperatorInfo* pAggInfo = pOperator->info;
2,565,482✔
455
  if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) {
2,565,482✔
456
    return code;
1,026,107✔
457
  }
458

459
  code = doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);
1,539,375✔
460

461
  // record the current active group id
462
  pAggInfo->groupId = groupId;
1,539,346✔
463
  return code;
1,539,346✔
464
}
465

466
int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
1,540,028✔
467
  // for simple group by query without interval, all the tables belong to one group result.
468
  int32_t           code = TSDB_CODE_SUCCESS;
1,540,028✔
469
  int32_t           lino = 0;
1,540,028✔
470
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
1,540,028✔
471
  SAggOperatorInfo* pAggInfo = pOperator->info;
1,540,028✔
472

473
  SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
1,540,028✔
474
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
1,540,028✔
475
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
1,540,028✔
476

477
  SResultRow* pResultRow =
478
      doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, sizeof(groupId), true,
1,540,028✔
479
                             groupId, pTaskInfo, false, &pAggInfo->aggSup, true);
480
  if (pResultRow == NULL || pTaskInfo->code != 0) {
1,540,321!
481
    code = pTaskInfo->code;
×
482
    lino = __LINE__;
×
483
    goto _end;
×
484
  }
485
  /*
486
   * not assign result buffer yet, add new result buffer
487
   * all group belong to one result set, and each group result has different group id so set the id to be one
488
   */
489
  if (pResultRow->pageId == -1) {
1,540,357!
490
    code = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
×
491
    QUERY_CHECK_CODE(code, lino, _end);
×
492
  }
493

494
  code = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
1,540,357✔
495
  QUERY_CHECK_CODE(code, lino, _end);
1,539,444✔
496

497
_end:
1,539,418✔
498
  if (code != TSDB_CODE_SUCCESS) {
1,539,408✔
499
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
26!
500
  }
501
  return code;
1,539,315✔
502
}
503

504
// a new buffer page for each table. Needs to opt this design
505
int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) {
×
506
  if (pWindowRes->pageId != -1) {
×
507
    return 0;
×
508
  }
509

510
  SFilePage* pData = NULL;
×
511

512
  // in the first scan, new space needed for results
513
  int32_t pageId = -1;
×
514
  SArray* list = getDataBufPagesIdList(pResultBuf);
×
515

516
  if (taosArrayGetSize(list) == 0) {
×
517
    pData = getNewBufPage(pResultBuf, &pageId);
×
518
    if (pData == NULL) {
×
519
      qError("failed to get buffer, code:%s", tstrerror(terrno));
×
520
      return terrno;
×
521
    }
522
    pData->num = sizeof(SFilePage);
×
523
  } else {
524
    SPageInfo* pi = getLastPageInfo(list);
×
525
    pData = getBufPage(pResultBuf, getPageId(pi));
×
526
    if (pData == NULL) {
×
527
      qError("failed to get buffer, code:%s", tstrerror(terrno));
×
528
      return terrno;
×
529
    }
530

531
    pageId = getPageId(pi);
×
532

533
    if (pData->num + size > getBufPageSize(pResultBuf)) {
×
534
      // release current page first, and prepare the next one
535
      releaseBufPageInfo(pResultBuf, pi);
×
536

537
      pData = getNewBufPage(pResultBuf, &pageId);
×
538
      if (pData == NULL) {
×
539
        qError("failed to get buffer, code:%s", tstrerror(terrno));
×
540
        return terrno;
×
541
      }
542
      pData->num = sizeof(SFilePage);
×
543
    }
544
  }
545

546
  if (pData == NULL) {
×
547
    return -1;
×
548
  }
549

550
  // set the number of rows in current disk page
551
  if (pWindowRes->pageId == -1) {  // not allocated yet, allocate new buffer
×
552
    pWindowRes->pageId = pageId;
×
553
    pWindowRes->offset = (int32_t)pData->num;
×
554

555
    pData->num += size;
×
556
  }
557

558
  return 0;
×
559
}
560

561
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
5,436,745✔
562
                         const char* pKey) {
563
  int32_t code = 0;
5,436,745✔
564
  //  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
565

566
  pAggSup->currentPageId = -1;
5,436,745✔
567
  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
5,436,745✔
568
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
5,442,838✔
569
  pAggSup->pResultRowHashTable = tSimpleHashInit(100, taosFastHash);
5,444,751✔
570

571
  if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
5,439,680!
572
    return terrno;
×
573
  }
574

575
  uint32_t defaultPgsz = 0;
5,440,597✔
576
  uint32_t defaultBufsz = 0;
5,440,597✔
577
  code = getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);
5,440,597✔
578
  if (code) {
5,443,170!
579
    qError("failed to get buff page size, rowSize:%d", pAggSup->resultRowSize);
×
580
    return code;
×
581
  }
582

583
  if (!osTempSpaceAvailable()) {
5,443,170!
584
    code = TSDB_CODE_NO_DISKSPACE;
×
585
    qError("Init stream agg supporter failed since %s, key:%s, tempDir:%s", tstrerror(code), pKey, tsTempDir);
×
586
    return code;
×
587
  }
588

589
  code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
5,441,470✔
590
  if (code != TSDB_CODE_SUCCESS) {
5,443,953!
591
    qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
×
592
    return code;
×
593
  }
594

595
  return code;
5,443,970✔
596
}
597

598
void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
1,711✔
599
  int32_t         code = TSDB_CODE_SUCCESS;
1,711✔
600
  SStorageAPI*    pAPI = &pTaskInfo->storageAPI;
1,711✔
601
  int32_t         numOfExprs = pSup->numOfExprs;
1,711✔
602
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
1,711✔
603
  SqlFunctionCtx* pCtx = pSup->pCtx;
1,711✔
604
  int32_t         numOfRows = getNumOfTotalRes(pGroupResInfo);
1,711✔
605
  bool            needCleanup = false;
1,712✔
606

607
  for (int32_t j = 0; j < numOfExprs; ++j) {
28,166✔
608
    needCleanup |= pCtx[j].needCleanup;
26,454✔
609
  }
610
  if (!needCleanup) {
1,712!
611
    return;
1,712✔
612
  }
613
  
614
  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
×
615
    SResultWindowInfo* pWinInfo = taosArrayGet(pGroupResInfo->pRows, i);
×
616
    SRowBuffPos*       pPos = pWinInfo->pStatePos;
×
617
    SResultRow*        pRow = NULL;
×
618

619
    code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow);
×
620
    if (TSDB_CODE_SUCCESS != code) {
×
621
      qError("failed to get state by pos, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo));
×
622
      continue;
×
623
    }
624

625
    for (int32_t j = 0; j < numOfExprs; ++j) {
×
626
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
×
627
      if (pCtx[j].fpSet.cleanup) {
×
628
        pCtx[j].fpSet.cleanup(&pCtx[j]);
×
629
      }
630
    }
631
  }
632
}
633

634
void cleanupResultInfoInGroupResInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
2,494,322✔
635
                                  SGroupResInfo* pGroupResInfo) {
636
  int32_t         numOfExprs = pSup->numOfExprs;
2,494,322✔
637
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
2,494,322✔
638
  SqlFunctionCtx* pCtx = pSup->pCtx;
2,494,322✔
639
  int32_t         numOfRows = getNumOfTotalRes(pGroupResInfo);
2,494,322✔
640
  bool            needCleanup = false;
2,494,260✔
641

642
  for (int32_t j = 0; j < numOfExprs; ++j) {
8,409,466✔
643
    needCleanup |= pCtx[j].needCleanup;
5,915,206✔
644
  }
645
  if (!needCleanup) {
2,494,260✔
646
    return;
2,473,650✔
647
  }
648

649
  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
20,610!
650
    SResultRow*        pRow = NULL;
×
651
    SResKeyPos*        pPos = taosArrayGetP(pGroupResInfo->pRows, i);
×
652
    SFilePage*         page = getBufPage(pBuf, pPos->pos.pageId);
×
653
    if (page == NULL) {
×
654
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
655
      continue;
×
656
    }
657
    pRow = (SResultRow*)((char*)page + pPos->pos.offset);
×
658

659

660
    for (int32_t j = 0; j < numOfExprs; ++j) {
×
661
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
×
662
      if (pCtx[j].fpSet.cleanup) {
×
663
        pCtx[j].fpSet.cleanup(&pCtx[j]);
×
664
      }
665
    }
666
    releaseBufPage(pBuf, page);
×
667
  }
668
}
669

670
void cleanupResultInfoInHashMap(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
845,963✔
671
                       SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap) {
672
  int32_t         numOfExprs = pSup->numOfExprs;
845,963✔
673
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
845,963✔
674
  SqlFunctionCtx* pCtx = pSup->pCtx;
845,963✔
675
  bool            needCleanup = false;
845,963✔
676
  for (int32_t j = 0; j < numOfExprs; ++j) {
2,872,040✔
677
    needCleanup |= pCtx[j].needCleanup;
2,026,077✔
678
  }
679
  if (!needCleanup) {
845,963✔
680
    return;
844,515✔
681
  }
682

683
  // begin from last iter
684
  void*   pData = pGroupResInfo->dataPos;
1,448✔
685
  int32_t iter = pGroupResInfo->iter;
1,448✔
686
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
1,448!
687
    SResultRowPosition* pos = pData;
×
688

689
    SFilePage* page = getBufPage(pBuf, pos->pageId);
×
690
    if (page == NULL) {
×
691
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
692
      continue;
×
693
    }
694

695
    SResultRow* pRow = (SResultRow*)((char*)page + pos->offset);
×
696

697
    for (int32_t j = 0; j < numOfExprs; ++j) {
×
698
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
×
699
      if (pCtx[j].fpSet.cleanup) {
×
700
        pCtx[j].fpSet.cleanup(&pCtx[j]);
×
701
      }
702
    }
703

704
    releaseBufPage(pBuf, page);
×
705
  }
706
}
707

708
void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SGroupResInfo* pGroupResInfo,
3,340,259✔
709
                       SAggSupporter *pAggSup, bool cleanGroupResInfo) {
710
  if (cleanGroupResInfo) {
3,340,259✔
711
    cleanupResultInfoInGroupResInfo(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo);
2,494,356✔
712
  } else {
713
    cleanupResultInfoInHashMap(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo, pAggSup->pResultRowHashTable);
845,903✔
714
  }
715
}
3,340,279✔
716
void cleanupAggSup(SAggSupporter* pAggSup) {
5,444,800✔
717
  taosMemoryFreeClear(pAggSup->keyBuf);
5,444,800!
718
  tSimpleHashCleanup(pAggSup->pResultRowHashTable);
5,445,114✔
719
  destroyDiskbasedBuf(pAggSup->pResultBuf);
5,445,124✔
720
}
5,445,138✔
721

722
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
5,440,461✔
723
                   const char* pkey, void* pState, SFunctionStateStore* pStore) {
724
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols, pStore);
5,440,461✔
725
  if (code != TSDB_CODE_SUCCESS) {
5,440,687!
726
    return code;
×
727
  }
728

729
  code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
5,440,687✔
730
  if (code != TSDB_CODE_SUCCESS) {
5,443,959!
731
    return code;
×
732
  }
733

734
  for (int32_t i = 0; i < numOfCols; ++i) {
26,059,857✔
735
    pSup->pCtx[i].hasWindowOrGroup = pSup->hasWindowOrGroup;
20,615,898✔
736
    if (pState) {
20,615,898✔
737
      pSup->pCtx[i].saveHandle.pBuf = NULL;
253,495✔
738
      pSup->pCtx[i].saveHandle.pState = pState;
253,495✔
739
      pSup->pCtx[i].exprIdx = i;
253,495✔
740
    } else {
741
      pSup->pCtx[i].saveHandle.pBuf = pAggSup->pResultBuf;
20,362,403✔
742
    }
743
  }
744

745
  return TSDB_CODE_SUCCESS;
5,443,959✔
746
}
747

748
int32_t applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
447,240,351✔
749
                                        int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
750
  int32_t code = TSDB_CODE_SUCCESS;
447,240,351✔
751
  for (int32_t k = 0; k < numOfOutput; ++k) {
1,650,598,879✔
752
    // keep it temporarily
753
    SFunctionCtxStatus status = {0};
1,205,021,812✔
754
    functionCtxSave(&pCtx[k], &status);
1,205,021,812✔
755

756
    pCtx[k].input.startRowIndex = offset;
1,202,639,969✔
757
    pCtx[k].input.numOfRows = forwardStep;
1,202,639,969✔
758

759
    // not a whole block involved in query processing, statistics data can not be used
760
    // NOTE: the original value of isSet have been changed here
761
    if (pCtx[k].input.colDataSMAIsSet && forwardStep < numOfTotal) {
1,202,639,969!
762
      pCtx[k].input.colDataSMAIsSet = false;
×
763
    }
764

765
    if (pCtx[k].isPseudoFunc) {
1,202,639,969✔
766
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]);
287,018,306✔
767

768
      char* p = GET_ROWCELL_INTERBUF(pEntryInfo);
287,018,306✔
769

770
      SColumnInfoData idata = {0};
287,018,306✔
771
      idata.info.type = TSDB_DATA_TYPE_BIGINT;
287,018,306✔
772
      idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
287,018,306✔
773
      idata.pData = p;
287,018,306✔
774

775
      SScalarParam out = {.columnData = &idata};
287,018,306✔
776
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
287,018,306✔
777
      code = pCtx[k].sfp.process(&tw, 1, &out);
287,018,306✔
778
      if (code != TSDB_CODE_SUCCESS) {
290,196,938✔
779
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
279,308!
780
        taskInfo->code = code;
×
781
        return code;
×
782
      }
783
      pEntryInfo->numOfRes = 1;
289,917,630✔
784
    } else {
785
      if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) {
915,621,663✔
786
        if ((&pCtx[k])->input.pData[0] == NULL) {
749,911,232!
787
          code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
788
          qError("%s apply functions error, input data is NULL.", GET_TASKID(taskInfo));
×
789
        } else {
790
          code = pCtx[k].fpSet.process(&pCtx[k]);
749,911,232✔
791
        }
792

793
        if (code != TSDB_CODE_SUCCESS) {
750,553,949!
794
          if (pCtx[k].fpSet.cleanup != NULL) {
×
795
            pCtx[k].fpSet.cleanup(&pCtx[k]);
×
796
          }
797
          qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
×
798
          taskInfo->code = code;
×
799
          return code;
×
800
        }
801
      }
802

803
      // restore it
804
      functionCtxRestore(&pCtx[k], &status);
926,691,104✔
805
    }
806
  }
807
  return code;
445,577,067✔
808
}
809

810
void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
1,205,264,293✔
811
  pStatus->hasAgg = pCtx->input.colDataSMAIsSet;
1,205,264,293✔
812
  pStatus->numOfRows = pCtx->input.numOfRows;
1,205,264,293✔
813
  pStatus->startOffset = pCtx->input.startRowIndex;
1,205,264,293✔
814
}
1,205,264,293✔
815

816
void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
926,351,687✔
817
  pCtx->input.colDataSMAIsSet = pStatus->hasAgg;
926,351,687✔
818
  pCtx->input.numOfRows = pStatus->numOfRows;
926,351,687✔
819
  pCtx->input.startRowIndex = pStatus->startOffset;
926,351,687✔
820
}
926,351,687✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc