• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3584

17 Jan 2025 07:28AM UTC coverage: 63.756% (-0.1%) from 63.876%
#3584

push

travis-ci

web-flow
Merge pull request #29594 from taosdata/fix/insert-when-2-replicas

fix/insert-when-2-replicas

141233 of 284535 branches covered (49.64%)

Branch coverage included in aggregate %.

0 of 2 new or added lines in 1 file covered. (0.0%)

684 existing lines in 111 files now uncovered.

219774 of 281695 relevant lines covered (78.02%)

18696822.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.66
/source/libs/executor/src/aggregateoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "function.h"
18
#include "os.h"
19
#include "querynodes.h"
20
#include "tfill.h"
21
#include "tname.h"
22

23
#include "executorInt.h"
24
#include "index.h"
25
#include "operator.h"
26
#include "query.h"
27
#include "querytask.h"
28
#include "tcompare.h"
29
#include "tdatablock.h"
30
#include "tglobal.h"
31
#include "thash.h"
32
#include "ttypes.h"
33

34
typedef struct {
35
  bool    hasAgg;
36
  int32_t numOfRows;
37
  int32_t startOffset;
38
} SFunctionCtxStatus;
39

40
typedef struct SAggOperatorInfo {
41
  SOptrBasicInfo   binfo;
42
  SAggSupporter    aggSup;
43
  STableQueryInfo* current;
44
  uint64_t         groupId;
45
  SGroupResInfo    groupResInfo;
46
  SExprSupp        scalarExprSup;
47
  bool             groupKeyOptimized;
48
  bool             hasValidBlock;
49
  SSDataBlock*     pNewGroupBlock;
50
  bool             hasCountFunc;
51
  SOperatorInfo*   pOperator;
52
  bool             cleanGroupResInfo;
53
} SAggOperatorInfo;
54

55
static void destroyAggOperatorInfo(void* param);
56
static int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
57

58
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock);
59
static void    destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock);
60

61
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx);
62
static int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
63
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
64
                                const char* pKey);
65

66
static int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size);
67

68
static int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
69

70
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
71
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
72

73
int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo,
1,935,137✔
74
                                    SOperatorInfo** pOptrInfo) {
75
  QRY_PARAM_CHECK(pOptrInfo);
1,935,137!
76

77
  int32_t    lino = 0;
1,935,137✔
78
  int32_t    code = 0;
1,935,137✔
79
  int32_t    num = 0;
1,935,137✔
80
  SExprInfo* pExprInfo = NULL;
1,935,137✔
81
  int32_t    numOfScalarExpr = 0;
1,935,137✔
82
  SExprInfo* pScalarExprInfo = NULL;
1,935,137✔
83

84
  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
1,935,137!
85
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,935,750!
86
  if (pInfo == NULL || pOperator == NULL) {
1,936,637!
87
    code = terrno;
×
88
    goto _error;
×
89
  }
90

91
  pOperator->exprSupp.hasWindowOrGroup = false;
1,936,962✔
92

93
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
1,936,962✔
94
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
1,939,458!
95
  initBasicInfo(&pInfo->binfo, pResBlock);
1,939,458✔
96

97
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
1,937,831✔
98
  initResultSizeInfo(&pOperator->resultInfo, 4096);
1,937,831✔
99

100
  code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num);
1,938,916✔
101
  TSDB_CHECK_CODE(code, lino, _error);
1,936,674!
102

103
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
1,936,674✔
104
                               pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
1,936,674✔
105
  TSDB_CHECK_CODE(code, lino, _error);
1,938,169!
106

107
  if (pAggNode->pExprs != NULL) {
1,938,169✔
108
    code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
162,103✔
109
    TSDB_CHECK_CODE(code, lino, _error);
162,088!
110
  }
111

112
  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
1,938,154✔
113
  TSDB_CHECK_CODE(code, lino, _error);
1,936,798!
114

115
  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
1,936,798✔
116
  TSDB_CHECK_CODE(code, lino, _error);
1,937,708!
117

118
  pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
1,937,708✔
119
  pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized;
1,937,708✔
120
  pInfo->groupId = UINT64_MAX;
1,937,708✔
121
  pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder;
1,937,708✔
122
  pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;
1,937,708✔
123
  pInfo->hasCountFunc = pAggNode->hasCountLikeFunc;
1,937,708✔
124
  pInfo->pOperator = pOperator;
1,937,708✔
125
  pInfo->cleanGroupResInfo = false;
1,937,708✔
126

127
  setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
1,937,708✔
128
                  !pAggNode->node.forceCreateNonBlockingOptr, OP_NOT_OPENED, pInfo, pTaskInfo);
1,937,708✔
129
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getAggregateResultNext, NULL, destroyAggOperatorInfo,
1,937,275✔
130
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
131

132
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
1,936,122✔
133
    STableScanInfo* pTableScanInfo = downstream->info;
480,404✔
134
    pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp;
480,404✔
135
    pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup;
480,404✔
136
  }
137

138
  code = appendDownstream(pOperator, &downstream, 1);
1,936,122✔
139
  if (code != TSDB_CODE_SUCCESS) {
1,937,011!
140
    goto _error;
×
141
  }
142

143
  *pOptrInfo = pOperator;
1,937,011✔
144
  return TSDB_CODE_SUCCESS;
1,937,011✔
145

146
_error:
×
147
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
148
  if (pInfo != NULL) {
×
149
    destroyAggOperatorInfo(pInfo);
×
150
  }
151
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
152
  pTaskInfo->code = code;
×
153
  return code;
×
154
}
155

156
void destroyAggOperatorInfo(void* param) {
1,939,307✔
157
  if (param == NULL) {
1,939,307!
158
    return;
×
159
  }
160
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
1,939,307✔
161
  cleanupBasicInfo(&pInfo->binfo);
1,939,307✔
162

163
  if (pInfo->pOperator) {
1,940,181!
164
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
1,940,189✔
165
                      pInfo->cleanGroupResInfo);
1,940,189✔
166
    pInfo->pOperator = NULL;
1,939,791✔
167
  }
168
  cleanupAggSup(&pInfo->aggSup);
1,939,783✔
169
  cleanupExprSupp(&pInfo->scalarExprSup);
1,940,203✔
170
  cleanupGroupResInfo(&pInfo->groupResInfo);
1,939,865✔
171
  taosMemoryFreeClear(param);
1,940,135!
172
}
173

174
/**
175
 * @brief get blocks from downstream and fill results into groupedRes after aggragation
176
 * @retval false if no more groups
177
 * @retval true if there could have new groups coming
178
 * @note if pOperator.blocking is true, scan all blocks from downstream, all groups are handled
179
 *       if false, fill results of ONE GROUP
180
 * */
181
static bool nextGroupedResult(SOperatorInfo* pOperator) {
2,057,722✔
182
  int32_t           code = TSDB_CODE_SUCCESS;
2,057,722✔
183
  int32_t           lino = 0;
2,057,722✔
184
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
2,057,722✔
185
  SAggOperatorInfo* pAggInfo = pOperator->info;
2,057,722✔
186

187
  if(!pAggInfo) {
2,057,722!
188
    qError("function:%s, pAggInfo is NULL", __func__);
×
189
    return false;
×
190
  }
191
  if (pOperator->blocking && pAggInfo->hasValidBlock) {
2,057,722✔
192
    return false;
74,744✔
193
  }
194

195
  SExprSupp*   pSup = &pOperator->exprSupp;
1,982,978✔
196
  int64_t      st = taosGetTimestampUs();
1,988,311✔
197
  int32_t      order = pAggInfo->binfo.inputTsOrder;
1,988,311✔
198
  SSDataBlock* pBlock = pAggInfo->pNewGroupBlock;
1,988,311✔
199

200
  pAggInfo->cleanGroupResInfo = false;
1,988,311✔
201
  if (pBlock) {
1,988,311✔
202
    pAggInfo->pNewGroupBlock = NULL;
49,924✔
203
    tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable);
49,924✔
204
    code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
49,924✔
205
    QUERY_CHECK_CODE(code, lino, _end);
49,924!
206
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
49,924✔
207
    QUERY_CHECK_CODE(code, lino, _end);
49,924!
208

209
    code = doAggregateImpl(pOperator, pSup->pCtx);
49,924✔
210
    QUERY_CHECK_CODE(code, lino, _end);
49,924!
211
  }
212
  while (1) {
8,738,245✔
213
    bool blockAllocated = false;
10,726,556✔
214
    pBlock = getNextBlockFromDownstream(pOperator, 0);
10,726,556✔
215
    if (pBlock == NULL) {
10,719,442✔
216
      if (!pAggInfo->hasValidBlock) {
2,071,241✔
217
        code = createDataBlockForEmptyInput(pOperator, &pBlock);
250,524✔
218
        QUERY_CHECK_CODE(code, lino, _end);
250,469!
219

220
        if (pBlock == NULL) {
250,469✔
221
          break;
112,933✔
222
        }
223
        blockAllocated = true;
137,536✔
224
      } else {
225
        break;
1,820,717✔
226
      }
227
    }
228
    pAggInfo->hasValidBlock = true;
8,785,737✔
229
    pAggInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
8,785,737✔
230

231
    // there is an scalar expression that needs to be calculated before apply the group aggregation.
232
    if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) {
8,785,737✔
233
      SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
1,192,186✔
234
      code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
1,192,186✔
235
      if (code != TSDB_CODE_SUCCESS) {
1,192,053!
236
        destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
237
        T_LONG_JMP(pTaskInfo->env, code);
×
238
      }
239
    }
240
    // if non-blocking mode and new group arrived, save the block and break
241
    if (!pOperator->blocking && pAggInfo->groupId != UINT64_MAX && pBlock->info.id.groupId != pAggInfo->groupId) {
8,785,604✔
242
      pAggInfo->pNewGroupBlock = pBlock;
49,976✔
243
      break;
49,976✔
244
    }
245
    // the pDataBlock are always the same one, no need to call this again
246
    code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
8,735,628✔
247
    if (code != TSDB_CODE_SUCCESS) {
8,737,529✔
248
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
8✔
249
      T_LONG_JMP(pTaskInfo->env, code);
8!
250
    }
251
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
8,737,521✔
252
    if (code != TSDB_CODE_SUCCESS) {
8,737,747!
253
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
254
      T_LONG_JMP(pTaskInfo->env, code);
×
255
    }
256

257
    code = doAggregateImpl(pOperator, pSup->pCtx);
8,737,747✔
258
    if (code != TSDB_CODE_SUCCESS) {
8,738,049!
259
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
260
      T_LONG_JMP(pTaskInfo->env, code);
×
261
    }
262

263
    destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
8,738,049✔
264
  }
265

266
  // the downstream operator may return with error code, so let's check the code before generating results.
267
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
1,983,626✔
268
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
2!
269
  }
270

271
  code = initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
1,983,624✔
272
  QUERY_CHECK_CODE(code, lino, _end);
1,983,598!
273
  pAggInfo->cleanGroupResInfo = true;
1,983,598✔
274

275
_end:
1,983,598✔
276
  if (code != TSDB_CODE_SUCCESS) {
1,983,598!
277
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
278
    pTaskInfo->code = code;
×
279
    T_LONG_JMP(pTaskInfo->env, code);
×
280
  }
281
  return pBlock != NULL;
1,983,598✔
282
}
283

284
int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
3,854,441✔
285
  int32_t           code = TSDB_CODE_SUCCESS;
3,854,441✔
286
  int32_t           lino = 0;
3,854,441✔
287
  SAggOperatorInfo* pAggInfo = pOperator->info;
3,854,441✔
288
  SOptrBasicInfo*   pInfo = &pAggInfo->binfo;
3,854,441✔
289

290
  if (pOperator->status == OP_EXEC_DONE) {
3,854,441✔
291
    (*ppRes) = NULL;
1,796,789✔
292
    return code;
1,796,789✔
293
  }
294

295
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2,057,652✔
296
  bool           hasNewGroups = false;
2,057,652✔
297
  do {
298
    hasNewGroups = nextGroupedResult(pOperator);
2,057,680✔
299
    code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
2,057,886✔
300
    QUERY_CHECK_CODE(code, lino, _end);
2,058,537!
301

302
    while (1) {
303
      doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
2,058,537✔
304
      code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
2,057,042✔
305
      QUERY_CHECK_CODE(code, lino, _end);
2,057,189!
306

307
      if (!hasRemainResults(&pAggInfo->groupResInfo)) {
2,057,189✔
308
        if (!hasNewGroups) setOperatorCompleted(pOperator);
1,982,949✔
309
        break;
1,983,298✔
310
      }
311

312
      if (pInfo->pRes->info.rows > 0) {
74,733!
313
        break;
74,733✔
314
      }
315
    }
316
  } while (pInfo->pRes->info.rows == 0 && hasNewGroups);
2,058,031✔
317

318
  size_t rows = blockDataGetNumOfRows(pInfo->pRes);
2,058,003✔
319
  pOperator->resultInfo.totalRows += rows;
2,057,395✔
320

321
_end:
2,057,395✔
322
  if (code != TSDB_CODE_SUCCESS) {
2,057,395!
323
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
324
    pTaskInfo->code = code;
×
325
    T_LONG_JMP(pTaskInfo->env, code);
×
326
  }
327

328
  (*ppRes) = (rows == 0) ? NULL : pInfo->pRes;
2,057,395✔
329
  return code;
2,057,395✔
330
}
331

332
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
×
333
  SSDataBlock* pRes = NULL;
×
334
  int32_t code = getAggregateResultNext(pOperator, &pRes);
×
335
  return pRes;
×
336
}
337

338
int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
8,787,205✔
339
  int32_t code = TSDB_CODE_SUCCESS;
8,787,205✔
340
  if (!pOperator || (pOperator->exprSupp.numOfExprs > 0 && pCtx == NULL)) {
8,787,205!
341
    qError("%s failed at line %d since pCtx is NULL.", __func__, __LINE__);
×
342
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
343
  }
344
  for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
19,339,549✔
345
    if (functionNeedToExecute(&pCtx[k])) {
10,550,536✔
346
      // todo add a dummy function to avoid process check
347
      if (pCtx[k].fpSet.process == NULL) {
10,549,489✔
348
        continue;
152,218✔
349
      }
350

351
      if ((&pCtx[k])->input.pData[0] == NULL) {
10,397,271!
352
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
353
        qError("%s aggregate function error happens, input data is NULL.", GET_TASKID(pOperator->pTaskInfo));
×
354
      } else {
355
        code = pCtx[k].fpSet.process(&pCtx[k]);
10,397,271✔
356
      }
357

358
      if (code != TSDB_CODE_SUCCESS) {
10,398,247!
359
        if (pCtx[k].fpSet.cleanup != NULL) {
×
360
          pCtx[k].fpSet.cleanup(&pCtx[k]);
×
361
        }
362
        qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
×
363
        return code;
×
364
      }
365
    }
366
  }
367

368
  return TSDB_CODE_SUCCESS;
8,789,013✔
369
}
370

371
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
250,466✔
372
  int32_t code = TSDB_CODE_SUCCESS;
250,466✔
373
  int32_t lino = 0;
250,466✔
374
  SSDataBlock* pBlock = NULL;
250,466✔
375
  if (!tsCountAlwaysReturnValue) {
250,466✔
376
    return TSDB_CODE_SUCCESS;
48,363✔
377
  }
378

379
  SAggOperatorInfo* pAggInfo = pOperator->info;
202,103✔
380
  if (pAggInfo->groupKeyOptimized) {
202,103✔
381
    return TSDB_CODE_SUCCESS;
32,598✔
382
  }
383

384
  SOperatorInfo* downstream = pOperator->pDownstream[0];
169,505✔
385
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
169,505✔
386
      downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_SORT ||
169,222✔
387
      (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
168,997✔
388
       ((STableScanInfo*)downstream->info)->hasGroupByTag == true)) {
29,551✔
389
    return TSDB_CODE_SUCCESS;
551✔
390
  }
391

392
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
168,954✔
393

394
  if (!pAggInfo->hasCountFunc) {
168,954✔
395
    return TSDB_CODE_SUCCESS;
31,420✔
396
  }
397

398
  code = createDataBlock(&pBlock);
137,534✔
399
  if (code) {
137,562!
400
    return code;
×
401
  }
402

403
  pBlock->info.rows = 1;
137,562✔
404
  pBlock->info.capacity = 0;
137,562✔
405

406
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
286,247✔
407
    SColumnInfoData colInfo = {0};
148,720✔
408
    colInfo.hasNull = true;
148,720✔
409
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
148,720✔
410
    colInfo.info.bytes = 1;
148,720✔
411

412
    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
148,720✔
413
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
297,840✔
414
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
149,155✔
415
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
149,155✔
416
        int32_t slotId = pFuncParam->pCol->slotId;
149,110✔
417
        int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
149,110✔
418
        if (slotId >= numOfCols) {
149,087✔
419
          code = taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
138,418✔
420
          QUERY_CHECK_CODE(code, lino, _end);
138,395!
421

422
          for (int32_t k = numOfCols; k < slotId + 1; ++k) {
281,286✔
423
            void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo);
142,880✔
424
            QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
142,891!
425
          }
426
        }
427
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
45✔
428
        // do nothing
429
      }
430
    }
431
  }
432

433
  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
137,527✔
434
  QUERY_CHECK_CODE(code, lino, _end);
137,589!
435

436
  for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
280,447✔
437
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
142,811✔
438
    QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
142,854!
439
    colDataSetNULL(pColInfoData, 0);
440
  }
441
  *ppBlock = pBlock;
137,534✔
442

443
_end:
137,534✔
444
  if (code != TSDB_CODE_SUCCESS) {
137,534!
445
    blockDataDestroy(pBlock);
×
446
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
447
  }
448
  return code;
137,536✔
449
}
450

451
void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) {
8,738,005✔
452
  if (!blockAllocated) {
8,738,005✔
453
    return;
8,600,692✔
454
  }
455

456
  blockDataDestroy(*ppBlock);
137,313✔
457
  *ppBlock = NULL;
137,583✔
458
}
459

460
int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
8,787,521✔
461
  int32_t           code = TSDB_CODE_SUCCESS;
8,787,521✔
462
  SAggOperatorInfo* pAggInfo = pOperator->info;
8,787,521✔
463
  if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) {
8,787,521✔
464
    return code;
6,395,144✔
465
  }
466

467
  code = doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);
2,392,377✔
468

469
  // record the current active group id
470
  pAggInfo->groupId = groupId;
2,392,488✔
471
  return code;
2,392,488✔
472
}
473

474
int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
2,392,710✔
475
  // for simple group by query without interval, all the tables belong to one group result.
476
  int32_t           code = TSDB_CODE_SUCCESS;
2,392,710✔
477
  int32_t           lino = 0;
2,392,710✔
478
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
2,392,710✔
479
  SAggOperatorInfo* pAggInfo = pOperator->info;
2,392,710✔
480

481
  SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
2,392,710✔
482
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
2,392,710✔
483
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
2,392,710✔
484

485
  SResultRow* pResultRow =
486
      doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, sizeof(groupId), true,
2,392,710✔
487
                             groupId, pTaskInfo, false, &pAggInfo->aggSup, true);
488
  if (pResultRow == NULL || pTaskInfo->code != 0) {
2,393,548!
489
    code = pTaskInfo->code;
×
490
    lino = __LINE__;
×
491
    goto _end;
×
492
  }
493
  /*
494
   * not assign result buffer yet, add new result buffer
495
   * all group belong to one result set, and each group result has different group id so set the id to be one
496
   */
497
  if (pResultRow->pageId == -1) {
2,393,548!
498
    code = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
×
499
    QUERY_CHECK_CODE(code, lino, _end);
×
500
  }
501

502
  code = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
2,393,548✔
503
  QUERY_CHECK_CODE(code, lino, _end);
2,392,757!
504

505
_end:
2,392,757✔
506
  if (code != TSDB_CODE_SUCCESS) {
2,392,757✔
507
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
8!
508
  }
509
  return code;
2,392,513✔
510
}
511

512
// a new buffer page for each table. Needs to opt this design
513
int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) {
×
514
  if (pWindowRes->pageId != -1) {
×
515
    return 0;
×
516
  }
517

518
  SFilePage* pData = NULL;
×
519

520
  // in the first scan, new space needed for results
521
  int32_t pageId = -1;
×
522
  SArray* list = getDataBufPagesIdList(pResultBuf);
×
523

524
  if (taosArrayGetSize(list) == 0) {
×
525
    pData = getNewBufPage(pResultBuf, &pageId);
×
526
    if (pData == NULL) {
×
527
      qError("failed to get buffer, code:%s", tstrerror(terrno));
×
528
      return terrno;
×
529
    }
530
    pData->num = sizeof(SFilePage);
×
531
  } else {
532
    SPageInfo* pi = getLastPageInfo(list);
×
533
    pData = getBufPage(pResultBuf, getPageId(pi));
×
534
    if (pData == NULL) {
×
535
      qError("failed to get buffer, code:%s", tstrerror(terrno));
×
536
      return terrno;
×
537
    }
538

539
    pageId = getPageId(pi);
×
540

541
    if (pData->num + size > getBufPageSize(pResultBuf)) {
×
542
      // release current page first, and prepare the next one
543
      releaseBufPageInfo(pResultBuf, pi);
×
544

545
      pData = getNewBufPage(pResultBuf, &pageId);
×
546
      if (pData == NULL) {
×
547
        qError("failed to get buffer, code:%s", tstrerror(terrno));
×
548
        return terrno;
×
549
      }
550
      pData->num = sizeof(SFilePage);
×
551
    }
552
  }
553

554
  if (pData == NULL) {
×
555
    return -1;
×
556
  }
557

558
  // set the number of rows in current disk page
559
  if (pWindowRes->pageId == -1) {  // not allocated yet, allocate new buffer
×
560
    pWindowRes->pageId = pageId;
×
561
    pWindowRes->offset = (int32_t)pData->num;
×
562

563
    pData->num += size;
×
564
  }
565

566
  return 0;
×
567
}
568

569
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
6,501,112✔
570
                         const char* pKey) {
571
  int32_t code = 0;
6,501,112✔
572
  //  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
573

574
  pAggSup->currentPageId = -1;
6,501,112✔
575
  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
6,501,112✔
576
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
6,509,549!
577
  pAggSup->pResultRowHashTable = tSimpleHashInit(100, taosFastHash);
6,508,137✔
578

579
  if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
6,503,217!
580
    return terrno;
×
581
  }
582

583
  uint32_t defaultPgsz = 0;
6,504,830✔
584
  int64_t defaultBufsz = 0;
6,504,830✔
585
  code = getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);
6,504,830✔
586
  if (code) {
6,509,888!
587
    qError("failed to get buff page size, rowSize:%d", pAggSup->resultRowSize);
×
588
    return code;
×
589
  }
590

591
  if (!osTempSpaceAvailable()) {
6,509,888!
592
    code = TSDB_CODE_NO_DISKSPACE;
×
593
    qError("Init stream agg supporter failed since %s, key:%s, tempDir:%s", tstrerror(code), pKey, tsTempDir);
×
594
    return code;
×
595
  }
596

597
  code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
6,507,551✔
598
  if (code != TSDB_CODE_SUCCESS) {
6,513,366!
599
    qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
×
600
    return code;
×
601
  }
602

603
  return code;
6,513,907✔
604
}
605

606
void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
2,036✔
607
  int32_t         code = TSDB_CODE_SUCCESS;
2,036✔
608
  SStorageAPI*    pAPI = &pTaskInfo->storageAPI;
2,036✔
609
  int32_t         numOfExprs = pSup->numOfExprs;
2,036✔
610
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
2,036✔
611
  SqlFunctionCtx* pCtx = pSup->pCtx;
2,036✔
612
  int32_t         numOfRows = getNumOfTotalRes(pGroupResInfo);
2,036✔
613
  bool            needCleanup = false;
2,036✔
614

615
  for (int32_t j = 0; j < numOfExprs; ++j) {
29,662✔
616
    needCleanup |= pCtx[j].needCleanup;
27,626✔
617
  }
618
  if (!needCleanup) {
2,036!
619
    return;
2,036✔
620
  }
621
  
UNCOV
622
  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
×
623
    SResultWindowInfo* pWinInfo = taosArrayGet(pGroupResInfo->pRows, i);
×
624
    SRowBuffPos*       pPos = pWinInfo->pStatePos;
×
625
    SResultRow*        pRow = NULL;
×
626

627
    code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow);
×
628
    if (TSDB_CODE_SUCCESS != code) {
×
629
      qError("failed to get state by pos, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo));
×
630
      continue;
×
631
    }
632

633
    for (int32_t j = 0; j < numOfExprs; ++j) {
×
634
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
×
635
      if (pCtx[j].fpSet.cleanup) {
×
636
        pCtx[j].fpSet.cleanup(&pCtx[j]);
×
637
      }
638
    }
639
  }
640
}
641

642
void cleanupResultInfoInGroupResInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
3,471,463✔
643
                                  SGroupResInfo* pGroupResInfo) {
644
  int32_t         numOfExprs = pSup->numOfExprs;
3,471,463✔
645
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
3,471,463✔
646
  SqlFunctionCtx* pCtx = pSup->pCtx;
3,471,463✔
647
  int32_t         numOfRows = getNumOfTotalRes(pGroupResInfo);
3,471,463✔
648
  bool            needCleanup = false;
3,471,637✔
649

650
  for (int32_t j = 0; j < numOfExprs; ++j) {
9,814,803✔
651
    needCleanup |= pCtx[j].needCleanup;
6,343,166✔
652
  }
653
  if (!needCleanup) {
3,471,637✔
654
    return;
3,451,540✔
655
  }
656

657
  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
20,097!
658
    SResultRow*        pRow = NULL;
×
659
    SResKeyPos*        pPos = taosArrayGetP(pGroupResInfo->pRows, i);
×
660
    SFilePage*         page = getBufPage(pBuf, pPos->pos.pageId);
×
661
    if (page == NULL) {
×
662
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
663
      continue;
×
664
    }
665
    pRow = (SResultRow*)((char*)page + pPos->pos.offset);
×
666

667

668
    for (int32_t j = 0; j < numOfExprs; ++j) {
×
669
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
×
670
      if (pCtx[j].fpSet.cleanup) {
×
671
        pCtx[j].fpSet.cleanup(&pCtx[j]);
×
672
      }
673
    }
674
    releaseBufPage(pBuf, page);
×
675
  }
676
}
677

678
void cleanupResultInfoInHashMap(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
904,697✔
679
                       SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap) {
680
  int32_t         numOfExprs = pSup->numOfExprs;
904,697✔
681
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
904,697✔
682
  SqlFunctionCtx* pCtx = pSup->pCtx;
904,697✔
683
  bool            needCleanup = false;
904,697✔
684
  for (int32_t j = 0; j < numOfExprs; ++j) {
2,759,285✔
685
    needCleanup |= pCtx[j].needCleanup;
1,854,588✔
686
  }
687
  if (!needCleanup) {
904,697✔
688
    return;
903,305✔
689
  }
690

691
  // begin from last iter
692
  void*   pData = pGroupResInfo->dataPos;
1,392✔
693
  int32_t iter = pGroupResInfo->iter;
1,392✔
694
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
1,392!
695
    SResultRowPosition* pos = pData;
×
696

697
    SFilePage* page = getBufPage(pBuf, pos->pageId);
×
698
    if (page == NULL) {
×
699
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
700
      continue;
×
701
    }
702

703
    SResultRow* pRow = (SResultRow*)((char*)page + pos->offset);
×
704

705
    for (int32_t j = 0; j < numOfExprs; ++j) {
×
706
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
×
707
      if (pCtx[j].fpSet.cleanup) {
×
708
        pCtx[j].fpSet.cleanup(&pCtx[j]);
×
709
      }
710
    }
711

712
    releaseBufPage(pBuf, page);
×
713
  }
714
}
715

716
void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SGroupResInfo* pGroupResInfo,
4,376,264✔
717
                       SAggSupporter *pAggSup, bool cleanGroupResInfo) {
718
  if (cleanGroupResInfo) {
4,376,264✔
719
    cleanupResultInfoInGroupResInfo(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo);
3,471,922✔
720
  } else {
721
    cleanupResultInfoInHashMap(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo, pAggSup->pResultRowHashTable);
904,342✔
722
  }
723
}
4,376,657✔
724
void cleanupAggSup(SAggSupporter* pAggSup) {
6,515,642✔
725
  taosMemoryFreeClear(pAggSup->keyBuf);
6,515,642!
726
  tSimpleHashCleanup(pAggSup->pResultRowHashTable);
6,516,181✔
727
  destroyDiskbasedBuf(pAggSup->pResultBuf);
6,516,759✔
728
}
6,516,979✔
729

730
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
6,506,475✔
731
                   const char* pkey, void* pState, SFunctionStateStore* pStore) {
732
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols, pStore);
6,506,475✔
733
  if (code != TSDB_CODE_SUCCESS) {
6,506,489!
734
    return code;
×
735
  }
736

737
  code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
6,506,489✔
738
  if (code != TSDB_CODE_SUCCESS) {
6,513,094!
739
    return code;
×
740
  }
741

742
  for (int32_t i = 0; i < numOfCols; ++i) {
25,864,498✔
743
    pSup->pCtx[i].hasWindowOrGroup = pSup->hasWindowOrGroup;
19,351,404✔
744
    if (pState) {
19,351,404✔
745
      pSup->pCtx[i].saveHandle.pBuf = NULL;
253,664✔
746
      pSup->pCtx[i].saveHandle.pState = pState;
253,664✔
747
      pSup->pCtx[i].exprIdx = i;
253,664✔
748
    } else {
749
      pSup->pCtx[i].saveHandle.pBuf = pAggSup->pResultBuf;
19,097,740✔
750
    }
751
  }
752

753
  return TSDB_CODE_SUCCESS;
6,513,094✔
754
}
755

756
int32_t applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
393,815,937✔
757
                                        int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
758
  int32_t code = TSDB_CODE_SUCCESS;
393,815,937✔
759
  for (int32_t k = 0; k < numOfOutput; ++k) {
1,536,199,233✔
760
    // keep it temporarily
761
    SFunctionCtxStatus status = {0};
1,142,931,688✔
762
    functionCtxSave(&pCtx[k], &status);
1,142,931,688✔
763

764
    pCtx[k].input.startRowIndex = offset;
1,143,604,069✔
765
    pCtx[k].input.numOfRows = forwardStep;
1,143,604,069✔
766

767
    // not a whole block involved in query processing, statistics data can not be used
768
    // NOTE: the original value of isSet have been changed here
769
    if (pCtx[k].input.colDataSMAIsSet && forwardStep < numOfTotal) {
1,143,604,069!
770
      pCtx[k].input.colDataSMAIsSet = false;
×
771
    }
772

773
    if (pCtx[k].isPseudoFunc) {
1,143,604,069✔
774
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]);
279,565,891✔
775

776
      char* p = GET_ROWCELL_INTERBUF(pEntryInfo);
279,565,891✔
777

778
      SColumnInfoData idata = {0};
279,565,891✔
779
      idata.info.type = TSDB_DATA_TYPE_BIGINT;
279,565,891✔
780
      idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
279,565,891✔
781
      idata.pData = p;
279,565,891✔
782

783
      SScalarParam out = {.columnData = &idata};
279,565,891✔
784
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
279,565,891✔
785
      code = pCtx[k].sfp.process(&tw, 1, &out);
279,565,891✔
786
      if (code != TSDB_CODE_SUCCESS) {
283,345,288!
787
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
788
        taskInfo->code = code;
×
789
        return code;
×
790
      }
791
      pEntryInfo->numOfRes = 1;
283,506,624✔
792
    } else {
793
      if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) {
864,038,178✔
794
        if ((&pCtx[k])->input.pData[0] == NULL) {
704,792,253!
795
          code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
796
          qError("%s apply functions error, input data is NULL.", GET_TASKID(taskInfo));
×
797
        } else {
798
          code = pCtx[k].fpSet.process(&pCtx[k]);
704,792,253✔
799
        }
800

801
        if (code != TSDB_CODE_SUCCESS) {
705,260,137!
802
          if (pCtx[k].fpSet.cleanup != NULL) {
×
803
            pCtx[k].fpSet.cleanup(&pCtx[k]);
×
804
          }
805
          qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
×
806
          taskInfo->code = code;
×
807
          return code;
×
808
        }
809
      }
810

811
      // restore it
812
      functionCtxRestore(&pCtx[k], &status);
872,600,842✔
813
    }
814
  }
815
  return code;
393,267,545✔
816
}
817

818
void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
1,142,560,960✔
819
  pStatus->hasAgg = pCtx->input.colDataSMAIsSet;
1,142,560,960✔
820
  pStatus->numOfRows = pCtx->input.numOfRows;
1,142,560,960✔
821
  pStatus->startOffset = pCtx->input.startRowIndex;
1,142,560,960✔
822
}
1,142,560,960✔
823

824
void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
871,197,945✔
825
  pCtx->input.colDataSMAIsSet = pStatus->hasAgg;
871,197,945✔
826
  pCtx->input.numOfRows = pStatus->numOfRows;
871,197,945✔
827
  pCtx->input.startRowIndex = pStatus->startOffset;
871,197,945✔
828
}
871,197,945✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc