• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3524

08 Nov 2024 04:27AM UTC coverage: 60.898% (+5.0%) from 55.861%
#3524

push

travis-ci

web-flow
Merge pull request #28647 from taosdata/fix/3.0/TD-32519_drop_ctb

fix TD-32519 drop child table with tsma caused crash

118687 of 248552 branches covered (47.75%)

Branch coverage included in aggregate %.

286 of 337 new or added lines in 18 files covered. (84.87%)

9647 existing lines in 190 files now uncovered.

199106 of 273291 relevant lines covered (72.85%)

15236719.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.32
/source/libs/executor/src/aggregateoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "function.h"
18
#include "os.h"
19
#include "querynodes.h"
20
#include "tfill.h"
21
#include "tname.h"
22

23
#include "executorInt.h"
24
#include "index.h"
25
#include "operator.h"
26
#include "query.h"
27
#include "querytask.h"
28
#include "tcompare.h"
29
#include "tdatablock.h"
30
#include "tglobal.h"
31
#include "thash.h"
32
#include "ttypes.h"
33

34
/*
 * Scratch record passed to functionCtxSave() / functionCtxRestore() (declared below).
 * applyAggFunctionOnPartialTuples() fills one per context before it overrides the
 * context's input range.
 * NOTE(review): the field names suggest they mirror the saved input state of a
 * SqlFunctionCtx; the save/restore bodies are outside this view -- confirm.
 */
typedef struct {
35
  bool    hasAgg;
36
  int32_t numOfRows;
37
  int32_t startOffset;
38
} SFunctionCtxStatus;
39

40
/* Private state of the hash-aggregate ("TableAggregate") operator. */
typedef struct SAggOperatorInfo {
41
  SOptrBasicInfo   binfo;              // result block (pRes), result-row info, input/output ts order
42
  SAggSupporter    aggSup;             // key buffer, result-row hash table and disk-based result buffer
43
  STableQueryInfo* current;            // not referenced in the visible part of this file
44
  uint64_t         groupId;            // group whose result row is currently bound; UINT64_MAX until one is set
45
  SGroupResInfo    groupResInfo;       // iteration state used while building output blocks
46
  SExprSupp        scalarExprSup;      // scalar expressions applied to each input block before aggregating
47
  bool             groupKeyOptimized;  // copied from the plan node; when set, no placeholder block for empty input
48
  bool             hasValidBlock;      // set once an input (or placeholder) block has been accepted
49
  SSDataBlock*     pNewGroupBlock;     // block of the next group, stashed in non-blocking mode
50
  bool             hasCountFunc;       // copied from pAggNode->hasCountLikeFunc
51
  SOperatorInfo*   pOperator;          // back-pointer to the owning operator, read by destroyAggOperatorInfo
52
  bool             cleanGroupResInfo;  // cleanupResultInfo(): true -> walk groupResInfo rows, false -> walk hash table
53
} SAggOperatorInfo;
54

55
static void destroyAggOperatorInfo(void* param);
56
static int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
57

58
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock);
59
static void    destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock);
60

61
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx);
62
static int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
63
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
64
                                const char* pKey);
65

66
static int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size);
67

68
static int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
69

70
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
71
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
72

73
/**
 * @brief Build the "TableAggregate" (hash aggregate) operator for an aggregate physical plan node.
 *
 * Allocates the operator and its private SAggOperatorInfo, creates the result block from the node's
 * output descriptor, initialises the aggregate and (optional) scalar expression supporters and the
 * filter, installs getAggregateResultNext / destroyAggOperatorInfo in the operator's function set and
 * attaches the downstream. When the downstream is a table scan, its pdInfo is pointed at this
 * operator's exprSupp and aggSup.
 *
 * @param downstream  the single child operator
 * @param pAggNode    physical plan node describing aggregate functions, group keys and scalar expressions
 * @param pTaskInfo   owning task; on failure pTaskInfo->code is set to the returned error
 * @param pOptrInfo   [out] receives the new operator on success
 * @return TSDB_CODE_SUCCESS, or the error code of the step that failed (everything built so far is
 *         released and the downstream is handed to destroyOperatorAndDownstreams)
 */
int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo,
                                    SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t    lino = 0;
  int32_t    code = 0;
  int32_t    num = 0;
  SExprInfo* pExprInfo = NULL;
  int32_t    numOfScalarExpr = 0;
  SExprInfo* pScalarExprInfo = NULL;

  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    lino = __LINE__;  // record the failing line so the log below does not report line 0
    goto _error;
  }

  pOperator->exprSupp.hasWindowOrGroup = false;

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
  initBasicInfo(&pInfo->binfo, pResBlock);

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num);
  TSDB_CHECK_CODE(code, lino, _error);

  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
  TSDB_CHECK_CODE(code, lino, _error);

  // scalar expressions are optional; with none, initExprSupp receives NULL / 0
  if (pAggNode->pExprs != NULL) {
    code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
    TSDB_CHECK_CODE(code, lino, _error);
  }

  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
  TSDB_CHECK_CODE(code, lino, _error);

  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  TSDB_CHECK_CODE(code, lino, _error);

  pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
  pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized;
  pInfo->groupId = UINT64_MAX;  // no group bound yet
  pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;
  pInfo->hasCountFunc = pAggNode->hasCountLikeFunc;
  pInfo->pOperator = pOperator;
  pInfo->cleanGroupResInfo = false;

  setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
                  !pAggNode->node.forceCreateNonBlockingOptr, OP_NOT_OPENED, pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getAggregateResultNext, NULL, destroyAggOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);

  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    STableScanInfo* pTableScanInfo = downstream->info;
    pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp;
    pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup;
  }

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    lino = __LINE__;
    goto _error;
  }

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pInfo != NULL) {
    destroyAggOperatorInfo(pInfo);
  }
  if (pOperator != NULL) {
    // pInfo has just been released; drop the operator's reference to it so that the operator
    // teardown below cannot hand the stale pointer to the registered close function again
    pOperator->info = NULL;
  }
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
155

156
void destroyAggOperatorInfo(void* param) {
899,396✔
157
  if (param == NULL) {
899,396!
158
    return;
×
159
  }
160
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
899,396✔
161
  cleanupBasicInfo(&pInfo->binfo);
899,396✔
162

163
  if (pInfo->pOperator) {
899,430!
164
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
899,431✔
165
                      pInfo->cleanGroupResInfo);
899,431✔
166
    pInfo->pOperator = NULL;
899,411✔
167
  }
168
  cleanupAggSup(&pInfo->aggSup);
899,410✔
169
  cleanupExprSupp(&pInfo->scalarExprSup);
899,442✔
170
  cleanupGroupResInfo(&pInfo->groupResInfo);
899,434✔
171
  taosMemoryFreeClear(param);
899,443✔
172
}
173

174
/**
175
 * @brief get blocks from downstream and fill results into groupedRes after aggragation
176
 * @retval false if no more groups
177
 * @retval true if there could have new groups coming
178
 * @note if pOperator.blocking is true, scan all blocks from downstream, all groups are handled
179
 *       if false, fill results of ONE GROUP
180
 * */
181
static bool nextGroupedResult(SOperatorInfo* pOperator) {
1,030,534✔
182
  int32_t           code = TSDB_CODE_SUCCESS;
1,030,534✔
183
  int32_t           lino = 0;
1,030,534✔
184
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
1,030,534✔
185
  SAggOperatorInfo* pAggInfo = pOperator->info;
1,030,534✔
186

187
  if (pOperator->blocking && pAggInfo->hasValidBlock) {
1,030,534✔
188
    return false;
82,821✔
189
  }
190

191
  SExprSupp*   pSup = &pOperator->exprSupp;
947,713✔
192
  int64_t      st = taosGetTimestampUs();
949,148✔
193
  int32_t      order = pAggInfo->binfo.inputTsOrder;
949,148✔
194
  SSDataBlock* pBlock = pAggInfo->pNewGroupBlock;
949,148✔
195

196
  pAggInfo->cleanGroupResInfo = false;
949,148✔
197
  if (pBlock) {
949,148✔
198
    pAggInfo->pNewGroupBlock = NULL;
49,894✔
199
    tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable);
49,894✔
200
    code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
49,894✔
201
    QUERY_CHECK_CODE(code, lino, _end);
49,894!
202
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
49,894✔
203
    QUERY_CHECK_CODE(code, lino, _end);
49,894!
204

205
    code = doAggregateImpl(pOperator, pSup->pCtx);
49,894✔
206
    QUERY_CHECK_CODE(code, lino, _end);
49,894!
207
  }
208
  while (1) {
2,443,808✔
209
    bool blockAllocated = false;
3,392,956✔
210
    pBlock = getNextBlockFromDownstream(pOperator, 0);
3,392,956✔
211
    if (pBlock == NULL) {
3,392,710✔
212
      if (!pAggInfo->hasValidBlock) {
919,324✔
213
        code = createDataBlockForEmptyInput(pOperator, &pBlock);
190,595✔
214
        QUERY_CHECK_CODE(code, lino, _end);
190,591!
215

216
        if (pBlock == NULL) {
190,591✔
217
          break;
170,408✔
218
        }
219
        blockAllocated = true;
20,183✔
220
      } else {
221
        break;
728,729✔
222
      }
223
    }
224
    pAggInfo->hasValidBlock = true;
2,493,569✔
225
    pAggInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
2,493,569✔
226

227
    // there is an scalar expression that needs to be calculated before apply the group aggregation.
228
    if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) {
2,493,569✔
229
      SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
500,695✔
230
      code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
500,695✔
231
      if (code != TSDB_CODE_SUCCESS) {
500,767!
UNCOV
232
        destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
UNCOV
233
        T_LONG_JMP(pTaskInfo->env, code);
×
234
      }
235
    }
236
    // if non-blocking mode and new group arrived, save the block and break
237
    if (!pOperator->blocking && pAggInfo->groupId != UINT64_MAX && pBlock->info.id.groupId != pAggInfo->groupId) {
2,493,641✔
238
      pAggInfo->pNewGroupBlock = pBlock;
49,946✔
239
      break;
49,946✔
240
    }
241
    // the pDataBlock are always the same one, no need to call this again
242
    code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
2,443,695✔
243
    if (code != TSDB_CODE_SUCCESS) {
2,443,847✔
244
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
26✔
245
      T_LONG_JMP(pTaskInfo->env, code);
26!
246
    }
247
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
2,443,821✔
248
    if (code != TSDB_CODE_SUCCESS) {
2,443,831!
249
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
250
      T_LONG_JMP(pTaskInfo->env, code);
×
251
    }
252

253
    code = doAggregateImpl(pOperator, pSup->pCtx);
2,443,831✔
254
    if (code != TSDB_CODE_SUCCESS) {
2,443,857!
UNCOV
255
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
UNCOV
256
      T_LONG_JMP(pTaskInfo->env, code);
×
257
    }
258

259
    destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
2,443,857✔
260
  }
261

262
  // the downstream operator may return with error code, so let's check the code before generating results.
263
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
949,083!
UNCOV
264
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
265
  }
266

267
  code = initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
949,083✔
268
  QUERY_CHECK_CODE(code, lino, _end);
949,257!
269
  pAggInfo->cleanGroupResInfo = true;
949,257✔
270

271
_end:
949,257✔
272
  if (code != TSDB_CODE_SUCCESS) {
949,257!
UNCOV
273
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
274
    pTaskInfo->code = code;
×
UNCOV
275
    T_LONG_JMP(pTaskInfo->env, code);
×
276
  }
277
  return pBlock != NULL;
949,257✔
278
}
279

280
/**
 * @brief Produce the next output block of the aggregate operator.
 *
 * Aggregates input through nextGroupedResult(), then builds and filters result blocks until one
 * with rows is available or the grouped results are exhausted. The operator is marked completed
 * when neither remaining results nor further groups exist.
 *
 * @param pOperator  the aggregate operator
 * @param ppRes      [out] the result block, or NULL when there is nothing (more) to return
 * @return TSDB_CODE_SUCCESS; failures leave through T_LONG_JMP after being stored in pTaskInfo->code
 */
int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           lino = 0;
  size_t            rows = 0;  // declared up front: the `goto _end` paths must not skip its initialisation
  SAggOperatorInfo* pAggInfo = pOperator->info;
  SOptrBasicInfo*   pInfo = &pAggInfo->binfo;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  bool           hasNewGroups = false;
  do {
    hasNewGroups = nextGroupedResult(pOperator);
    code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
    QUERY_CHECK_CODE(code, lino, _end);

    while (1) {
      doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
      code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      if (!hasRemainResults(&pAggInfo->groupResInfo)) {
        if (!hasNewGroups) setOperatorCompleted(pOperator);
        break;
      }

      // results remain: return now if the filter left rows, otherwise build the next batch
      if (pInfo->pRes->info.rows > 0) {
        break;
      }
    }
    // an entirely filtered-out group must not end the stream while other groups are pending
  } while (pInfo->pRes->info.rows == 0 && hasNewGroups);

  rows = blockDataGetNumOfRows(pInfo->pRes);
  pOperator->resultInfo.totalRows += rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  (*ppRes) = (rows == 0) ? NULL : pInfo->pRes;
  return code;
}
327

328
/**
 * @brief Value-returning wrapper around getAggregateResultNext().
 * @param pOperator  the aggregate operator
 * @return the next result block, or NULL when there is none or the call reported an error
 */
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
  SSDataBlock* pRes = NULL;
  int32_t      code = getAggregateResultNext(pOperator, &pRes);
  if (code != TSDB_CODE_SUCCESS) {
    // do not hand out a block from a call that reported failure
    return NULL;
  }
  return pRes;
}
333

334
/**
 * @brief Run the process callback of every function context that is due to execute.
 *
 * Contexts for which functionNeedToExecute() is false, or that have no process callback, are
 * skipped. The first failure (a NULL first input column counts as one) triggers that context's
 * cleanup callback, is logged, and ends the call.
 *
 * @param pOperator  operator whose exprSupp.numOfExprs gives the number of contexts
 * @param pCtx       array of function contexts
 * @return TSDB_CODE_SUCCESS, or the first error encountered
 */
int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
  for (int32_t idx = 0; idx < pOperator->exprSupp.numOfExprs; ++idx) {
    SqlFunctionCtx* pOne = &pCtx[idx];

    // todo add a dummy function to avoid process check
    if (!functionNeedToExecute(pOne) || pOne->fpSet.process == NULL) {
      continue;
    }

    int32_t ret = TSDB_CODE_SUCCESS;
    if (pOne->input.pData[0] != NULL) {
      ret = pOne->fpSet.process(pOne);
    } else {
      ret = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      qError("%s aggregate function error happens, input data is NULL.", GET_TASKID(pOperator->pTaskInfo));
    }

    if (ret == TSDB_CODE_SUCCESS) {
      continue;
    }

    if (pOne->fpSet.cleanup != NULL) {
      pOne->fpSet.cleanup(pOne);
    }
    qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(ret));
    return ret;
  }

  return TSDB_CODE_SUCCESS;
}
362

363
/**
 * @brief Create a one-row, all-NULL placeholder block to aggregate over when the downstream
 *        produced no data.
 *
 * A block is only created when all of the following hold: tsCountAlwaysReturnValue is set, the
 * operator is not group-key optimized, the downstream is not a partition or sort operator nor a
 * table scan with hasGroupByTag, and the plan has a count-like function. Otherwise *ppBlock is
 * left untouched and success is returned.
 *
 * The block gets NULL-typed columns for every slot up to the highest slot id referenced by a
 * column parameter of the aggregate expressions.
 *
 * @param pOperator  the aggregate operator
 * @param ppBlock    [out] receives the new block; written only on success with a block created
 * @return TSDB_CODE_SUCCESS or an error code (the partially built block is destroyed on error)
 */
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SSDataBlock* pBlock = NULL;
  if (!tsCountAlwaysReturnValue) {
    return TSDB_CODE_SUCCESS;
  }

  SAggOperatorInfo* pAggInfo = pOperator->info;
  if (pAggInfo->groupKeyOptimized) {
    return TSDB_CODE_SUCCESS;
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
      downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_SORT ||
      (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
       ((STableScanInfo*)downstream->info)->hasGroupByTag == true)) {
    return TSDB_CODE_SUCCESS;
  }

  if (!pAggInfo->hasCountFunc) {
    return TSDB_CODE_SUCCESS;
  }

  code = createDataBlock(&pBlock);
  if (code) {
    return code;
  }

  pBlock->info.rows = 1;
  pBlock->info.capacity = 0;

  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    SColumnInfoData colInfo = {0};
    colInfo.hasNull = true;
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
    colInfo.info.bytes = 1;

    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
      // only column parameters reference a slot of the input block
      if (pFuncParam->type != FUNC_PARAM_TYPE_COLUMN) {
        continue;
      }

      int32_t slotId = pFuncParam->pCol->slotId;
      int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
      if (slotId < numOfCols) {
        continue;
      }

      // grow the column list so that index slotId exists
      code = taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
      QUERY_CHECK_CODE(code, lino, _end);

      for (int32_t k = numOfCols; k < slotId + 1; ++k) {
        void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo);
        QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
      }
    }
  }

  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
  QUERY_CHECK_CODE(code, lino, _end);

  for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
    colDataSetNULL(pColInfoData, 0);
  }
  *ppBlock = pBlock;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pBlock);
    // report the line recorded by the failing check, not the line of this log statement
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
442

443
/**
 * @brief Destroy the placeholder block made by createDataBlockForEmptyInput().
 * @param blockAllocated  true when *ppBlock is such a placeholder; when false nothing happens
 * @param ppBlock         block to destroy; reset to NULL after destruction
 */
void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) {
  if (blockAllocated) {
    blockDataDestroy(*ppBlock);
    *ppBlock = NULL;
  }
}
451

452
/**
 * @brief Make `groupId` the active group of the aggregate operator.
 *
 * Nothing is done when the group is already active. Otherwise the group's result row is bound to
 * the function contexts via doSetTableGroupOutputBuf().
 *
 * @param pOperator    the aggregate operator
 * @param numOfOutput  number of function contexts to bind
 * @param groupId      group to activate
 * @return TSDB_CODE_SUCCESS or the error from doSetTableGroupOutputBuf()
 */
int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  SAggOperatorInfo* pAggInfo = pOperator->info;
  if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);
  if (code == TSDB_CODE_SUCCESS) {
    // record the current active group id; only after its result row is really bound, so a later
    // call for the same group cannot take the fast path above with unbound contexts
    pAggInfo->groupId = groupId;
  }
  return code;
}
465

466
/**
 * @brief Look up (or create) the result row keyed by `groupId` and bind it to the function contexts.
 *
 * @param pOperator    the aggregate operator
 * @param numOfOutput  number of function contexts to initialise on the row
 * @param groupId      group id, used as the hash key of the result row
 * @return TSDB_CODE_SUCCESS or an error code; never success when no result row was obtained
 */
int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  // for simple group by query without interval, all the tables belong to one group result.
  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           lino = 0;
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
  SAggOperatorInfo* pAggInfo = pOperator->info;

  SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;

  SResultRow* pResultRow =
      doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, sizeof(groupId), true,
                             groupId, pTaskInfo, false, &pAggInfo->aggSup, true);
  if (pResultRow == NULL || pTaskInfo->code != 0) {
    code = pTaskInfo->code;
    if (code == TSDB_CODE_SUCCESS) {
      // no row but no task-level error either: this must still be reported as a failure,
      // otherwise the caller would aggregate into contexts that were never bound
      code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    lino = __LINE__;
    goto _end;
  }
  /*
   * not assign result buffer yet, add new result buffer
   * all group belong to one result set, and each group result has different group id so set the id to be one
   */
  if (pResultRow->pageId == -1) {
    code = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  code = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
503

504
// a new buffer page for each table. Needs to opt this design
505
int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) {
×
506
  if (pWindowRes->pageId != -1) {
×
507
    return 0;
×
508
  }
509

510
  SFilePage* pData = NULL;
×
511

512
  // in the first scan, new space needed for results
513
  int32_t pageId = -1;
×
514
  SArray* list = getDataBufPagesIdList(pResultBuf);
×
515

516
  if (taosArrayGetSize(list) == 0) {
×
517
    pData = getNewBufPage(pResultBuf, &pageId);
×
518
    if (pData == NULL) {
×
519
      qError("failed to get buffer, code:%s", tstrerror(terrno));
×
520
      return terrno;
×
521
    }
522
    pData->num = sizeof(SFilePage);
×
523
  } else {
524
    SPageInfo* pi = getLastPageInfo(list);
×
525
    pData = getBufPage(pResultBuf, getPageId(pi));
×
526
    if (pData == NULL) {
×
527
      qError("failed to get buffer, code:%s", tstrerror(terrno));
×
528
      return terrno;
×
529
    }
530

531
    pageId = getPageId(pi);
×
532

533
    if (pData->num + size > getBufPageSize(pResultBuf)) {
×
534
      // release current page first, and prepare the next one
535
      releaseBufPageInfo(pResultBuf, pi);
×
536

537
      pData = getNewBufPage(pResultBuf, &pageId);
×
538
      if (pData == NULL) {
×
539
        qError("failed to get buffer, code:%s", tstrerror(terrno));
×
540
        return terrno;
×
541
      }
542
      pData->num = sizeof(SFilePage);
×
543
    }
544
  }
545

546
  if (pData == NULL) {
×
547
    return -1;
×
548
  }
549

550
  // set the number of rows in current disk page
551
  if (pWindowRes->pageId == -1) {  // not allocated yet, allocate new buffer
×
552
    pWindowRes->pageId = pageId;
×
553
    pWindowRes->offset = (int32_t)pData->num;
×
554

555
    pData->num += size;
×
556
  }
557

558
  return 0;
×
559
}
560

561
/**
 * @brief Initialise an SAggSupporter: result-row size, key buffer, result-row hash table and the
 *        disk-based result buffer.
 *
 * @param pAggSup      supporter to initialise
 * @param pCtx         function contexts, used to compute the result-row size
 * @param numOfOutput  number of contexts in pCtx
 * @param keyBufSize   base size of the key buffer (extra room is added here)
 * @param pKey         identifier passed to createDiskbasedBuf and used in log messages
 * @return TSDB_CODE_SUCCESS, terrno on allocation failure, TSDB_CODE_NO_DISKSPACE when
 *         osTempSpaceAvailable() is false, or the error of the failing helper
 */
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                         const char* pKey) {
  pAggSup->currentPageId = -1;
  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
  pAggSup->pResultRowHashTable = tSimpleHashInit(100, taosFastHash);

  bool allocOk = (pAggSup->keyBuf != NULL) && (pAggSup->pResultRowHashTable != NULL);
  if (!allocOk) {
    return terrno;
  }

  uint32_t pageSize = 0;
  uint32_t bufSize = 0;
  int32_t  ret = getBufferPgSize(pAggSup->resultRowSize, &pageSize, &bufSize);
  if (ret) {
    qError("failed to get buff page size, rowSize:%d", pAggSup->resultRowSize);
    return ret;
  }

  // the result buffer lives in the temp directory, which must have room left
  if (!osTempSpaceAvailable()) {
    ret = TSDB_CODE_NO_DISKSPACE;
    qError("Init stream agg supporter failed since %s, key:%s, tempDir:%s", tstrerror(ret), pKey, tsTempDir);
    return ret;
  }

  ret = createDiskbasedBuf(&pAggSup->pResultBuf, pageSize, bufSize, pKey, tsTempDir);
  if (ret != TSDB_CODE_SUCCESS) {
    qError("Create agg result buf failed since %s, %s", tstrerror(ret), pKey);
  }

  return ret;
}
597

598
/**
 * @brief Run the cleanup callbacks of the function contexts on every not-yet-consumed result
 *        window of a stream operator.
 *
 * Returns at once when no context has needCleanup set. Rows that cannot be fetched are logged and
 * skipped; cleanup is best effort.
 *
 * @param pTaskInfo      task providing the state-store API
 * @param pState         stream state handle passed to streamStateGetByPos
 * @param pSup           expression supporter holding the function contexts
 * @param pGroupResInfo  result list; rows from pGroupResInfo->index onwards are visited
 */
void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
  int32_t         code = TSDB_CODE_SUCCESS;
  SStorageAPI*    pAPI = &pTaskInfo->storageAPI;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;
  int32_t         numOfRows = getNumOfTotalRes(pGroupResInfo);
  bool            needCleanup = false;

  for (int32_t j = 0; j < numOfExprs; ++j) {
    needCleanup |= pCtx[j].needCleanup;
  }
  if (!needCleanup) {
    return;
  }

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SResultWindowInfo* pWinInfo = taosArrayGet(pGroupResInfo->pRows, i);
    if (pWinInfo == NULL) {
      // guard the dereference below; skip like any other row that cannot be fetched
      qError("failed to get result window, index:%d, %s", i, GET_TASKID(pTaskInfo));
      continue;
    }
    SRowBuffPos* pPos = pWinInfo->pStatePos;
    SResultRow*  pRow = NULL;

    code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow);
    if (TSDB_CODE_SUCCESS != code) {
      qError("failed to get state by pos, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo));
      continue;
    }

    // point each context at its entry in this row before invoking its cleanup
    for (int32_t j = 0; j < numOfExprs; ++j) {
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.cleanup) {
        pCtx[j].fpSet.cleanup(&pCtx[j]);
      }
    }
  }
}
633

634
void cleanupResultInfoInGroupResInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
2,577,182✔
635
                                  SGroupResInfo* pGroupResInfo) {
636
  int32_t         numOfExprs = pSup->numOfExprs;
2,577,182✔
637
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
2,577,182✔
638
  SqlFunctionCtx* pCtx = pSup->pCtx;
2,577,182✔
639
  int32_t         numOfRows = getNumOfTotalRes(pGroupResInfo);
2,577,182✔
640
  bool            needCleanup = false;
2,577,151✔
641

642
  for (int32_t j = 0; j < numOfExprs; ++j) {
8,529,543✔
643
    needCleanup |= pCtx[j].needCleanup;
5,952,392✔
644
  }
645
  if (!needCleanup) {
2,577,151✔
646
    return;
2,556,588✔
647
  }
648

649
  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
20,563!
650
    SResultRow*        pRow = NULL;
×
651
    SResKeyPos*        pPos = taosArrayGetP(pGroupResInfo->pRows, i);
×
652
    SFilePage*         page = getBufPage(pBuf, pPos->pos.pageId);
×
653
    if (page == NULL) {
×
654
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
655
      continue;
×
656
    }
657
    pRow = (SResultRow*)((char*)page + pPos->pos.offset);
×
658

659

660
    for (int32_t j = 0; j < numOfExprs; ++j) {
×
661
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
×
662
      if (pCtx[j].fpSet.cleanup) {
×
663
        pCtx[j].fpSet.cleanup(&pCtx[j]);
×
664
      }
665
    }
666
    releaseBufPage(pBuf, page);
×
667
  }
668
}
669

670
void cleanupResultInfoInHashMap(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
845,844✔
671
                       SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap) {
672
  int32_t         numOfExprs = pSup->numOfExprs;
845,844✔
673
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
845,844✔
674
  SqlFunctionCtx* pCtx = pSup->pCtx;
845,844✔
675
  bool            needCleanup = false;
845,844✔
676
  for (int32_t j = 0; j < numOfExprs; ++j) {
2,899,102✔
677
    needCleanup |= pCtx[j].needCleanup;
2,053,258✔
678
  }
679
  if (!needCleanup) {
845,844✔
680
    return;
844,399✔
681
  }
682

683
  // begin from last iter
684
  void*   pData = pGroupResInfo->dataPos;
1,445✔
685
  int32_t iter = pGroupResInfo->iter;
1,445✔
686
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
1,445!
UNCOV
687
    SResultRowPosition* pos = pData;
×
688

UNCOV
689
    SFilePage* page = getBufPage(pBuf, pos->pageId);
×
UNCOV
690
    if (page == NULL) {
×
691
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
692
      continue;
×
693
    }
694

UNCOV
695
    SResultRow* pRow = (SResultRow*)((char*)page + pos->offset);
×
696

UNCOV
697
    for (int32_t j = 0; j < numOfExprs; ++j) {
×
UNCOV
698
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
×
UNCOV
699
      if (pCtx[j].fpSet.cleanup) {
×
UNCOV
700
        pCtx[j].fpSet.cleanup(&pCtx[j]);
×
701
      }
702
    }
703

UNCOV
704
    releaseBufPage(pBuf, page);
×
705
  }
706
}
707

708
void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SGroupResInfo* pGroupResInfo,
3,423,001✔
709
                       SAggSupporter *pAggSup, bool cleanGroupResInfo) {
710
  if (cleanGroupResInfo) {
3,423,001✔
711
    cleanupResultInfoInGroupResInfo(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo);
2,577,238✔
712
  } else {
713
    cleanupResultInfoInHashMap(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo, pAggSup->pResultRowHashTable);
845,763✔
714
  }
715
}
3,423,019✔
716
/*
 * Release the resources held by an aggregate supporter: the key buffer,
 * the result-row hash table and the disk-based result buffer.
 *
 * The SAggSupporter structure itself is not freed here.
 */
void cleanupAggSup(SAggSupporter* pAggSup) {
  // key buffer
  taosMemoryFreeClear(pAggSup->keyBuf);

  // hash table that indexes the result rows
  tSimpleHashCleanup(pAggSup->pResultRowHashTable);

  // paged buffer that backs the result rows
  destroyDiskbasedBuf(pAggSup->pResultBuf);
}
5,545,383✔
721

722
/*
 * Initialise the expression supporter and the aggregate supporter, then wire
 * every function context to the place where it saves its state.
 *
 * If pState is non-NULL each context gets pState as its save handle (with a
 * NULL buffer) and records its own expression index; otherwise the context
 * saves into the aggregate supporter's result buffer.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of the first init step that
 * failed.
 */
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
                   const char* pkey, void* pState, SFunctionStateStore* pStore) {
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols, pStore);
  if (code == TSDB_CODE_SUCCESS) {
    // the aggregate supporter is only set up once the expression supporter is ready
    code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
  }
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  for (int32_t idx = 0; idx < numOfCols; ++idx) {
    pSup->pCtx[idx].hasWindowOrGroup = pSup->hasWindowOrGroup;

    if (!pState) {
      // no external state: results are kept in the aggregate result buffer
      pSup->pCtx[idx].saveHandle.pBuf = pAggSup->pResultBuf;
      continue;
    }

    // external state supplied by the caller
    pSup->pCtx[idx].saveHandle.pBuf = NULL;
    pSup->pCtx[idx].saveHandle.pState = pState;
    pSup->pCtx[idx].exprIdx = idx;
  }

  return TSDB_CODE_SUCCESS;
}
747

748
/*
 * Apply every output function to a sub-range of the current input block.
 *
 * For each of the numOfOutput contexts the input window is temporarily set to
 * [offset, offset + forwardStep). Pre-computed column statistics (SMA) are
 * switched off when the range does not cover all numOfTotal rows.
 *
 *  - Pseudo functions are evaluated through their scalar callback against
 *    pTimeWindowData and always produce exactly one result.
 *  - Regular functions run their process callback when they still need to
 *    execute; afterwards the saved input state is restored.
 *
 * On any failure the error is logged, stored in taskInfo->code and returned
 * immediately. Returns TSDB_CODE_SUCCESS otherwise.
 */
int32_t applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
                                        int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
  int32_t code = TSDB_CODE_SUCCESS;

  for (int32_t k = 0; k < numOfOutput; ++k) {
    SqlFunctionCtx* pFn = &pCtx[k];

    // remember the current input state so it can be put back later
    SFunctionCtxStatus saved = {0};
    functionCtxSave(pFn, &saved);

    pFn->input.startRowIndex = offset;
    pFn->input.numOfRows = forwardStep;

    // only part of the block takes part in the computation, so block-level
    // statistics are not usable
    // NOTE: this overwrites the original value of colDataSMAIsSet
    if (forwardStep < numOfTotal && pFn->input.colDataSMAIsSet) {
      pFn->input.colDataSMAIsSet = false;
    }

    if (!pFn->isPseudoFunc) {
      if (functionNeedToExecute(pFn) && pFn->fpSet.process != NULL) {
        if (pFn->input.pData[0] != NULL) {
          code = pFn->fpSet.process(pFn);
        } else {
          code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
          qError("%s apply functions error, input data is NULL.", GET_TASKID(taskInfo));
        }

        if (code != TSDB_CODE_SUCCESS) {
          // give the function a chance to release what it holds before bailing out
          if (pFn->fpSet.cleanup != NULL) {
            pFn->fpSet.cleanup(pFn);
          }
          qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
          taskInfo->code = code;
          return code;
        }
      }

      // put the saved input state back
      functionCtxRestore(pFn, &saved);
      continue;
    }

    // pseudo function: write a single BIGINT value straight into the row cell
    SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pFn);
    char*                pCell = GET_ROWCELL_INTERBUF(pEntryInfo);

    SColumnInfoData outCol = {0};
    outCol.pData = pCell;
    outCol.info.type = TSDB_DATA_TYPE_BIGINT;
    outCol.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;

    // NOTE(review): numOfRows = 5 presumably matches the number of values kept
    // in the time-window column; confirm against the producer of pTimeWindowData
    SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
    SScalarParam out = {.columnData = &outCol};

    code = pFn->sfp.process(&tw, 1, &out);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      taskInfo->code = code;
      return code;
    }
    pEntryInfo->numOfRes = 1;
  }

  return code;
}
809

810
/*
 * Snapshot the parts of a function context's input description that
 * are overwritten while a partial range is processed: the start row index,
 * the row count and the "SMA is set" flag.
 */
void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  pStatus->startOffset = pCtx->input.startRowIndex;
  pStatus->numOfRows = pCtx->input.numOfRows;
  pStatus->hasAgg = pCtx->input.colDataSMAIsSet;
}
1,295,465,998✔
815

816
/*
 * Write a snapshot taken by functionCtxSave() back into the function
 * context's input description (start row index, row count, SMA flag).
 */
void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  pCtx->input.startRowIndex = pStatus->startOffset;
  pCtx->input.numOfRows = pStatus->numOfRows;
  pCtx->input.colDataSMAIsSet = pStatus->hasAgg;
}
1,002,846,139✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc