• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5015

03 Apr 2026 03:59PM UTC coverage: 72.289% (+0.03%) from 72.256%
#5015

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4055 of 5985 new or added lines in 68 files covered. (67.75%)

13044 existing lines in 149 files now uncovered.

257390 of 356056 relevant lines covered (72.29%)

130247228.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.42
/source/libs/executor/src/executorInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "../../function/inc/functionResInfoInt.h"
20
#include "os.h"
21
#include "querynodes.h"
22
#include "tfill.h"
23
#include "tname.h"
24

25
#include "tdatablock.h"
26
#include "tmsg.h"
27
#include "ttime.h"
28

29
#include "executorInt.h"
30
#include "index.h"
31
#include "operator.h"
32
#include "query.h"
33
#include "querytask.h"
34
#include "storageapi.h"
35
#include "tcompare.h"
36
#include "thash.h"
37
#include "ttypes.h"
38

39
#define SET_REVERSE_SCAN_FLAG(runtime)    ((runtime)->scanFlag = REVERSE_SCAN)
40
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) != TSDB_ORDER_DESC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
41

42
#if 0
43
static UNUSED_FUNC void *u_malloc (size_t __size) {
44
  uint32_t v = taosRand();
45

46
  if (v % 1000 <= 0) {
47
    return NULL;
48
  } else {
49
    return taosMemoryMalloc(__size);
50
  }
51
}
52

53
static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
54
  uint32_t v = taosRand();
55
  if (v % 1000 <= 0) {
56
    return NULL;
57
  } else {
58
    return taosMemoryCalloc(num, __size);
59
  }
60
}
61

62
static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
63
  uint32_t v = taosRand();
64
  if (v % 5 <= 1) {
65
    return NULL;
66
  } else {
67
    return taosMemoryRealloc(p, __size);
68
  }
69
}
70

71
#define calloc  u_calloc
72
#define malloc  u_malloc
73
#define realloc u_realloc
74
#endif
75

76
static int32_t setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
77

78
static int32_t initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
79
static void    doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
80

81
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
82
                                   bool createDummyCol);
83

84
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
2,147,483,647✔
85
  SFilePage* pData = NULL;
2,147,483,647✔
86

87
  // in the first scan, new space needed for results
88
  int32_t pageId = -1;
2,147,483,647✔
89
  if (*currentPageId == -1) {
2,147,483,647✔
90
    pData = getNewBufPage(pResultBuf, &pageId);
260,759,203✔
91
    if (pData == NULL) {
260,771,800✔
UNCOV
92
      qError("failed to get buffer, code:%s", tstrerror(terrno));
×
UNCOV
93
      return NULL;
×
94
    }
95
    pData->num = sizeof(SFilePage);
260,771,830✔
96
  } else {
97
    pData = getBufPage(pResultBuf, *currentPageId);
2,147,483,647✔
98
    if (pData == NULL) {
2,147,483,647✔
UNCOV
99
      qError("failed to get buffer, code:%s", tstrerror(terrno));
×
100
      return NULL;
×
101
    }
102

103
    pageId = *currentPageId;
2,147,483,647✔
104

105
    if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
2,147,483,647✔
106
      // release current page first, and prepare the next one
107
      releaseBufPage(pResultBuf, pData);
818,525,562✔
108

109
      pData = getNewBufPage(pResultBuf, &pageId);
818,435,425✔
110
      if (pData == NULL) {
818,516,522✔
UNCOV
111
        qError("failed to get buffer, code:%s", tstrerror(terrno));
×
112
        return NULL;
×
113
      }
114
      pData->num = sizeof(SFilePage);
818,516,522✔
115
    }
116
  }
117

118
  setBufPageDirty(pData, true);
2,147,483,647✔
119

120
  // set the number of rows in current disk page
121
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
2,147,483,647✔
122

123
  memset((char*)pResultRow, 0, interBufSize);
2,147,483,647✔
124
  pResultRow->pageId = pageId;
2,147,483,647✔
125
  pResultRow->offset = (int32_t)pData->num;
2,147,483,647✔
126

127
  *currentPageId = pageId;
2,147,483,647✔
128
  pData->num += interBufSize;
2,147,483,647✔
129
  return pResultRow;
2,147,483,647✔
130
}
131

132
/**
133
 * the struct of key in hash table
134
 * +----------+---------------+
135
 * | group id |   key data    |
136
 * | 8 bytes  | actual length |
137
 * +----------+---------------+
138
 */
139
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
2,147,483,647✔
140
                                   int32_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
141
                                   bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup) {
142
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
2,147,483,647✔
143
  if (!keepGroup) {
2,147,483,647✔
144
    *(uint64_t*)pSup->keyBuf = calcGroupId(pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
2,147,483,647✔
145
  }
146

147
  SResultRowPosition* p1 =
148
      (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
2,147,483,647✔
149

150
  SResultRow* pResult = NULL;
2,147,483,647✔
151

152
  // in case of repeat scan/reverse scan, no new time window added.
153
  if (isIntervalQuery) {
2,147,483,647✔
154
    if (p1 != NULL) {  // the *p1 may be NULL in case of sliding+offset exists.
2,147,483,647✔
155
      pResult = getResultRowByPos(pResultBuf, p1, true);
1,693,567,345✔
156
      if (pResult == NULL) {
1,693,567,345✔
UNCOV
157
        pTaskInfo->code = terrno;
×
158
        return NULL;
×
159
      }
160

161
      if (pResult->pageId != p1->pageId || pResult->offset != p1->offset) {
1,693,567,345✔
162
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
921✔
UNCOV
163
        pTaskInfo->code = terrno;
×
164
        return NULL;
×
165
      }
166
    }
167
  } else {
168
    // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
169
    // pResultRowInfo object.
170
    if (p1 != NULL) {
2,147,483,647✔
171
      // todo
172
      pResult = getResultRowByPos(pResultBuf, p1, true);
2,147,483,647✔
173
      if (NULL == pResult) {
2,147,483,647✔
UNCOV
174
        pTaskInfo->code = terrno;
×
175
        return NULL;
×
176
      }
177

178
      if (pResult->pageId != p1->pageId || pResult->offset != p1->offset) {
2,147,483,647✔
179
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
98✔
180
        pTaskInfo->code = terrno;
×
181
        return NULL;
×
182
      }
183
    }
184
  }
185

186
  // 1. close current opened time window
187
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
2,147,483,647✔
188
    SResultRowPosition pos = pResultRowInfo->cur;
2,147,483,647✔
189
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
2,147,483,647✔
190
    if (pPage == NULL) {
2,147,483,647✔
UNCOV
191
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
192
      pTaskInfo->code = terrno;
×
193
      return NULL;
×
194
    }
195
    releaseBufPage(pResultBuf, pPage);
2,147,483,647✔
196
  }
197

198
  // allocate a new buffer page
199
  if (pResult == NULL) {
2,147,483,647✔
200
    pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
2,147,483,647✔
201
    if (pResult == NULL) {
2,147,483,647✔
UNCOV
202
      pTaskInfo->code = terrno;
×
203
      return NULL;
×
204
    }
205

206
    // add a new result set for a new group
207
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
2,147,483,647✔
208
     int32_t code = tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
2,147,483,647✔
209
                                  sizeof(SResultRowPosition));
210
    if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
UNCOV
211
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
212
      pTaskInfo->code = code;
×
213
      return NULL;
×
214
    }
215
  }
216

217
  // 2. set the new time window to be the new active time window
218
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
2,147,483,647✔
219

220
  // too many time window in query
221
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
2,147,483,647✔
222
      tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
2,147,483,647✔
UNCOV
223
    pTaskInfo->code = TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW;
×
224
    return NULL;
×
225
  }
226

227
  return pResult;
2,147,483,647✔
228
}
229

230
//  query_range_start, query_range_end, window_duration, window_start, window_end
231
int32_t initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
9,791,072✔
232
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
9,791,072✔
233
  pColData->info.bytes = sizeof(int64_t);
9,793,095✔
234

235
  int32_t code = colInfoDataEnsureCapacity(pColData, 6, false);
9,787,500✔
236
  if (code != TSDB_CODE_SUCCESS) {
9,794,648✔
UNCOV
237
    return code;
×
238
  }
239
  colDataSetInt64(pColData, 0, &pQueryWindow->skey);
9,794,648✔
240
  colDataSetInt64(pColData, 1, &pQueryWindow->ekey);
9,788,383✔
241

242
  int64_t interval = 0;
9,789,934✔
243
  colDataSetInt64(pColData, 2, &interval);  // this value may be variable in case of 'n' and 'y'.
244
  colDataSetInt64(pColData, 3, &pQueryWindow->skey);
9,789,985✔
245
  colDataSetInt64(pColData, 4, &pQueryWindow->ekey);
9,787,006✔
246

247
  interval = -1;
9,785,984✔
248
  colDataSetInt64(pColData, 5,  &interval);
249
  return TSDB_CODE_SUCCESS;
9,787,915✔
250
}
251

252
static int32_t doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
23,875,675✔
253
  int32_t         code = TSDB_CODE_SUCCESS;
23,875,675✔
254
  int32_t         lino = 0;
23,875,675✔
255
  SqlFunctionCtx* pCtx = pExprSup->pCtx;
23,875,675✔
256
  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
130,655,502✔
257
    pCtx[i].order = order;
106,778,108✔
258
    pCtx[i].input.numOfRows = pBlock->info.rows;
106,766,225✔
259
    code = setBlockSMAInfo(&pCtx[i], &pExprSup->pExprInfo[i], pBlock);
106,779,362✔
260
    QUERY_CHECK_CODE(code, lino, _end);
106,782,347✔
261
    pCtx[i].pSrcBlock = pBlock;
106,782,347✔
262
    pCtx[i].scanFlag = scanFlag;
106,787,972✔
263
  }
264

265
_end:
23,886,304✔
266
  if (code != TSDB_CODE_SUCCESS) {
23,886,304✔
UNCOV
267
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
268
  }
269
  return code;
23,884,441✔
270
}
271

272
int32_t setInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
2,147,483,647✔
273
                          bool createDummyCol) {
274
  if (pBlock->pBlockAgg != NULL) {
2,147,483,647✔
275
    return doSetInputDataBlockInfo(pExprSup, pBlock, order, scanFlag);
23,885,062✔
276
  } else {
277
    return doSetInputDataBlock(pExprSup, pBlock, order, scanFlag, createDummyCol);
2,147,483,647✔
278
  }
279
}
280

281
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
305,657,023✔
282
                                             int32_t numOfRows) {
283
  int32_t          code = TSDB_CODE_SUCCESS;
305,657,023✔
284
  int32_t          lino = 0;
305,657,023✔
285
  SColumnInfoData* pColInfo = NULL;
305,657,023✔
286
  if (pInput->pData[paramIndex] == NULL) {
305,657,023✔
287
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
571,266✔
288
    QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
571,266✔
289

290
    // Set the correct column info (data type and bytes)
291
    pColInfo->info.type = pFuncParam->param.nType;
571,266✔
292
    pColInfo->info.bytes = pFuncParam->param.nLen;
571,266✔
293

294
    pInput->pData[paramIndex] = pColInfo;
571,266✔
295
  } else {
296
    pColInfo = pInput->pData[paramIndex];
305,102,756✔
297
  }
298

299
  code = colInfoDataEnsureCapacity(pColInfo, numOfRows, false);
305,706,459✔
300
  QUERY_CHECK_CODE(code, lino, _end);
305,652,695✔
301

302
  int8_t type = pFuncParam->param.nType;
305,652,695✔
303
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
305,669,843✔
304
    int64_t v = pFuncParam->param.i;
386,368✔
305
    for (int32_t i = 0; i < numOfRows; ++i) {
802,707,770✔
306
      colDataSetInt64(pColInfo, i, &v);
802,314,934✔
307
    }
308
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
305,283,475✔
UNCOV
309
    double v = pFuncParam->param.d;
×
310
    for (int32_t i = 0; i < numOfRows; ++i) {
×
311
      colDataSetDouble(pColInfo, i, &v);
×
312
    }
313
  } else if (type == TSDB_DATA_TYPE_VARCHAR || type == TSDB_DATA_TYPE_GEOMETRY) {
305,283,475✔
314
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
2,435✔
UNCOV
315
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
×
316

UNCOV
317
    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
×
318
    for (int32_t i = 0; i < numOfRows; ++i) {
×
319
      code = colDataSetVal(pColInfo, i, tmp, false);
×
320
      QUERY_CHECK_CODE(code, lino, _end);
×
321
    }
UNCOV
322
    taosMemoryFree(tmp);
×
323
  }
324

325
_end:
305,281,040✔
326
  if (code != TSDB_CODE_SUCCESS) {
305,673,876✔
UNCOV
327
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
328
  }
329
  return code;
305,661,495✔
330
}
331

332
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
2,147,483,647✔
333
                                   bool createDummyCol) {
334
  int32_t         code = TSDB_CODE_SUCCESS;
2,147,483,647✔
335
  int32_t         lino = 0;
2,147,483,647✔
336
  SqlFunctionCtx* pCtx = pExprSup->pCtx;
2,147,483,647✔
337

338
  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
2,147,483,647✔
339
    pCtx[i].order = order;
2,147,483,647✔
340
    pCtx[i].input.numOfRows = pBlock->info.rows;
2,147,483,647✔
341

342
    pCtx[i].pSrcBlock = pBlock;
2,147,483,647✔
343
    pCtx[i].scanFlag = scanFlag;
2,147,483,647✔
344

345
    SInputColumnInfoData* pInput = &pCtx[i].input;
2,147,483,647✔
346
    pInput->uid = pBlock->info.id.uid;
2,147,483,647✔
347
    pInput->colDataSMAIsSet = false;
2,147,483,647✔
348

349
    SExprInfo* pOneExpr = &pExprSup->pExprInfo[i];
2,147,483,647✔
350
    bool       hasPk = pOneExpr->pExpr->nodeType == QUERY_NODE_FUNCTION && pOneExpr->pExpr->_function.pFunctNode->hasPk;
2,147,483,647✔
351
    pCtx[i].hasPrimaryKey = hasPk;
2,147,483,647✔
352

353
    int16_t tsParamIdx = (!hasPk) ? pOneExpr->base.numOfParams - 1 : pOneExpr->base.numOfParams - 2;
2,147,483,647✔
354
    int16_t pkParamIdx = pOneExpr->base.numOfParams - 1;
2,147,483,647✔
355

356
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
2,147,483,647✔
357
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
2,147,483,647✔
358
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
2,147,483,647✔
359
        int32_t slotId = pFuncParam->pCol->slotId;
2,147,483,647✔
360
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
2,147,483,647✔
361
        pInput->totalRows = pBlock->info.rows;
2,147,483,647✔
362
        pInput->numOfRows = pBlock->info.rows;
2,147,483,647✔
363
        pInput->startRowIndex = 0;
2,147,483,647✔
364
        pInput->blankFill = pBlock->info.blankFill;
2,147,483,647✔
365

366
        // NOTE: the last parameter is the primary timestamp column
367
        // todo: refactor this
368

369
        if (fmIsImplicitTsFunc(pCtx[i].functionId) && (j == tsParamIdx)) {
2,147,483,647✔
370
          pInput->pPTS = pInput->pData[j];  // in case of merge function, this is not always the ts column data.
2,147,483,647✔
371
        }
372
        if (hasPk && (j == pkParamIdx)) {
2,147,483,647✔
373
          pInput->pPrimaryKey = pInput->pData[j];
29,024,020✔
374
        }
375
        QUERY_CHECK_CONDITION((pInput->pData[j] != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
2,147,483,647✔
376
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
638,746,270✔
377
        // todo avoid case: top(k, 12), 12 is the value parameter.
378
        // sum(11), 11 is also the value parameter.
379
        bool needDummyCol = createDummyCol && (pOneExpr->base.numOfParams == 1 ||
504,237,276✔
380
                                               (fmIsIndefiniteRowsFunc(pCtx[i].functionId) && j == 0));
43,119,948✔
381
        if (needDummyCol) {
461,116,783✔
382
          pInput->totalRows = pBlock->info.rows;
305,700,839✔
383
          pInput->numOfRows = pBlock->info.rows;
305,701,806✔
384
          pInput->startRowIndex = 0;
305,700,485✔
385
          pInput->blankFill = pBlock->info.blankFill;
305,702,895✔
386

387
          code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
305,691,407✔
388
          QUERY_CHECK_CODE(code, lino, _end);
312,015,600✔
389
        }
390
      }
391
    }
392
  }
393

394
_end:
2,147,483,647✔
395
  if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
UNCOV
396
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
397
  }
398
  return code;
2,147,483,647✔
399
}
400

401
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
2,147,483,647✔
402
  struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
2,147,483,647✔
403

404
  // in case of timestamp column, always generated results.
405
  int32_t functionId = pCtx->functionId;
2,147,483,647✔
406
  if (functionId == -1) {
2,147,483,647✔
UNCOV
407
    return false;
×
408
  }
409

410
  if (pCtx->scanFlag == PRE_SCAN) {
2,147,483,647✔
411
    return fmIsRepeatScanFunc(pCtx->functionId);
3,585,668✔
412
  }
413

414
  if (isRowEntryCompleted(pResInfo)) {
2,147,483,647✔
UNCOV
415
    return false;
×
416
  }
417

418
  return true;
2,147,483,647✔
419
}
420

UNCOV
421
static int32_t doCreateConstantValColumnSMAInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
×
422
                                                int32_t paramIndex, int32_t numOfRows) {
UNCOV
423
  if (pInput->pData[paramIndex] == NULL) {
×
424
    pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData));
×
UNCOV
425
    if (pInput->pData[paramIndex] == NULL) {
×
426
      return terrno;
×
427
    }
428

429
    // Set the correct column info (data type and bytes)
UNCOV
430
    pInput->pData[paramIndex]->info.type = type;
×
UNCOV
431
    pInput->pData[paramIndex]->info.bytes = tDataTypes[type].bytes;
×
432
  }
433

434
  SColumnDataAgg* da = NULL;
×
UNCOV
435
  if (pInput->pColumnDataAgg[paramIndex] == NULL) {
×
UNCOV
436
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
×
437
    if (!da) {
×
438
      return terrno;
×
439
    }
440
    pInput->pColumnDataAgg[paramIndex] = da;
×
441
  } else {
UNCOV
442
    da = pInput->pColumnDataAgg[paramIndex];
×
443
  }
444

445
  if (type == TSDB_DATA_TYPE_BIGINT) {
×
UNCOV
446
    int64_t v = pFuncParam->param.i;
×
UNCOV
447
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
×
448
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
×
449
    double v = pFuncParam->param.d;
×
450
    *da = (SColumnDataAgg){.numOfNull = 0};
×
451

452
    *(double*)&da->min = v;
×
453
    *(double*)&da->max = v;
×
UNCOV
454
    *(double*)&da->sum = v * numOfRows;
×
455
  } else if (type == TSDB_DATA_TYPE_BOOL) {  // todo validate this data type
×
456
    bool v = pFuncParam->param.i;
×
457

458
    *da = (SColumnDataAgg){.numOfNull = 0};
×
459
    *(bool*)&da->min = 0;
×
UNCOV
460
    *(bool*)&da->max = v;
×
461
    *(bool*)&da->sum = v * numOfRows;
×
462
  } else if (type == TSDB_DATA_TYPE_TIMESTAMP) {
×
463
    // do nothing
464
  } else {
465
    qError("invalid constant type for sma info");
×
466
  }
467

468
  return TSDB_CODE_SUCCESS;
×
469
}
470

471
int32_t setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
106,737,575✔
472
  int32_t code = TSDB_CODE_SUCCESS;
106,737,575✔
473
  int32_t lino = 0;
106,737,575✔
474
  int32_t numOfRows = pBlock->info.rows;
106,737,575✔
475

476
  SInputColumnInfoData* pInput = &pCtx->input;
106,763,168✔
477
  pInput->numOfRows = numOfRows;
106,769,378✔
478
  pInput->totalRows = numOfRows;
106,784,306✔
479

480
  if (pBlock->pBlockAgg != NULL) {
106,793,609✔
481
    pInput->colDataSMAIsSet = true;
106,817,900✔
482

483
    for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
213,597,262✔
484
      SFunctParam* pFuncParam = &pExprInfo->base.pParam[j];
106,796,762✔
485

486
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
106,801,121✔
487
        int32_t slotId = pFuncParam->pCol->slotId;
106,804,238✔
488
        pInput->pColumnDataAgg[j] = &pBlock->pBlockAgg[slotId];
106,805,468✔
489
        if (pInput->pColumnDataAgg[j]->colId == -1) {
106,803,593✔
490
          pInput->colDataSMAIsSet = false;
19,506✔
491
        }
492

493
        // Here we set the column info data since the data type for each column data is required, but
494
        // the data in the corresponding SColumnInfoData will not be used.
495
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
106,800,500✔
UNCOV
496
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
×
UNCOV
497
        code = doCreateConstantValColumnSMAInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows);
×
498
        QUERY_CHECK_CODE(code, lino, _end);
1,899✔
499
      }
500
    }
501
  } else {
UNCOV
502
    pInput->colDataSMAIsSet = false;
×
503
  }
504

505
_end:
106,656,845✔
506
  if (code != TSDB_CODE_SUCCESS) {
106,656,845✔
UNCOV
507
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
508
  }
509
  return code;
106,779,851✔
510
}
511

512
/////////////////////////////////////////////////////////////////////////////////////////////
513
STimeWindow getAlignQueryTimeWindow(const SInterval* pInterval, int64_t key) {
2,147,483,647✔
514
  STimeWindow win = {0};
2,147,483,647✔
515
  win.skey = taosTimeTruncate(key, pInterval);
2,147,483,647✔
516

517
  /*
518
   * if the realSkey > INT64_MAX - pInterval->interval, the query duration between
519
   * realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges.
520
   */
521
  win.ekey = taosTimeGetIntervalEnd(win.skey, pInterval);
2,147,483,647✔
522
  if (win.ekey < win.skey) {
2,147,483,647✔
UNCOV
523
    win.ekey = INT64_MAX;
×
524
  }
525

526
  return win;
2,147,483,647✔
527
}
528

529
int32_t setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput,
2,147,483,647✔
530
                            int32_t* rowEntryInfoOffset) {
531
  bool init = false;
2,147,483,647✔
532
  for (int32_t i = 0; i < numOfOutput; ++i) {
2,147,483,647✔
533
    pCtx[i].resultInfo = getResultEntryInfo(pResult, i, rowEntryInfoOffset);
2,147,483,647✔
534
    if (init) {
2,147,483,647✔
535
      continue;
2,147,483,647✔
536
    }
537

538
    struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
2,147,483,647✔
539
    
540
    if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
2,147,483,647✔
UNCOV
541
      continue;
×
542
    }
543

544
    if (pCtx[i].isPseudoFunc) {
2,147,483,647✔
545
      continue;
2,147,483,647✔
546
    }
547

548
    if (!pResInfo->initialized) {
2,147,483,647✔
549
      if (pCtx[i].functionId != -1) {
2,147,483,647✔
550
        int32_t code = pCtx[i].fpSet.init(&pCtx[i], pResInfo);
2,147,483,647✔
551
        if (code != TSDB_CODE_SUCCESS && fmIsUserDefinedFunc(pCtx[i].functionId)) {
2,147,483,647✔
UNCOV
552
          pResInfo->initialized = false;
×
UNCOV
553
          qError("failed to initialize udf, funcId:%d error:%s", pCtx[i].functionId, tstrerror(code));
×
UNCOV
554
          return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
×
555
        } else if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
556
          qError("failed to initialize function context, funcId:%d error:%s", pCtx[i].functionId, tstrerror(code));
×
557
          return code;
×
558
        }
559
      } else {
560
        pResInfo->initialized = true;
2,147,483,647✔
561
      }
562
    } else {
563
      init = true;
2,147,483,647✔
564
    }
565
  }
566
  return TSDB_CODE_SUCCESS;
2,147,483,647✔
567
}
568

569
void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
635,191,757✔
570
  for (int32_t i = 0; i < numOfOutput; ++i) {
2,147,483,647✔
571
    SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
2,147,483,647✔
572
    if (pResInfo == NULL) {
2,147,483,647✔
UNCOV
573
      continue;
×
574
    }
575

576
    pResInfo->initialized = false;
2,147,483,647✔
577
    pResInfo->numOfRes = 0;
2,147,483,647✔
578
    pResInfo->isNullRes = 0;
2,147,483,647✔
579
    pResInfo->complete = false;
2,147,483,647✔
580
  }
581
}
635,191,757✔
582

583
int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo, SColumnInfoData** pRet) {
2,147,483,647✔
584
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
585
  int32_t lino = 0;
2,147,483,647✔
586
  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
2,147,483,647✔
587
    return TSDB_CODE_SUCCESS;
2,147,483,647✔
588
  }
589

590
  SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
168,655,717✔
591
  SColumnInfoData*   p = NULL;
168,643,463✔
592

593
  code = filterSetDataFromSlotId(pFilterInfo, &param1);
168,635,693✔
594
  QUERY_CHECK_CODE(code, lino, _err);
168,647,929✔
595

596
  int32_t status = 0;
168,647,929✔
597
  code =
598
      filterExecute(pFilterInfo, pBlock, pRet != NULL ? pRet : &p, NULL, param1.numOfCols, &status);
168,650,552✔
599
  QUERY_CHECK_CODE(code, lino, _err);
168,643,196✔
600

601
  code = extractQualifiedTupleByFilterResult(pBlock, pRet != NULL ? *pRet : p, status);
167,637,810✔
602
  QUERY_CHECK_CODE(code, lino, _err);
167,642,202✔
603

604
  if (pColMatchInfo != NULL) {
167,642,202✔
605
    size_t size = taosArrayGetSize(pColMatchInfo->pList);
128,879,985✔
606
    for (int32_t i = 0; i < size; ++i) {
129,018,110✔
607
      SColMatchItem* pInfo = taosArrayGet(pColMatchInfo->pList, i);
128,884,607✔
608
      QUERY_CHECK_NULL(pInfo, code, lino, _err, terrno);
128,874,775✔
609
      if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
128,874,775✔
610
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->dstSlotId);
128,753,285✔
611
        QUERY_CHECK_NULL(pColData, code, lino, _err, terrno);
128,745,261✔
612
        if (pColData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
128,745,261✔
613
          code = blockDataUpdateTsWindow(pBlock, pInfo->dstSlotId);
128,746,771✔
614
          QUERY_CHECK_CODE(code, lino, _err);
128,743,311✔
615
          break;
128,743,311✔
616
        }
617
      }
618
    }
619
  }
620
  code = blockDataCheck(pBlock);
167,639,031✔
621
  QUERY_CHECK_CODE(code, lino, _err);
167,648,500✔
622
_err:
168,653,886✔
623
  if (code != TSDB_CODE_SUCCESS) {
168,646,209✔
624
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
1,005,386✔
625
  }
626
  colDataDestroy(p);
168,646,209✔
627
  taosMemoryFree(p);
168,645,758✔
628
  return code;
168,639,034✔
629
}
630

631
int32_t extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status) {
167,826,527✔
632
  int32_t code = TSDB_CODE_SUCCESS;
167,826,527✔
633
  int8_t* pIndicator = (int8_t*)p->pData;
167,826,527✔
634
  if (status == FILTER_RESULT_ALL_QUALIFIED) {
167,839,541✔
635
    // here nothing needs to be done
636
  } else if (status == FILTER_RESULT_NONE_QUALIFIED) {
103,623,038✔
637
    code = trimDataBlock(pBlock, pBlock->info.rows, NULL);
44,993,957✔
638
    pBlock->info.rows = 0;
44,988,950✔
639
  } else if (status == FILTER_RESULT_PARTIAL_QUALIFIED) {
58,629,081✔
640
    code = trimDataBlock(pBlock, pBlock->info.rows, (bool*)pIndicator);
58,629,718✔
641
  } else {
642
    qError("unknown filter result type: %d", status);
18✔
643
  }
644
  return code;
167,834,972✔
645
}
646

647
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) {
2,147,483,647✔
648
  bool returnNotNull = false;
2,147,483,647✔
649
  for (int32_t j = 0; j < numOfExprs; ++j) {
2,147,483,647✔
650
    SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
2,147,483,647✔
651
    if (!isRowEntryInitialized(pResInfo)) {
2,147,483,647✔
652
      continue;
2,147,483,647✔
653
    } else {
654
    }
655

656
    if (pRow->numOfRows < pResInfo->numOfRes) {
2,147,483,647✔
657
      pRow->numOfRows = pResInfo->numOfRes;
2,147,483,647✔
658
    }
659

660
    if (pCtx[j].isNotNullFunc) {
2,147,483,647✔
661
      returnNotNull = true;
2,147,483,647✔
662
    }
663
  }
664
  // if all expr skips all blocks, e.g. all null inputs for max function, output one row in final result.
665
  //  except for first/last, which require not null output, output no rows
666
  if (pRow->numOfRows == 0 && !returnNotNull) {
2,147,483,647✔
667
    pRow->numOfRows = 1;
2,147,483,647✔
668
  }
669
}
2,147,483,647✔
670

671
int32_t copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
2,147,483,647✔
672
                                 SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
673
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
674
  int32_t lino = 0;
2,147,483,647✔
675
  int32_t groupKeyIdx = 0;
2,147,483,647✔
676
  for (int32_t j = 0; j < numOfExprs; ++j) {
2,147,483,647✔
677
    int32_t slotId = pExprInfo[j].base.resSchema.slotId;
2,147,483,647✔
678

679
    pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
2,147,483,647✔
680
    if (pCtx[j].fpSet.finalize) {
2,147,483,647✔
681
      if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_group_key") == 0 ||
2,147,483,647✔
682
          strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_group_const_value") == 0) {
2,147,483,647✔
683
        // for groupkey along with functions that output multiple lines(e.g. Histogram)
684
        if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_group_key") == 0 &&
2,147,483,647✔
685
            pCtx[j].resultInfo->numOfRes == 0 && pTaskInfo->pStreamRuntimeInfo != NULL) {
2,147,483,647✔
NEW
686
          SArray* pVals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
×
NEW
687
          if (pVals != NULL && groupKeyIdx < taosArrayGetSize(pVals)) {
×
NEW
688
            SStreamGroupValue* pValue = taosArrayGet(pVals, groupKeyIdx);
×
NEW
689
            if (pValue != NULL) {
×
NEW
690
              SGroupKeyInfo* pInfo = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
×
NEW
691
              pInfo->hasResult = true;
×
NEW
692
              pInfo->isNull = pValue->isNull;
×
NEW
693
              if (!pValue->isNull) {
×
NEW
694
                if (IS_VAR_DATA_TYPE(pValue->data.type) || pValue->data.type == TSDB_DATA_TYPE_DECIMAL) {
×
NEW
695
                  if (pValue->data.pData != NULL && pValue->data.nData > 0) {
×
NEW
696
                    memcpy(pInfo->data, pValue->data.pData, pValue->data.nData);
×
697
                  }
698
                } else {
NEW
699
                  memcpy(pInfo->data, &pValue->data.val, pExprInfo[j].base.resSchema.bytes);
×
700
                }
701
              }
NEW
702
              pCtx[j].resultInfo->numOfRes = 1;
×
703
            }
704
          }
705
        }
706

707
        if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_group_key") == 0) {
2,147,483,647✔
708
          ++groupKeyIdx;
2,147,483,647✔
709
        }
710

711
        // need to match groupkey result for each output row of that function.
712
        if (pCtx[j].resultInfo->numOfRes != 0) {
2,147,483,647✔
713
          pCtx[j].resultInfo->numOfRes = pRow->numOfRows;
2,147,483,647✔
714
        }
715
      }
716

717
      code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
2,147,483,647✔
718
      if (TSDB_CODE_SUCCESS != code) {
2,147,483,647✔
719
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
×
720
        QUERY_CHECK_CODE(code, lino, _end);
3,308,603✔
721
      }
722
    } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
2,147,483,647✔
723
      // do nothing
724
    } else {
725
      // expand the result into multiple rows. E.g., _wstart, top(k, 20)
726
      // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
727
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
2,147,483,647✔
728
      QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
2,147,483,647✔
729
      char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
2,147,483,647✔
730
      for (int32_t k = 0; k < pRow->numOfRows; ++k) {        
2,147,483,647✔
731
        code = colDataSetValOrCover(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
2,147,483,647✔
732
        QUERY_CHECK_CODE(code, lino, _end);
2,147,483,647✔
733
      }
734
    }
735
  }
736

737
_end:
2,147,483,647✔
738
  if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
UNCOV
739
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
740
  }
741
  return code;
2,147,483,647✔
742
}
743

744
// todo refactor. SResultRow has direct pointer in miainfo
745
void finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
947,669,014✔
746
                        SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
747
  SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
947,669,014✔
748
  if (page == NULL) {
947,684,956✔
UNCOV
749
    qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
750
    T_LONG_JMP(pTaskInfo->env, terrno);
×
751
  }
752

753
  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);
947,684,956✔
754

755
  SqlFunctionCtx* pCtx = pSup->pCtx;
947,684,956✔
756
  SExprInfo*      pExprInfo = pSup->pExprInfo;
947,684,062✔
757
  const int32_t*  rowEntryOffset = pSup->rowEntryInfoOffset;
947,684,538✔
758

759
  doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset);
947,684,956✔
760
  if (pRow->numOfRows == 0) {
947,681,316✔
UNCOV
761
    releaseBufPage(pBuf, page);
×
UNCOV
762
    return;
×
763
  }
764

765
  int32_t size = pBlock->info.capacity;
947,682,988✔
766
  while (pBlock->info.rows + pRow->numOfRows > size) {
947,906,311✔
767
    size = size * 1.25;
223,085✔
768
  }
769

770
  int32_t code = blockDataEnsureCapacity(pBlock, size);
947,683,226✔
771
  if (TAOS_FAILED(code)) {
947,679,882✔
UNCOV
772
    releaseBufPage(pBuf, page);
×
UNCOV
773
    qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
×
UNCOV
774
    T_LONG_JMP(pTaskInfo->env, code);
×
775
  }
776

777
  code = copyResultrowToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
947,679,882✔
778
  if (TAOS_FAILED(code)) {
947,674,628✔
UNCOV
779
    releaseBufPage(pBuf, page);
×
780
    qError("%s copy result row to datablock failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
×
781
    T_LONG_JMP(pTaskInfo->env, code);
×
782
  }
783

784
  releaseBufPage(pBuf, page);
947,674,628✔
785
  pBlock->info.rows += pRow->numOfRows;
947,665,792✔
786
}
787

788
void doCopyToSDataBlockByHash(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
2,146,301,140✔
789
                              SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t threshold, bool ignoreGroup) {
790
  int32_t         code = TSDB_CODE_SUCCESS;
2,146,301,140✔
791
  int32_t         lino = 0;
2,146,301,140✔
792
  SExprInfo*      pExprInfo = pSup->pExprInfo;
2,146,301,140✔
793
  int32_t         numOfExprs = pSup->numOfExprs;
2,146,332,741✔
794
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
2,146,336,387✔
795
  SqlFunctionCtx* pCtx = pSup->pCtx;
2,146,337,253✔
796

797
  size_t  keyLen = 0;
2,146,339,285✔
798
  int32_t numOfRows = tSimpleHashGetSize(pHashmap);
2,146,332,679✔
799

800
  // begin from last iter
801
  void*   pData = pGroupResInfo->dataPos;
2,146,336,365✔
802
  int32_t iter = pGroupResInfo->iter;
2,146,337,683✔
803
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
2,147,483,647✔
804
    void*               key = tSimpleHashGetKey(pData, &keyLen);
2,147,483,647✔
805
    SResultRowPosition* pos = pData;
2,147,483,647✔
806
    uint64_t            groupId = calcGroupId((char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t));
2,147,483,647✔
807

808
    SFilePage* page = getBufPage(pBuf, pos->pageId);
2,147,483,647✔
809
    if (page == NULL) {
2,147,483,647✔
810
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
811
      T_LONG_JMP(pTaskInfo->env, terrno);
×
812
    }
813

814
    SResultRow* pRow = (SResultRow*)((char*)page + pos->offset);
2,147,483,647✔
815

816
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
2,147,483,647✔
817

818
    // no results, continue to check the next one
819
    if (pRow->numOfRows == 0) {
2,147,483,647✔
UNCOV
820
      pGroupResInfo->index += 1;
×
UNCOV
821
      pGroupResInfo->iter = iter;
×
UNCOV
822
      pGroupResInfo->dataPos = pData;
×
823

UNCOV
824
      releaseBufPage(pBuf, page);
×
UNCOV
825
      continue;
×
826
    }
827

828
    if (!ignoreGroup) {
2,147,483,647✔
829
      if (pBlock->info.id.groupId == 0) {
2,147,483,647✔
830
        pBlock->info.id.groupId = groupId;
2,128,742,455✔
831
      } else {
832
        // current value belongs to different group, it can't be packed into one datablock
833
        if (pBlock->info.id.groupId != groupId) {
2,122,736,265✔
834
          releaseBufPage(pBuf, page);
2,122,736,923✔
835
          break;
2,122,735,323✔
836
        }
837
      }
838
    }
839

840
    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
2,147,483,647✔
841
      uint32_t newSize = pBlock->info.rows + pRow->numOfRows + ((numOfRows - iter) > 1 ? 1 : 0);
×
842
      code = blockDataEnsureCapacity(pBlock, newSize);
×
UNCOV
843
      QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
844
      qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", newSize,
×
845
             pBlock->info.capacity, GET_TASKID(pTaskInfo));
846
      // todo set the pOperator->resultInfo size
847
    }
848

849
    pGroupResInfo->index += 1;
2,147,483,647✔
850
    pGroupResInfo->iter = iter;
2,147,483,647✔
851
    pGroupResInfo->dataPos = pData;
2,147,483,647✔
852

853
    code = copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
2,147,483,647✔
854
    releaseBufPage(pBuf, page);
2,147,483,647✔
855
    QUERY_CHECK_CODE(code, lino, _end);
2,147,483,647✔
856
    pBlock->info.rows += pRow->numOfRows;
2,147,483,647✔
857
    if (pBlock->info.rows >= threshold) {
2,147,483,647✔
858
      break;
16,480✔
859
    }
860
  }
861

862
  qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
2,146,291,309✔
863
         pBlock->info.id.groupId);
864
  pBlock->info.dataLoad = 1;
2,146,335,482✔
865
  code = blockDataUpdateTsWindow(pBlock, 0);
2,146,339,910✔
866
  QUERY_CHECK_CODE(code, lino, _end);
2,146,339,886✔
867

868
_end:
2,146,339,886✔
869
  if (code != TSDB_CODE_SUCCESS) {
2,146,340,417✔
UNCOV
870
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
871
    T_LONG_JMP(pTaskInfo->env, code);
×
872
  }
873
}
2,146,340,417✔
874

875
void doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
117,741,284✔
876
                        SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup, STrueForInfo *pTrueForInfo) {
877
  int32_t         code = TSDB_CODE_SUCCESS;
117,741,284✔
878
  int32_t         lino = 0;
117,741,284✔
879
  SExprInfo*      pExprInfo = pSup->pExprInfo;
117,741,284✔
880
  int32_t         numOfExprs = pSup->numOfExprs;
117,741,035✔
881
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
117,742,259✔
882
  SqlFunctionCtx* pCtx = pSup->pCtx;
117,741,449✔
883

884
  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
117,740,905✔
885

886
  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
2,147,483,647✔
887
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
2,147,483,647✔
888
    SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);
2,147,483,647✔
889
    if (page == NULL) {
2,147,483,647✔
UNCOV
890
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
UNCOV
891
      T_LONG_JMP(pTaskInfo->env, terrno);
×
892
    }
893

894
    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
2,147,483,647✔
895

896
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
2,147,483,647✔
897

898
    // no results, continue to check the next one
899
    if (pRow->numOfRows == 0) {
2,147,483,647✔
900
      pGroupResInfo->index += 1;
121,188✔
901
      releaseBufPage(pBuf, page);
121,188✔
902
      continue;
121,188✔
903
    }
904
    // skip the window which is less than the windowMinSize
905
    if (!isTrueForSatisfied(pTrueForInfo, pRow->win.skey, pRow->win.ekey, pRow->nOrigRows)) {
2,147,483,647✔
906
      qDebug("skip small window, groupId: %" PRId64 ", skey: %" PRId64 ", ekey: %" PRId64 ", nrows: %u", pPos->groupId,
45,318✔
907
             pRow->win.skey, pRow->win.ekey, pRow->nOrigRows);
908
      pGroupResInfo->index += 1;
45,318✔
909
      releaseBufPage(pBuf, page);
45,318✔
910
      continue;
45,318✔
911
    }
912

913
    if (!ignoreGroup) {
2,147,483,647✔
914
      if (pBlock->info.id.groupId == 0) {
2,147,483,647✔
915
        pBlock->info.id.groupId = pPos->groupId;
2,147,483,647✔
916
      } else {
917
        // current value belongs to different group, it can't be packed into one datablock
918
        if (pBlock->info.id.groupId != pPos->groupId) {
1,963,527,333✔
919
          releaseBufPage(pBuf, page);
19,512,027✔
920
          break;
19,512,027✔
921
        }
922
      }
923
    }
924

925
    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
2,147,483,647✔
UNCOV
926
      uint32_t newSize = pBlock->info.rows + pRow->numOfRows + ((numOfRows - i) > 1 ? 1 : 0);
×
UNCOV
927
      code = blockDataEnsureCapacity(pBlock, newSize);
×
UNCOV
928
      QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
929
      qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", newSize,
×
930
             pBlock->info.capacity, GET_TASKID(pTaskInfo));
931
      // todo set the pOperator->resultInfo size
932
    }
933

934
    pGroupResInfo->index += 1;
2,147,483,647✔
935
    code = copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
2,147,483,647✔
936
    releaseBufPage(pBuf, page);
2,147,483,647✔
937
    QUERY_CHECK_CODE(code, lino, _end);
2,147,483,647✔
938

939
    pBlock->info.rows += pRow->numOfRows;
2,147,483,647✔
940
    if (pBlock->info.rows >= threshold) {
2,147,483,647✔
941
      break;
16,390,712✔
942
    }
943
  }
944

945
  qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
117,602,421✔
946
         pBlock->info.id.groupId);
947
  pBlock->info.dataLoad = 1;
117,607,798✔
948
  code = blockDataUpdateTsWindow(pBlock, 0);
117,741,542✔
949
  QUERY_CHECK_CODE(code, lino, _end);
117,740,378✔
950

951
_end:
117,740,378✔
952
  if (code != TSDB_CODE_SUCCESS) {
117,740,378✔
UNCOV
953
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
954
    T_LONG_JMP(pTaskInfo->env, code);
×
955
  }
956
}
117,740,378✔
957

958
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
138,300,970✔
959
                            SDiskbasedBuf* pBuf) {
960
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
138,300,970✔
961
  SSDataBlock*   pBlock = pbInfo->pRes;
138,304,320✔
962

963
  // set output datablock version
964
  pBlock->info.version = pTaskInfo->version;
138,303,534✔
965

966
  blockDataCleanup(pBlock);
138,299,729✔
967
  if (!hasRemainResults(pGroupResInfo)) {
138,304,212✔
968
    return;
21,001,143✔
969
  }
970

971
  // clear the existed group id
972
  pBlock->info.id.groupId = 0;
117,301,303✔
973
  if (!pbInfo->mergeResultBlock) {
117,301,303✔
974
    doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold,
45,863,354✔
975
                       false, getTrueForInfo(pOperator));
976
  } else {
977
    while (hasRemainResults(pGroupResInfo)) {
132,903,943✔
978
      doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold,
71,438,254✔
979
                         true, getTrueForInfo(pOperator));
980
      if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
71,438,406✔
981
        break;
9,972,042✔
982
      }
983

984
      // clearing group id to continue to merge data that belong to different groups
985
      pBlock->info.id.groupId = 0;
61,466,364✔
986
    }
987

988
    // clear the group id info in SSDataBlock, since the client does not need it
989
    pBlock->info.id.groupId = 0;
71,437,729✔
990
  }
991
}
992

993
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
413,081,130✔
994
  for (int32_t i = 0; i < numOfExprs; ++i) {
1,605,125,851✔
995
    SExprInfo* pExprInfo = &pExpr[i];
1,192,041,877✔
996
    for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
2,147,483,647✔
997
      if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) {
1,350,324,212✔
998
        taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol);
1,115,860,588✔
999
      } else if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_VALUE) {
234,452,506✔
1000
        taosVariantDestroy(&pExprInfo->base.pParam[j].param);
135,171,351✔
1001
      }
1002
    }
1003

1004
    taosMemoryFree(pExprInfo->base.pParam);
1,192,145,183✔
1005
    taosMemoryFree(pExprInfo->pExpr);
1,192,120,786✔
1006
  }
1007
}
413,083,974✔
1008

1009
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, int64_t* defaultBufsz) {
297,129,995✔
1010
  *defaultPgsz = 4096;
297,129,995✔
1011
  uint32_t last = *defaultPgsz;
297,157,526✔
1012
  while (*defaultPgsz < rowSize * 4) {
358,157,290✔
1013
    *defaultPgsz <<= 1u;
61,104,247✔
1014
    if (*defaultPgsz < last) {
61,097,770✔
UNCOV
1015
      return TSDB_CODE_INVALID_PARA;
×
1016
    }
1017
    last = *defaultPgsz;
61,088,871✔
1018
  }
1019

1020
  // The default buffer for each operator in query is 10MB.
1021
  // at least four pages need to be in buffer
1022
  // TODO: make this variable to be configurable.
1023
  *defaultBufsz = 4096 * 2560;
297,107,787✔
1024
  if ((*defaultBufsz) <= (*defaultPgsz)) {
297,146,052✔
UNCOV
1025
    (*defaultBufsz) = (*defaultPgsz) * 4;
×
UNCOV
1026
    if (*defaultBufsz < ((int64_t)(*defaultPgsz)) * 4) {
×
UNCOV
1027
      return TSDB_CODE_INVALID_PARA;
×
1028
    }
1029
  }
1030

1031
  return 0;
297,128,166✔
1032
}
1033

1034
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
634,062,482✔
1035
  if (numOfRows == 0) {
634,062,482✔
UNCOV
1036
    numOfRows = 4096;
×
1037
  }
1038

1039
  pResultInfo->capacity = numOfRows;
634,062,482✔
1040
  pResultInfo->threshold = numOfRows * 0.75;
634,172,825✔
1041

1042
  if (pResultInfo->threshold == 0) {
634,084,577✔
1043
    pResultInfo->threshold = numOfRows;
2,127,420✔
1044
  }
1045
  pResultInfo->totalRows = 0;
634,084,749✔
1046
}
634,092,043✔
1047

1048
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
279,848,104✔
1049
  pInfo->pRes = pBlock;
279,848,104✔
1050
  initResultRowInfo(&pInfo->resultRowInfo);
279,885,579✔
1051
}
279,788,625✔
1052

1053
void destroySqlFunctionCtx(SqlFunctionCtx* pCtx, SExprInfo* pExpr, int32_t numOfOutput) {
1,200,799,769✔
1054
  if (pCtx == NULL) {
1,200,799,769✔
1055
    return;
760,446,526✔
1056
  }
1057

1058
  for (int32_t i = 0; i < numOfOutput; ++i) {
1,620,304,486✔
1059
    if (pExpr != NULL) {
1,179,983,363✔
1060
      SExprInfo* pExprInfo = &pExpr[i];
1,179,947,495✔
1061
      for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
2,147,483,647✔
1062
        if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_VALUE) {
1,337,582,718✔
1063
          colDataDestroy(pCtx[i].input.pData[j]);
133,189,934✔
1064
          taosMemoryFree(pCtx[i].input.pData[j]);
133,192,964✔
1065
          taosMemoryFree(pCtx[i].input.pColumnDataAgg[j]);
133,195,768✔
1066
        }
1067
      }
1068
    }
1069
    for (int32_t j = 0; j < pCtx[i].numOfParams; ++j) {
2,147,483,647✔
1070
      taosVariantDestroy(&pCtx[i].param[j].param);
1,337,605,098✔
1071
    }
1072

1073
    if(pCtx[i].fpSet.cleanup) {
1,180,022,985✔
1074
      pCtx[i].fpSet.cleanup(&pCtx[i]);
883,437✔
1075
    }
1076

1077
    taosMemoryFreeClear(pCtx[i].subsidiaries.pCtx);
1,180,045,837✔
1078
    taosMemoryFreeClear(pCtx[i].subsidiaries.buf);
1,180,071,190✔
1079
    taosMemoryFree(pCtx[i].input.pData);
1,180,055,928✔
1080
    taosMemoryFree(pCtx[i].input.pColumnDataAgg);
1,180,008,372✔
1081

1082
    if (pCtx[i].udfName != NULL) {
1,179,968,363✔
1083
      taosMemoryFree(pCtx[i].udfName);
36,805✔
1084
    }
1085
  }
1086

1087
  taosMemoryFreeClear(pCtx);
440,321,123✔
1088
  return;
440,306,042✔
1089
}
1090

1091
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr, SFunctionStateStore* pStore) {
437,267,392✔
1092
  pSup->pExprInfo = pExprInfo;
437,267,392✔
1093
  pSup->numOfExprs = numOfExpr;
437,300,294✔
1094
  if (pSup->pExprInfo != NULL) {
437,294,890✔
1095
    pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset, pStore);
339,654,856✔
1096
    if (pSup->pCtx == NULL) {
339,561,995✔
UNCOV
1097
      return terrno;
×
1098
    }
1099
  }
1100

1101
  return TSDB_CODE_SUCCESS;
437,237,544✔
1102
}
1103

1104
void checkIndefRowsFuncs(SExprSupp* pSup) {
2,142✔
1105
  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
8,568✔
1106
    if (fmIsIndefiniteRowsFunc(pSup->pCtx[i].functionId)) {
6,426✔
UNCOV
1107
      pSup->hasIndefRowsFunc = true;
×
UNCOV
1108
      break;
×
1109
    }
1110
  }
1111
}
2,142✔
1112

1113
void cleanupExprSupp(SExprSupp* pSupp) {
1,088,416,712✔
1114
  destroySqlFunctionCtx(pSupp->pCtx, pSupp->pExprInfo, pSupp->numOfExprs);
1,088,416,712✔
1115
  if (pSupp->pExprInfo != NULL) {
1,088,387,406✔
1116
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
379,322,013✔
1117
    taosMemoryFreeClear(pSupp->pExprInfo);
379,364,174✔
1118
  }
1119

1120
  if (pSupp->pFilterInfo != NULL) {
1,088,422,582✔
1121
    filterFreeInfo(pSupp->pFilterInfo);
99,573,180✔
1122
    pSupp->pFilterInfo = NULL;
99,564,925✔
1123
  }
1124

1125
  taosMemoryFree(pSupp->rowEntryInfoOffset);
1,088,422,216✔
1126
  memset(pSupp, 0, sizeof(SExprSupp));
1,088,405,449✔
1127
}
1,088,405,449✔
1128

1129
void cleanupExprSuppWithoutFilter(SExprSupp* pSupp) {
112,118,409✔
1130
  destroySqlFunctionCtx(pSupp->pCtx, pSupp->pExprInfo, pSupp->numOfExprs);
112,118,409✔
1131
  if (pSupp->pExprInfo != NULL) {
112,112,781✔
1132
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
31,483,408✔
1133
    taosMemoryFreeClear(pSupp->pExprInfo);
31,484,265✔
1134
  }
1135

1136
  taosMemoryFreeClear(pSupp->rowEntryInfoOffset);
112,113,932✔
1137
  pSupp->numOfExprs = 0;
112,116,189✔
1138
  pSupp->hasWindowOrGroup = false;
112,126,605✔
1139
  pSupp->pCtx = NULL;
112,122,629✔
1140
}
112,135,255✔
1141

1142
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
283,176,789✔
1143
  blockDataDestroy(pInfo->pRes);
283,176,789✔
1144
  pInfo->pRes = NULL;
283,175,942✔
1145
}
283,164,741✔
1146

1147
bool groupbyTbname(SNodeList* pGroupList) {
249,182,281✔
1148
  bool   bytbname = false;
249,182,281✔
1149
  SNode* pNode = NULL;
249,182,281✔
1150
  FOREACH(pNode, pGroupList) {
256,111,588✔
1151
    if (pNode->type == QUERY_NODE_FUNCTION) {
38,246,445✔
1152
      bytbname = (strcmp(((struct SFunctionNode*)pNode)->functionName, "tbname") == 0);
31,315,470✔
1153
      break;
31,316,500✔
1154
    }
1155
  }
1156
  return bytbname;
249,157,169✔
1157
}
1158

1159
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* pTask, SReadHandle* readHandle) {
359,880,577✔
1160
  switch (pNode->type) {
359,880,577✔
1161
    case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
482,022✔
1162
      SInserterParam* pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
482,022✔
1163
      if (NULL == pInserterParam) {
482,022✔
UNCOV
1164
        return terrno;
×
1165
      }
1166
      pInserterParam->readHandle = readHandle;
482,022✔
1167

1168
      *pParam = pInserterParam;
482,022✔
1169
      break;
482,022✔
1170
    }
1171
    case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
1,895,841✔
1172
      SDeleterParam* pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam));
1,895,841✔
1173
      if (NULL == pDeleterParam) {
1,820,827✔
UNCOV
1174
        return terrno;
×
1175
      }
1176

1177
      SArray* pInfoList = NULL;
1,820,827✔
1178
      int32_t code = getTableListInfo(pTask, &pInfoList);
1,820,827✔
1179
      if (code != TSDB_CODE_SUCCESS || pInfoList == NULL) {
1,821,409✔
1180
        taosMemoryFree(pDeleterParam);
582✔
UNCOV
1181
        return code;
×
1182
      }
1183

1184
      STableListInfo* pTableListInfo = taosArrayGetP(pInfoList, 0);
1,820,827✔
1185
      taosArrayDestroy(pInfoList);
1,822,579✔
1186

1187
      pDeleterParam->suid = tableListGetSuid(pTableListInfo);
1,820,827✔
1188

1189
      // TODO extract uid list
1190
      int32_t numOfTables = 0;
1,821,409✔
1191
      code = tableListGetSize(pTableListInfo, &numOfTables);
1,821,994✔
1192
      if (code != TSDB_CODE_SUCCESS) {
1,820,827✔
UNCOV
1193
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
1194
        taosMemoryFree(pDeleterParam);
×
1195
        return code;
×
1196
      }
1197

1198
      pDeleterParam->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t));
1,820,827✔
1199
      if (NULL == pDeleterParam->pUidList) {
1,820,827✔
UNCOV
1200
        taosMemoryFree(pDeleterParam);
×
UNCOV
1201
        return terrno;
×
1202
      }
1203

1204
      for (int32_t i = 0; i < numOfTables; ++i) {
3,888,914✔
1205
        STableKeyInfo* pTable = tableListGetInfo(pTableListInfo, i);
2,068,087✔
1206
        if (!pTable) {
2,068,087✔
UNCOV
1207
          taosArrayDestroy(pDeleterParam->pUidList);
×
UNCOV
1208
          taosMemoryFree(pDeleterParam);
×
UNCOV
1209
          return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1210
        }
1211
        void* tmp = taosArrayPush(pDeleterParam->pUidList, &pTable->uid);
2,068,087✔
1212
        if (!tmp) {
2,068,087✔
UNCOV
1213
          taosArrayDestroy(pDeleterParam->pUidList);
×
UNCOV
1214
          taosMemoryFree(pDeleterParam);
×
UNCOV
1215
          return terrno;
×
1216
        }
1217
      }
1218

1219
      *pParam = pDeleterParam;
1,820,827✔
1220
      break;
1,821,994✔
1221
    }
1222
    default:
357,544,868✔
1223
      break;
357,544,868✔
1224
  }
1225

1226
  return TSDB_CODE_SUCCESS;
359,907,538✔
1227
}
1228

UNCOV
1229
void streamOpReleaseState(SOperatorInfo* pOperator) {
×
UNCOV
1230
  SOperatorInfo* downstream = pOperator->pDownstream[0];
×
1231
  if (downstream->fpSet.releaseStreamStateFn) {
×
1232
    downstream->fpSet.releaseStreamStateFn(downstream);
×
1233
  }
UNCOV
1234
}
×
1235

UNCOV
1236
void streamOpReloadState(SOperatorInfo* pOperator) {
×
UNCOV
1237
  SOperatorInfo* downstream = pOperator->pDownstream[0];
×
1238
  if (downstream->fpSet.reloadStreamStateFn) {
×
1239
    downstream->fpSet.reloadStreamStateFn(downstream);
×
1240
  }
UNCOV
1241
}
×
1242

1243
void freeOperatorParamImpl(SOperatorParam* pParam, SOperatorParamType type) {
133,696,263✔
1244
  int32_t childrenNum = taosArrayGetSize(pParam->pChildren);
133,696,263✔
1245
  for (int32_t i = 0; i < childrenNum; ++i) {
135,439,982✔
1246
    SOperatorParam* pChild = taosArrayGetP(pParam->pChildren, i);
1,743,169✔
1247
    freeOperatorParam(pChild, type);
1,743,169✔
1248
  }
1249

1250
  taosArrayDestroy(pParam->pChildren);
133,696,813✔
1251
  pParam->pChildren = NULL;
133,703,819✔
1252

1253
  taosMemoryFreeClear(pParam->value);
133,705,006✔
1254

1255
  taosMemoryFree(pParam);
133,704,734✔
1256
}
133,697,900✔
1257

1258
void freeExchangeGetBasicOperatorParam(void* pParam) {
23,730,387✔
1259
  SExchangeOperatorBasicParam* pBasic = (SExchangeOperatorBasicParam*)pParam;
23,730,387✔
1260
  if (pBasic->uidList) {
23,730,387✔
1261
    taosArrayDestroy(pBasic->uidList);
19,518,295✔
1262
    pBasic->uidList = NULL;
19,518,295✔
1263
  }
1264
  if (pBasic->orgTbInfo) {
23,730,387✔
1265
    taosArrayDestroy(pBasic->orgTbInfo->colMap);
10,777,758✔
1266
    taosMemoryFreeClear(pBasic->orgTbInfo);
10,777,758✔
1267
  }
1268
  if (pBasic->batchOrgTbInfo) {
23,730,387✔
1269
    taosArrayDestroyEx(pBasic->batchOrgTbInfo, destroySOrgTbInfo);
3,168,963✔
1270
    pBasic->batchOrgTbInfo = NULL;
3,168,963✔
1271
  }
1272
  if (pBasic->tagList) {
23,730,387✔
1273
    taosArrayDestroyEx(pBasic->tagList, destroyTagVal);
840,498✔
1274
    pBasic->tagList = NULL;
840,498✔
1275
  }
1276
}
23,730,387✔
1277

1278
void freeExchangeGetOperatorParam(SOperatorParam* pParam) {
22,556,458✔
1279
  SExchangeOperatorParam* pExcParam = (SExchangeOperatorParam*)pParam->value;
22,556,458✔
1280
  if (pExcParam->multiParams) {
22,556,458✔
1281
    SExchangeOperatorBatchParam* pExcBatch = (SExchangeOperatorBatchParam*)pParam->value;
2,254,016✔
1282
    tSimpleHashSetFreeFp(pExcBatch->pBatchs, freeExchangeGetBasicOperatorParam);
2,254,016✔
1283
    tSimpleHashCleanup(pExcBatch->pBatchs);
2,254,016✔
1284
  } else {
1285
    freeExchangeGetBasicOperatorParam(&pExcParam->basic);
20,302,442✔
1286
  }
1287

1288
  freeOperatorParamImpl(pParam, OP_GET_PARAM);
22,556,458✔
1289
}
22,556,458✔
1290

UNCOV
1291
void freeExchangeNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1292

1293
void freeGroupCacheGetOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_GET_PARAM); }
7,804,066✔
1294

UNCOV
1295
void freeGroupCacheNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1296

1297
void freeMergeJoinGetOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_GET_PARAM); }
3,902,033✔
1298

UNCOV
1299
void freeMergeJoinNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1300

1301
void freeTagScanGetOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_GET_PARAM); }
6,032,170✔
1302

1303
void freeMergeGetOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_GET_PARAM); }
4,766,553✔
1304

UNCOV
1305
void freeDynQueryCtrlGetOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_GET_PARAM); }
×
1306

UNCOV
1307
void freeDynQueryCtrlNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1308

UNCOV
1309
void freeInterpFuncGetOperatorParam(SOperatorParam* pParam) {
×
UNCOV
1310
  freeOperatorParamImpl(pParam, OP_GET_PARAM);
×
UNCOV
1311
}
×
1312

UNCOV
1313
void freeInterpFuncNotifyOperatorParam(SOperatorParam* pParam) {
×
UNCOV
1314
  freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM);
×
UNCOV
1315
}
×
1316

1317
void freeTableScanGetOperatorParam(SOperatorParam* pParam) {
83,093,545✔
1318
  STableScanOperatorParam* pTableScanParam =
83,093,545✔
1319
    (STableScanOperatorParam*)pParam->value;
1320
  taosArrayDestroy(pTableScanParam->pUidList);
83,096,658✔
1321
  if (pTableScanParam->pOrgTbInfo) {
83,098,904✔
1322
    taosArrayDestroy(pTableScanParam->pOrgTbInfo->colMap);
72,464,598✔
1323
    taosMemoryFreeClear(pTableScanParam->pOrgTbInfo);
72,464,598✔
1324
  }
1325
  if (pTableScanParam->pBatchTbInfo) {
83,102,496✔
1326
    for (int32_t i = 0;
2,602,219✔
1327
      i < taosArrayGetSize(pTableScanParam->pBatchTbInfo); ++i) {
7,520,649✔
1328
      SOrgTbInfo* pOrgTbInfo =
1329
        (SOrgTbInfo*)taosArrayGet(pTableScanParam->pBatchTbInfo, i);
4,918,430✔
1330
      taosArrayDestroy(pOrgTbInfo->colMap);
4,918,430✔
1331
    }
1332
    taosArrayDestroy(pTableScanParam->pBatchTbInfo);
2,602,219✔
1333
    pTableScanParam->pBatchTbInfo = NULL;
2,602,219✔
1334
  }
1335
  if (pTableScanParam->pTagList) {
83,106,011✔
1336
    for (int32_t i = 0;
840,498✔
1337
      i < taosArrayGetSize(pTableScanParam->pTagList); ++i) {
5,209,368✔
1338
      STagVal* pTagVal =
1339
        (STagVal*)taosArrayGet(pTableScanParam->pTagList, i);
4,368,870✔
1340
      if (IS_VAR_DATA_TYPE(pTagVal->type)) {
4,368,870✔
1341
        taosMemoryFreeClear(pTagVal->pData);
1,896,894✔
1342
      }
1343
    }
1344
    taosArrayDestroy(pTableScanParam->pTagList);
840,498✔
1345
    pTableScanParam->pTagList = NULL;
840,498✔
1346
  }
1347
  freeOperatorParamImpl(pParam, OP_GET_PARAM);
83,104,875✔
1348
}
83,091,893✔
1349

UNCOV
1350
void freeTableScanNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1351

UNCOV
1352
void freeTagScanNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1353

UNCOV
1354
void freeMergeNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1355

1356
void freeOpParamItem(void* pItem) {
15,335,637✔
1357
  SOperatorParam* pParam = *(SOperatorParam**)pItem;
15,335,637✔
1358
  pParam->reUse = false;
15,335,637✔
1359
  freeOperatorParam(pParam, OP_GET_PARAM);
15,335,637✔
1360
}
15,335,637✔
1361

1362
static void destroyRefColIdGroupParam(void* info) {
8,341✔
1363
  SRefColIdGroup* pGroup = (SRefColIdGroup*)info;
8,341✔
1364
  if (pGroup && pGroup->pSlotIdList) {
8,341✔
1365
    taosArrayDestroy(pGroup->pSlotIdList);
8,341✔
1366
    pGroup->pSlotIdList = NULL;
8,341✔
1367
  }
1368
}
8,341✔
1369

1370
void freeExternalWindowGetOperatorParam(SOperatorParam* pParam) {
983,991✔
1371
  SExternalWindowOperatorParam *pExtParam = (SExternalWindowOperatorParam*)pParam->value;
983,991✔
1372
  taosArrayDestroy(pExtParam->ExtWins);
983,991✔
1373
  for (int32_t i = 0; i < taosArrayGetSize(pParam->pChildren); i++) {
983,991✔
UNCOV
1374
    SOperatorParam* pChild = *(SOperatorParam**)taosArrayGet(pParam->pChildren, i);
×
UNCOV
1375
    pChild->reUse = false;
×
1376
  }
1377
  freeOperatorParamImpl(pParam, OP_GET_PARAM);
983,991✔
1378
}
983,991✔
1379

1380
void freeVirtualTableScanGetOperatorParam(SOperatorParam* pParam) {
4,557,879✔
1381
  SVTableScanOperatorParam* pVTableScanParam = (SVTableScanOperatorParam*)pParam->value;
4,557,879✔
1382
  taosArrayDestroyEx(pVTableScanParam->pOpParamArray, freeOpParamItem);
4,557,879✔
1383
  if (pVTableScanParam->pRefColGroups) {
4,557,879✔
1384
    taosArrayDestroyEx(pVTableScanParam->pRefColGroups, destroyRefColIdGroupParam);
5,136✔
1385
    pVTableScanParam->pRefColGroups = NULL;
5,136✔
1386
  }
1387
  freeOpParamItem(&pVTableScanParam->pTagScanOp);
4,557,879✔
1388
  freeOperatorParamImpl(pParam, OP_GET_PARAM);
4,557,879✔
1389
}
4,557,879✔
1390

UNCOV
1391
void freeVTableScanNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1392

UNCOV
1393
void freeExternalWindowNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1394

1395
void freeOperatorParam(SOperatorParam* pParam, SOperatorParamType type) {
790,845,798✔
1396
  if (NULL == pParam || pParam->reUse) {
790,845,798✔
1397
    return;
657,166,316✔
1398
  }
1399

1400
  switch (pParam->opType) {
133,680,736✔
1401
    case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
22,556,458✔
1402
      type == OP_GET_PARAM ? freeExchangeGetOperatorParam(pParam) : freeExchangeNotifyOperatorParam(pParam);
22,556,458✔
1403
      break;
22,556,458✔
1404
    case QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE:
7,804,066✔
1405
      type == OP_GET_PARAM ? freeGroupCacheGetOperatorParam(pParam) : freeGroupCacheNotifyOperatorParam(pParam);
7,804,066✔
1406
      break;
7,804,066✔
1407
    case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
3,902,033✔
1408
      type == OP_GET_PARAM ? freeMergeJoinGetOperatorParam(pParam) : freeMergeJoinNotifyOperatorParam(pParam);
3,902,033✔
1409
      break;
3,902,033✔
1410
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
83,101,801✔
1411
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
1412
      type == OP_GET_PARAM ? freeTableScanGetOperatorParam(pParam) : freeTableScanNotifyOperatorParam(pParam);
83,101,801✔
1413
      break;
83,094,971✔
1414
    case QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN:
4,557,879✔
1415
      type == OP_GET_PARAM ? freeVirtualTableScanGetOperatorParam(pParam) : freeVTableScanNotifyOperatorParam(pParam);
4,557,879✔
1416
      break;
4,557,879✔
1417
    case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
6,032,170✔
1418
      type == OP_GET_PARAM ? freeTagScanGetOperatorParam(pParam) : freeTagScanNotifyOperatorParam(pParam);
6,032,170✔
1419
      break;
6,032,170✔
1420
    case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG:
4,765,932✔
1421
    case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
1422
    case QUERY_NODE_PHYSICAL_PLAN_MERGE:
1423
    case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL:
1424
    case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL:
1425
      type == OP_GET_PARAM ? freeMergeGetOperatorParam(pParam) : freeMergeNotifyOperatorParam(pParam);
4,765,932✔
1426
      break;
4,767,186✔
1427
    case QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW:
983,991✔
1428
      type == OP_GET_PARAM ? freeExternalWindowGetOperatorParam(pParam) : freeExternalWindowNotifyOperatorParam(pParam);
983,991✔
1429
      break;
983,991✔
UNCOV
1430
    case QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL:
×
UNCOV
1431
      type == OP_GET_PARAM ? freeDynQueryCtrlGetOperatorParam(pParam) : freeDynQueryCtrlNotifyOperatorParam(pParam);
×
UNCOV
1432
      break;
×
UNCOV
1433
    case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:
×
UNCOV
1434
      type == OP_GET_PARAM ? freeInterpFuncGetOperatorParam(pParam) : freeInterpFuncNotifyOperatorParam(pParam);
×
UNCOV
1435
      break;
×
1436
    default:
26✔
1437
      qError("%s unsupported op %d param, param type %d, param:%p value:%p children:%p reuse:%d",
26✔
1438
             __func__, pParam->opType, type, pParam, pParam->value, pParam->pChildren, pParam->reUse);
UNCOV
1439
      break;
×
1440
  }
1441
}
1442

1443
void freeResetOperatorParams(struct SOperatorInfo* pOperator, SOperatorParamType type, bool allFree) {
1,694,865,559✔
1444
  SOperatorParam**  ppParam = NULL;
1,694,865,559✔
1445
  SOperatorParam*** pppDownstramParam = NULL;
1,694,865,559✔
1446
  switch (type) {
1,694,865,559✔
1447
    case OP_GET_PARAM:
899,818,679✔
1448
      ppParam = &pOperator->pOperatorGetParam;
899,818,679✔
1449
      pppDownstramParam = &pOperator->pDownstreamGetParams;
899,818,811✔
1450
      break;
899,826,365✔
1451
    case OP_NOTIFY_PARAM:
795,118,096✔
1452
      ppParam = &pOperator->pOperatorNotifyParam;
795,118,096✔
1453
      pppDownstramParam = &pOperator->pDownstreamNotifyParams;
795,115,254✔
1454
      break;
795,121,130✔
UNCOV
1455
    default:
×
UNCOV
1456
      return;
×
1457
  }
1458

1459
  if (*ppParam) {
1,694,947,495✔
1460
    qDebug("%s free self param, operator:%s type:%d paramType:%d param:%p", __func__, pOperator->name,
8,544,859✔
1461
           pOperator->operatorType, type, *ppParam);
1462
    freeOperatorParam(*ppParam, type);
8,544,859✔
1463
    *ppParam = NULL;
8,544,859✔
1464
  }
1465

1466
  if (*pppDownstramParam) {
1,694,837,456✔
1467
    for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
130,923,765✔
1468
      if ((*pppDownstramParam)[i]) {
23,167,252✔
UNCOV
1469
        qDebug("%s free downstream param, operator:%s type:%d idx:%d downstream:%s paramType:%d param:%p", __func__,
×
1470
               pOperator->name, pOperator->operatorType, i, pOperator->pDownstream[i]->name, type,
1471
               (*pppDownstramParam)[i]);
UNCOV
1472
        freeOperatorParam((*pppDownstramParam)[i], type);
×
UNCOV
1473
        (*pppDownstramParam)[i] = NULL;
×
1474
      }
1475
    }
1476
    if (allFree) {
107,755,895✔
1477
      taosMemoryFreeClear(*pppDownstramParam);
21,157,238✔
1478
    }
1479
  }
1480
}
1481

1482
FORCE_INLINE int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOperator, int32_t idx, bool clearParam,
937,198,418✔
1483
                                                    SSDataBlock** pResBlock) {
1484
  QRY_PARAM_CHECK(pResBlock);
937,198,418✔
1485

1486
  int32_t code = 0;
937,269,690✔
1487
  if (pOperator->pDownstreamGetParams && pOperator->pDownstreamGetParams[idx]) {
937,269,690✔
1488
    qDebug("DynOp: op %s start to get block from downstream %s", pOperator->name, pOperator->pDownstream[idx]->name);
19,168,270✔
1489
    code = pOperator->pDownstream[idx]->fpSet.getNextExtFn(pOperator->pDownstream[idx],
38,336,540✔
1490
                                                           pOperator->pDownstreamGetParams[idx], pResBlock);
19,168,270✔
1491
    if (clearParam && (code == 0)) {
19,168,270✔
1492
      qDebug("%s clear downstream param, operator:%s type:%d idx:%d downstream:%s param:%p", __func__,
7,307,854✔
1493
             pOperator->name, pOperator->operatorType, idx, pOperator->pDownstream[idx]->name,
1494
             pOperator->pDownstreamGetParams[idx]);
1495
      freeOperatorParam(pOperator->pDownstreamGetParams[idx], OP_GET_PARAM);
7,307,854✔
1496
      pOperator->pDownstreamGetParams[idx] = NULL;
7,307,854✔
1497
    }
1498

1499
    if (code) {
19,168,270✔
1500
      qError("failed to get next data block from upstream at %s, line:%d code:%s", __func__, __LINE__, tstrerror(code));
×
1501
    }
1502
    return code;
19,168,270✔
1503
  }
1504

1505
  code = pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx], pResBlock);
918,098,378✔
1506
  if (code) {
915,322,556✔
UNCOV
1507
    qError("failed to get next data block from upstream at %s, %d code:%s", __func__, __LINE__, tstrerror(code));
×
1508
  }
1509
  return code;
915,337,872✔
1510
}
1511

1512
bool compareVal(const char* v, const SStateKeys* pKey) {
2,147,483,647✔
1513
  if (IS_VAR_DATA_TYPE(pKey->type)) {
2,147,483,647✔
1514
    if (IS_STR_DATA_BLOB(pKey->type)) {
275,152✔
UNCOV
1515
      if (blobDataLen(v) != blobDataLen(pKey->pData)) {
×
UNCOV
1516
        return false;
×
1517
      } else {
UNCOV
1518
        return memcmp(blobDataVal(v), blobDataVal(pKey->pData), blobDataLen(v)) == 0;
×
1519
      }
1520
    } else {
1521
      if (varDataLen(v) != varDataLen(pKey->pData)) {
275,152✔
UNCOV
1522
        return false;
×
1523
      } else {
1524
        return memcmp(varDataVal(v), varDataVal(pKey->pData), varDataLen(v)) == 0;
275,152✔
1525
      }
1526
    }
1527
  } else {
1528
    return memcmp(pKey->pData, v, pKey->bytes) == 0;
2,147,483,647✔
1529
  }
1530
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc