• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5014

03 Apr 2026 03:59PM UTC coverage: 72.256% (-0.06%) from 72.317%
#5014

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4054 of 5985 new or added lines in 68 files covered. (67.74%)

13285 existing lines in 168 files now uncovered.

257272 of 356056 relevant lines covered (72.26%)

133154720.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.31
/source/libs/executor/src/executorInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "../../function/inc/functionResInfoInt.h"
20
#include "os.h"
21
#include "querynodes.h"
22
#include "tfill.h"
23
#include "tname.h"
24

25
#include "tdatablock.h"
26
#include "tmsg.h"
27
#include "ttime.h"
28

29
#include "executorInt.h"
30
#include "index.h"
31
#include "operator.h"
32
#include "query.h"
33
#include "querytask.h"
34
#include "storageapi.h"
35
#include "tcompare.h"
36
#include "thash.h"
37
#include "ttypes.h"
38

39
#define SET_REVERSE_SCAN_FLAG(runtime)    ((runtime)->scanFlag = REVERSE_SCAN)
40
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) != TSDB_ORDER_DESC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
41

42
#if 0
43
static UNUSED_FUNC void *u_malloc (size_t __size) {
44
  uint32_t v = taosRand();
45

46
  if (v % 1000 <= 0) {
47
    return NULL;
48
  } else {
49
    return taosMemoryMalloc(__size);
50
  }
51
}
52

53
static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
54
  uint32_t v = taosRand();
55
  if (v % 1000 <= 0) {
56
    return NULL;
57
  } else {
58
    return taosMemoryCalloc(num, __size);
59
  }
60
}
61

62
static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
63
  uint32_t v = taosRand();
64
  if (v % 5 <= 1) {
65
    return NULL;
66
  } else {
67
    return taosMemoryRealloc(p, __size);
68
  }
69
}
70

71
#define calloc  u_calloc
72
#define malloc  u_malloc
73
#define realloc u_realloc
74
#endif
75

76
static int32_t setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
77

78
static int32_t initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
79
static void    doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
80

81
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
82
                                   bool createDummyCol);
83

84
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
2,147,483,647✔
85
  SFilePage* pData = NULL;
2,147,483,647✔
86

87
  // in the first scan, new space needed for results
88
  int32_t pageId = -1;
2,147,483,647✔
89
  if (*currentPageId == -1) {
2,147,483,647✔
90
    pData = getNewBufPage(pResultBuf, &pageId);
266,446,522✔
91
    if (pData == NULL) {
266,448,593✔
92
      qError("failed to get buffer, code:%s", tstrerror(terrno));
46✔
93
      return NULL;
×
94
    }
95
    pData->num = sizeof(SFilePage);
266,448,547✔
96
  } else {
97
    pData = getBufPage(pResultBuf, *currentPageId);
2,147,483,647✔
98
    if (pData == NULL) {
2,147,483,647✔
UNCOV
99
      qError("failed to get buffer, code:%s", tstrerror(terrno));
×
100
      return NULL;
×
101
    }
102

103
    pageId = *currentPageId;
2,147,483,647✔
104

105
    if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
2,147,483,647✔
106
      // release current page first, and prepare the next one
107
      releaseBufPage(pResultBuf, pData);
874,012,842✔
108

109
      pData = getNewBufPage(pResultBuf, &pageId);
874,018,869✔
110
      if (pData == NULL) {
874,002,395✔
UNCOV
111
        qError("failed to get buffer, code:%s", tstrerror(terrno));
×
112
        return NULL;
×
113
      }
114
      pData->num = sizeof(SFilePage);
874,002,395✔
115
    }
116
  }
117

118
  setBufPageDirty(pData, true);
2,147,483,647✔
119

120
  // set the number of rows in current disk page
121
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
2,147,483,647✔
122

123
  memset((char*)pResultRow, 0, interBufSize);
2,147,483,647✔
124
  pResultRow->pageId = pageId;
2,147,483,647✔
125
  pResultRow->offset = (int32_t)pData->num;
2,147,483,647✔
126

127
  *currentPageId = pageId;
2,147,483,647✔
128
  pData->num += interBufSize;
2,147,483,647✔
129
  return pResultRow;
2,147,483,647✔
130
}
131

132
/**
133
 * the struct of key in hash table
134
 * +----------+---------------+
135
 * | group id |   key data    |
136
 * | 8 bytes  | actual length |
137
 * +----------+---------------+
138
 */
139
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
2,147,483,647✔
140
                                   int32_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
141
                                   bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup) {
142
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
2,147,483,647✔
143
  if (!keepGroup) {
2,147,483,647✔
144
    *(uint64_t*)pSup->keyBuf = calcGroupId(pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
2,147,483,647✔
145
  }
146

147
  SResultRowPosition* p1 =
148
      (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
2,147,483,647✔
149

150
  SResultRow* pResult = NULL;
2,147,483,647✔
151

152
  // in case of repeat scan/reverse scan, no new time window added.
153
  if (isIntervalQuery) {
2,147,483,647✔
154
    if (p1 != NULL) {  // the *p1 may be NULL in case of sliding+offset exists.
2,147,483,647✔
155
      pResult = getResultRowByPos(pResultBuf, p1, true);
1,681,277,528✔
156
      if (pResult == NULL) {
1,681,277,528✔
UNCOV
157
        pTaskInfo->code = terrno;
×
158
        return NULL;
×
159
      }
160

161
      if (pResult->pageId != p1->pageId || pResult->offset != p1->offset) {
1,681,277,528✔
162
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
85✔
UNCOV
163
        pTaskInfo->code = terrno;
×
164
        return NULL;
×
165
      }
166
    }
167
  } else {
168
    // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
169
    // pResultRowInfo object.
170
    if (p1 != NULL) {
2,147,483,647✔
171
      // todo
172
      pResult = getResultRowByPos(pResultBuf, p1, true);
2,147,483,647✔
173
      if (NULL == pResult) {
2,147,483,647✔
UNCOV
174
        pTaskInfo->code = terrno;
×
175
        return NULL;
×
176
      }
177

178
      if (pResult->pageId != p1->pageId || pResult->offset != p1->offset) {
2,147,483,647✔
UNCOV
179
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
180
        pTaskInfo->code = terrno;
×
181
        return NULL;
×
182
      }
183
    }
184
  }
185

186
  // 1. close current opened time window
187
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
2,147,483,647✔
188
    SResultRowPosition pos = pResultRowInfo->cur;
2,147,483,647✔
189
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
2,147,483,647✔
190
    if (pPage == NULL) {
2,147,483,647✔
UNCOV
191
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
192
      pTaskInfo->code = terrno;
×
193
      return NULL;
×
194
    }
195
    releaseBufPage(pResultBuf, pPage);
2,147,483,647✔
196
  }
197

198
  // allocate a new buffer page
199
  if (pResult == NULL) {
2,147,483,647✔
200
    pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
2,147,483,647✔
201
    if (pResult == NULL) {
2,147,483,647✔
UNCOV
202
      pTaskInfo->code = terrno;
×
203
      return NULL;
×
204
    }
205

206
    // add a new result set for a new group
207
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
2,147,483,647✔
208
     int32_t code = tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
2,147,483,647✔
209
                                  sizeof(SResultRowPosition));
210
    if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
211
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
46✔
212
      pTaskInfo->code = code;
×
213
      return NULL;
×
214
    }
215
  }
216

217
  // 2. set the new time window to be the new active time window
218
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
2,147,483,647✔
219

220
  // too many time window in query
221
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
2,147,483,647✔
222
      tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
2,147,483,647✔
UNCOV
223
    pTaskInfo->code = TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW;
×
224
    return NULL;
×
225
  }
226

227
  return pResult;
2,147,483,647✔
228
}
229

230
//  query_range_start, query_range_end, window_duration, window_start, window_end
231
int32_t initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
10,885,715✔
232
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
10,885,715✔
233
  pColData->info.bytes = sizeof(int64_t);
10,887,263✔
234

235
  int32_t code = colInfoDataEnsureCapacity(pColData, 6, false);
10,883,131✔
236
  if (code != TSDB_CODE_SUCCESS) {
10,884,996✔
UNCOV
237
    return code;
×
238
  }
239
  colDataSetInt64(pColData, 0, &pQueryWindow->skey);
10,884,996✔
240
  colDataSetInt64(pColData, 1, &pQueryWindow->ekey);
10,882,600✔
241

242
  int64_t interval = 0;
10,882,583✔
243
  colDataSetInt64(pColData, 2, &interval);  // this value may be variable in case of 'n' and 'y'.
244
  colDataSetInt64(pColData, 3, &pQueryWindow->skey);
10,882,742✔
245
  colDataSetInt64(pColData, 4, &pQueryWindow->ekey);
10,878,979✔
246

247
  interval = -1;
10,878,685✔
248
  colDataSetInt64(pColData, 5,  &interval);
249
  return TSDB_CODE_SUCCESS;
10,879,355✔
250
}
251

252
static int32_t doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
24,053,066✔
253
  int32_t         code = TSDB_CODE_SUCCESS;
24,053,066✔
254
  int32_t         lino = 0;
24,053,066✔
255
  SqlFunctionCtx* pCtx = pExprSup->pCtx;
24,053,066✔
256
  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
130,543,206✔
257
    pCtx[i].order = order;
106,479,756✔
258
    pCtx[i].input.numOfRows = pBlock->info.rows;
106,499,966✔
259
    code = setBlockSMAInfo(&pCtx[i], &pExprSup->pExprInfo[i], pBlock);
106,511,783✔
260
    QUERY_CHECK_CODE(code, lino, _end);
106,503,235✔
261
    pCtx[i].pSrcBlock = pBlock;
106,503,235✔
262
    pCtx[i].scanFlag = scanFlag;
106,511,783✔
263
  }
264

265
_end:
24,064,883✔
266
  if (code != TSDB_CODE_SUCCESS) {
24,064,883✔
UNCOV
267
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
268
  }
269
  return code;
24,062,372✔
270
}
271

272
int32_t setInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
2,147,483,647✔
273
                          bool createDummyCol) {
274
  if (pBlock->pBlockAgg != NULL) {
2,147,483,647✔
275
    return doSetInputDataBlockInfo(pExprSup, pBlock, order, scanFlag);
24,055,714✔
276
  } else {
277
    return doSetInputDataBlock(pExprSup, pBlock, order, scanFlag, createDummyCol);
2,147,483,647✔
278
  }
279
}
280

281
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
304,692,261✔
282
                                             int32_t numOfRows) {
283
  int32_t          code = TSDB_CODE_SUCCESS;
304,692,261✔
284
  int32_t          lino = 0;
304,692,261✔
285
  SColumnInfoData* pColInfo = NULL;
304,692,261✔
286
  if (pInput->pData[paramIndex] == NULL) {
304,692,261✔
287
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
562,012✔
288
    QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
562,012✔
289

290
    // Set the correct column info (data type and bytes)
291
    pColInfo->info.type = pFuncParam->param.nType;
562,012✔
292
    pColInfo->info.bytes = pFuncParam->param.nLen;
562,012✔
293

294
    pInput->pData[paramIndex] = pColInfo;
562,012✔
295
  } else {
296
    pColInfo = pInput->pData[paramIndex];
304,143,167✔
297
  }
298

299
  code = colInfoDataEnsureCapacity(pColInfo, numOfRows, false);
304,706,998✔
300
  QUERY_CHECK_CODE(code, lino, _end);
304,654,912✔
301

302
  int8_t type = pFuncParam->param.nType;
304,654,912✔
303
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
304,652,856✔
304
    int64_t v = pFuncParam->param.i;
369,875✔
305
    for (int32_t i = 0; i < numOfRows; ++i) {
789,716,208✔
306
      colDataSetInt64(pColInfo, i, &v);
789,331,713✔
307
    }
308
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
304,282,981✔
UNCOV
309
    double v = pFuncParam->param.d;
×
310
    for (int32_t i = 0; i < numOfRows; ++i) {
×
311
      colDataSetDouble(pColInfo, i, &v);
×
312
    }
313
  } else if (type == TSDB_DATA_TYPE_VARCHAR || type == TSDB_DATA_TYPE_GEOMETRY) {
304,282,981✔
314
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
15,994✔
UNCOV
315
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
×
316

UNCOV
317
    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
×
318
    for (int32_t i = 0; i < numOfRows; ++i) {
×
319
      code = colDataSetVal(pColInfo, i, tmp, false);
×
320
      QUERY_CHECK_CODE(code, lino, _end);
×
321
    }
UNCOV
322
    taosMemoryFree(tmp);
×
323
  }
324

325
_end:
304,267,134✔
326
  if (code != TSDB_CODE_SUCCESS) {
304,651,629✔
UNCOV
327
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
328
  }
329
  return code;
304,602,517✔
330
}
331

332
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
2,147,483,647✔
333
                                   bool createDummyCol) {
334
  int32_t         code = TSDB_CODE_SUCCESS;
2,147,483,647✔
335
  int32_t         lino = 0;
2,147,483,647✔
336
  SqlFunctionCtx* pCtx = pExprSup->pCtx;
2,147,483,647✔
337

338
  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
2,147,483,647✔
339
    pCtx[i].order = order;
2,147,483,647✔
340
    pCtx[i].input.numOfRows = pBlock->info.rows;
2,147,483,647✔
341

342
    pCtx[i].pSrcBlock = pBlock;
2,147,483,647✔
343
    pCtx[i].scanFlag = scanFlag;
2,147,483,647✔
344

345
    SInputColumnInfoData* pInput = &pCtx[i].input;
2,147,483,647✔
346
    pInput->uid = pBlock->info.id.uid;
2,147,483,647✔
347
    pInput->colDataSMAIsSet = false;
2,147,483,647✔
348

349
    SExprInfo* pOneExpr = &pExprSup->pExprInfo[i];
2,147,483,647✔
350
    bool       hasPk = pOneExpr->pExpr->nodeType == QUERY_NODE_FUNCTION && pOneExpr->pExpr->_function.pFunctNode->hasPk;
2,147,483,647✔
351
    pCtx[i].hasPrimaryKey = hasPk;
2,147,483,647✔
352

353
    int16_t tsParamIdx = (!hasPk) ? pOneExpr->base.numOfParams - 1 : pOneExpr->base.numOfParams - 2;
2,147,483,647✔
354
    int16_t pkParamIdx = pOneExpr->base.numOfParams - 1;
2,147,483,647✔
355

356
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
2,147,483,647✔
357
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
2,147,483,647✔
358
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
2,147,483,647✔
359
        int32_t slotId = pFuncParam->pCol->slotId;
2,147,483,647✔
360
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
2,147,483,647✔
361
        pInput->totalRows = pBlock->info.rows;
2,147,483,647✔
362
        pInput->numOfRows = pBlock->info.rows;
2,147,483,647✔
363
        pInput->startRowIndex = 0;
2,147,483,647✔
364
        pInput->blankFill = pBlock->info.blankFill;
2,147,483,647✔
365

366
        // NOTE: the last parameter is the primary timestamp column
367
        // todo: refactor this
368

369
        if (fmIsImplicitTsFunc(pCtx[i].functionId) && (j == tsParamIdx)) {
2,147,483,647✔
370
          pInput->pPTS = pInput->pData[j];  // in case of merge function, this is not always the ts column data.
2,147,483,647✔
371
        }
372
        if (hasPk && (j == pkParamIdx)) {
2,147,483,647✔
373
          pInput->pPrimaryKey = pInput->pData[j];
30,590,053✔
374
        }
375
        QUERY_CHECK_CONDITION((pInput->pData[j] != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
2,147,483,647✔
376
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
637,012,761✔
377
        // todo avoid case: top(k, 12), 12 is the value parameter.
378
        // sum(11), 11 is also the value parameter.
379
        bool needDummyCol = createDummyCol && (pOneExpr->base.numOfParams == 1 ||
505,426,392✔
380
                                               (fmIsIndefiniteRowsFunc(pCtx[i].functionId) && j == 0));
45,044,198✔
381
        if (needDummyCol) {
460,375,903✔
382
          pInput->totalRows = pBlock->info.rows;
304,694,340✔
383
          pInput->numOfRows = pBlock->info.rows;
304,697,344✔
384
          pInput->startRowIndex = 0;
304,693,030✔
385
          pInput->blankFill = pBlock->info.blankFill;
304,702,499✔
386

387
          code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
304,701,233✔
388
          QUERY_CHECK_CODE(code, lino, _end);
298,705,019✔
389
        }
390
      }
391
    }
392
  }
393

394
_end:
2,147,483,647✔
395
  if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
UNCOV
396
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
397
  }
398
  return code;
2,147,483,647✔
399
}
400

401
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
2,147,483,647✔
402
  struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
2,147,483,647✔
403

404
  // in case of timestamp column, always generated results.
405
  int32_t functionId = pCtx->functionId;
2,147,483,647✔
406
  if (functionId == -1) {
2,147,483,647✔
UNCOV
407
    return false;
×
408
  }
409

410
  if (pCtx->scanFlag == PRE_SCAN) {
2,147,483,647✔
411
    return fmIsRepeatScanFunc(pCtx->functionId);
3,467,519✔
412
  }
413

414
  if (isRowEntryCompleted(pResInfo)) {
2,147,483,647✔
UNCOV
415
    return false;
×
416
  }
417

418
  return true;
2,147,483,647✔
419
}
420

UNCOV
421
static int32_t doCreateConstantValColumnSMAInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
×
422
                                                int32_t paramIndex, int32_t numOfRows) {
UNCOV
423
  if (pInput->pData[paramIndex] == NULL) {
×
424
    pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData));
×
UNCOV
425
    if (pInput->pData[paramIndex] == NULL) {
×
426
      return terrno;
×
427
    }
428

429
    // Set the correct column info (data type and bytes)
UNCOV
430
    pInput->pData[paramIndex]->info.type = type;
×
UNCOV
431
    pInput->pData[paramIndex]->info.bytes = tDataTypes[type].bytes;
×
432
  }
433

434
  SColumnDataAgg* da = NULL;
×
UNCOV
435
  if (pInput->pColumnDataAgg[paramIndex] == NULL) {
×
UNCOV
436
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
×
437
    if (!da) {
×
438
      return terrno;
×
439
    }
440
    pInput->pColumnDataAgg[paramIndex] = da;
×
441
  } else {
UNCOV
442
    da = pInput->pColumnDataAgg[paramIndex];
×
443
  }
444

445
  if (type == TSDB_DATA_TYPE_BIGINT) {
×
UNCOV
446
    int64_t v = pFuncParam->param.i;
×
UNCOV
447
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
×
448
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
×
449
    double v = pFuncParam->param.d;
×
450
    *da = (SColumnDataAgg){.numOfNull = 0};
×
451

452
    *(double*)&da->min = v;
×
453
    *(double*)&da->max = v;
×
UNCOV
454
    *(double*)&da->sum = v * numOfRows;
×
455
  } else if (type == TSDB_DATA_TYPE_BOOL) {  // todo validate this data type
×
456
    bool v = pFuncParam->param.i;
×
457

458
    *da = (SColumnDataAgg){.numOfNull = 0};
×
459
    *(bool*)&da->min = 0;
×
UNCOV
460
    *(bool*)&da->max = v;
×
461
    *(bool*)&da->sum = v * numOfRows;
×
462
  } else if (type == TSDB_DATA_TYPE_TIMESTAMP) {
×
463
    // do nothing
464
  } else {
465
    qError("invalid constant type for sma info");
×
466
  }
467

468
  return TSDB_CODE_SUCCESS;
×
469
}
470

471
int32_t setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
106,459,520✔
472
  int32_t code = TSDB_CODE_SUCCESS;
106,459,520✔
473
  int32_t lino = 0;
106,459,520✔
474
  int32_t numOfRows = pBlock->info.rows;
106,459,520✔
475

476
  SInputColumnInfoData* pInput = &pCtx->input;
106,509,683✔
477
  pInput->numOfRows = numOfRows;
106,514,030✔
478
  pInput->totalRows = numOfRows;
106,548,203✔
479

480
  if (pBlock->pBlockAgg != NULL) {
106,561,262✔
481
    pInput->colDataSMAIsSet = true;
106,692,520✔
482

483
    for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
213,214,257✔
484
      SFunctParam* pFuncParam = &pExprInfo->base.pParam[j];
106,639,389✔
485

486
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
106,619,042✔
487
        int32_t slotId = pFuncParam->pCol->slotId;
106,636,950✔
488
        pInput->pColumnDataAgg[j] = &pBlock->pBlockAgg[slotId];
106,624,676✔
489
        if (pInput->pColumnDataAgg[j]->colId == -1) {
106,629,032✔
490
          pInput->colDataSMAIsSet = false;
19,601✔
491
        }
492

493
        // Here we set the column info data since the data type for each column data is required, but
494
        // the data in the corresponding SColumnInfoData will not be used.
495
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
106,610,567✔
UNCOV
496
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
×
UNCOV
497
        code = doCreateConstantValColumnSMAInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows);
×
UNCOV
498
        QUERY_CHECK_CODE(code, lino, _end);
×
499
      }
500
    }
501
  } else {
UNCOV
502
    pInput->colDataSMAIsSet = false;
×
503
  }
504

505
_end:
105,876,275✔
506
  if (code != TSDB_CODE_SUCCESS) {
105,876,275✔
UNCOV
507
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
508
  }
509
  return code;
106,529,326✔
510
}
511

512
/////////////////////////////////////////////////////////////////////////////////////////////
513
STimeWindow getAlignQueryTimeWindow(const SInterval* pInterval, int64_t key) {
2,147,483,647✔
514
  STimeWindow win = {0};
2,147,483,647✔
515
  win.skey = taosTimeTruncate(key, pInterval);
2,147,483,647✔
516

517
  /*
518
   * if the realSkey > INT64_MAX - pInterval->interval, the query duration between
519
   * realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges.
520
   */
521
  win.ekey = taosTimeGetIntervalEnd(win.skey, pInterval);
2,147,483,647✔
522
  if (win.ekey < win.skey) {
2,147,483,647✔
UNCOV
523
    win.ekey = INT64_MAX;
×
524
  }
525

526
  return win;
2,147,483,647✔
527
}
528

529
int32_t setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput,
2,147,483,647✔
530
                            int32_t* rowEntryInfoOffset) {
531
  bool init = false;
2,147,483,647✔
532
  for (int32_t i = 0; i < numOfOutput; ++i) {
2,147,483,647✔
533
    pCtx[i].resultInfo = getResultEntryInfo(pResult, i, rowEntryInfoOffset);
2,147,483,647✔
534
    if (init) {
2,147,483,647✔
535
      continue;
2,147,483,647✔
536
    }
537

538
    struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
2,147,483,647✔
539
    
540
    if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
2,147,483,647✔
UNCOV
541
      continue;
×
542
    }
543

544
    if (pCtx[i].isPseudoFunc) {
2,147,483,647✔
545
      continue;
2,147,483,647✔
546
    }
547

548
    if (!pResInfo->initialized) {
2,147,483,647✔
549
      if (pCtx[i].functionId != -1) {
2,147,483,647✔
550
        int32_t code = pCtx[i].fpSet.init(&pCtx[i], pResInfo);
2,147,483,647✔
551
        if (code != TSDB_CODE_SUCCESS && fmIsUserDefinedFunc(pCtx[i].functionId)) {
2,147,483,647✔
UNCOV
552
          pResInfo->initialized = false;
×
UNCOV
553
          qError("failed to initialize udf, funcId:%d error:%s", pCtx[i].functionId, tstrerror(code));
×
UNCOV
554
          return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
×
555
        } else if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
556
          qError("failed to initialize function context, funcId:%d error:%s", pCtx[i].functionId, tstrerror(code));
×
557
          return code;
×
558
        }
559
      } else {
560
        pResInfo->initialized = true;
2,147,483,647✔
561
      }
562
    } else {
563
      init = true;
2,147,483,647✔
564
    }
565
  }
566
  return TSDB_CODE_SUCCESS;
2,147,483,647✔
567
}
568

569
void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
864,510,114✔
570
  for (int32_t i = 0; i < numOfOutput; ++i) {
2,147,483,647✔
571
    SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
2,147,483,647✔
572
    if (pResInfo == NULL) {
2,147,483,647✔
UNCOV
573
      continue;
×
574
    }
575

576
    pResInfo->initialized = false;
2,147,483,647✔
577
    pResInfo->numOfRes = 0;
2,147,483,647✔
578
    pResInfo->isNullRes = 0;
2,147,483,647✔
579
    pResInfo->complete = false;
2,147,483,647✔
580
  }
581
}
864,510,114✔
582

583
int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo, SColumnInfoData** pRet) {
2,147,483,647✔
584
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
585
  int32_t lino = 0;
2,147,483,647✔
586
  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
2,147,483,647✔
587
    return TSDB_CODE_SUCCESS;
2,147,483,647✔
588
  }
589

590
  SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
170,448,792✔
591
  SColumnInfoData*   p = NULL;
170,435,640✔
592

593
  code = filterSetDataFromSlotId(pFilterInfo, &param1);
170,431,561✔
594
  QUERY_CHECK_CODE(code, lino, _err);
170,444,423✔
595

596
  int32_t status = 0;
170,444,423✔
597
  code =
598
      filterExecute(pFilterInfo, pBlock, pRet != NULL ? pRet : &p, NULL, param1.numOfCols, &status);
170,446,627✔
599
  QUERY_CHECK_CODE(code, lino, _err);
170,430,624✔
600

601
  code = extractQualifiedTupleByFilterResult(pBlock, pRet != NULL ? *pRet : p, status);
169,441,520✔
602
  QUERY_CHECK_CODE(code, lino, _err);
169,440,224✔
603

604
  if (pColMatchInfo != NULL) {
169,440,224✔
605
    size_t size = taosArrayGetSize(pColMatchInfo->pList);
130,377,928✔
606
    for (int32_t i = 0; i < size; ++i) {
130,522,444✔
607
      SColMatchItem* pInfo = taosArrayGet(pColMatchInfo->pList, i);
130,389,764✔
608
      QUERY_CHECK_NULL(pInfo, code, lino, _err, terrno);
130,375,230✔
609
      if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
130,375,230✔
610
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->dstSlotId);
130,261,893✔
611
        QUERY_CHECK_NULL(pColData, code, lino, _err, terrno);
130,228,782✔
612
        if (pColData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
130,228,782✔
613
          code = blockDataUpdateTsWindow(pBlock, pInfo->dstSlotId);
130,254,333✔
614
          QUERY_CHECK_CODE(code, lino, _err);
130,250,149✔
615
          break;
130,250,149✔
616
        }
617
      }
618
    }
619
  }
620
  code = blockDataCheck(pBlock);
169,445,125✔
621
  QUERY_CHECK_CODE(code, lino, _err);
169,453,948✔
622
_err:
170,443,052✔
623
  if (code != TSDB_CODE_SUCCESS) {
170,450,361✔
624
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
989,104✔
625
  }
626
  colDataDestroy(p);
170,450,361✔
627
  taosMemoryFree(p);
170,450,619✔
628
  return code;
170,440,866✔
629
}
630

631
int32_t extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status) {
169,627,524✔
632
  int32_t code = TSDB_CODE_SUCCESS;
169,627,524✔
633
  int8_t* pIndicator = (int8_t*)p->pData;
169,627,524✔
634
  if (status == FILTER_RESULT_ALL_QUALIFIED) {
169,642,534✔
635
    // here nothing needs to be done
636
  } else if (status == FILTER_RESULT_NONE_QUALIFIED) {
104,163,882✔
637
    code = trimDataBlock(pBlock, pBlock->info.rows, NULL);
45,916,499✔
638
    pBlock->info.rows = 0;
45,917,850✔
639
  } else if (status == FILTER_RESULT_PARTIAL_QUALIFIED) {
58,247,383✔
640
    code = trimDataBlock(pBlock, pBlock->info.rows, (bool*)pIndicator);
58,248,672✔
641
  } else {
UNCOV
642
    qError("unknown filter result type: %d", status);
×
643
  }
644
  return code;
169,627,997✔
645
}
646

647
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) {
2,147,483,647✔
648
  bool returnNotNull = false;
2,147,483,647✔
649
  for (int32_t j = 0; j < numOfExprs; ++j) {
2,147,483,647✔
650
    SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
2,147,483,647✔
651
    if (!isRowEntryInitialized(pResInfo)) {
2,147,483,647✔
652
      continue;
2,147,483,647✔
653
    } else {
654
    }
655

656
    if (pRow->numOfRows < pResInfo->numOfRes) {
2,147,483,647✔
657
      pRow->numOfRows = pResInfo->numOfRes;
2,147,483,647✔
658
    }
659

660
    if (pCtx[j].isNotNullFunc) {
2,147,483,647✔
661
      returnNotNull = true;
2,147,483,647✔
662
    }
663
  }
664
  // if all expr skips all blocks, e.g. all null inputs for max function, output one row in final result.
665
  //  except for first/last, which require not null output, output no rows
666
  if (pRow->numOfRows == 0 && !returnNotNull) {
2,147,483,647✔
667
    pRow->numOfRows = 1;
2,147,483,647✔
668
  }
669
}
2,147,483,647✔
670

671
int32_t copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
2,147,483,647✔
672
                                 SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
673
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
674
  int32_t lino = 0;
2,147,483,647✔
675
  int32_t groupKeyIdx = 0;
2,147,483,647✔
676
  for (int32_t j = 0; j < numOfExprs; ++j) {
2,147,483,647✔
677
    int32_t slotId = pExprInfo[j].base.resSchema.slotId;
2,147,483,647✔
678

679
    pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
2,147,483,647✔
680
    if (pCtx[j].fpSet.finalize) {
2,147,483,647✔
681
      if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_group_key") == 0 ||
2,147,483,647✔
682
          strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_group_const_value") == 0) {
2,147,483,647✔
683
        // for groupkey along with functions that output multiple lines(e.g. Histogram)
684
        if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_group_key") == 0 &&
2,147,483,647✔
685
            pCtx[j].resultInfo->numOfRes == 0 && pTaskInfo->pStreamRuntimeInfo != NULL) {
2,147,483,647✔
NEW
686
          SArray* pVals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
×
NEW
687
          if (pVals != NULL && groupKeyIdx < taosArrayGetSize(pVals)) {
×
NEW
688
            SStreamGroupValue* pValue = taosArrayGet(pVals, groupKeyIdx);
×
NEW
689
            if (pValue != NULL) {
×
NEW
690
              SGroupKeyInfo* pInfo = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
×
NEW
691
              pInfo->hasResult = true;
×
NEW
692
              pInfo->isNull = pValue->isNull;
×
NEW
693
              if (!pValue->isNull) {
×
NEW
694
                if (IS_VAR_DATA_TYPE(pValue->data.type) || pValue->data.type == TSDB_DATA_TYPE_DECIMAL) {
×
NEW
695
                  if (pValue->data.pData != NULL && pValue->data.nData > 0) {
×
NEW
696
                    memcpy(pInfo->data, pValue->data.pData, pValue->data.nData);
×
697
                  }
698
                } else {
NEW
699
                  memcpy(pInfo->data, &pValue->data.val, pExprInfo[j].base.resSchema.bytes);
×
700
                }
701
              }
NEW
702
              pCtx[j].resultInfo->numOfRes = 1;
×
703
            }
704
          }
705
        }
706

707
        if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_group_key") == 0) {
2,147,483,647✔
708
          ++groupKeyIdx;
2,147,483,647✔
709
        }
710

711
        // need to match groupkey result for each output row of that function.
712
        if (pCtx[j].resultInfo->numOfRes != 0) {
2,147,483,647✔
713
          pCtx[j].resultInfo->numOfRes = pRow->numOfRows;
2,147,483,647✔
714
        }
715
      }
716

717
      code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
2,147,483,647✔
718
      if (TSDB_CODE_SUCCESS != code) {
2,147,483,647✔
719
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
×
720
        QUERY_CHECK_CODE(code, lino, _end);
2,490,742✔
721
      }
722
    } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
2,147,483,647✔
723
      // do nothing
724
    } else {
725
      // expand the result into multiple rows. E.g., _wstart, top(k, 20)
726
      // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
727
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
2,147,483,647✔
728
      QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
2,147,483,647✔
729
      char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
2,147,483,647✔
730
      for (int32_t k = 0; k < pRow->numOfRows; ++k) {        
2,147,483,647✔
731
        code = colDataSetValOrCover(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
2,147,483,647✔
732
        QUERY_CHECK_CODE(code, lino, _end);
2,147,483,647✔
733
      }
734
    }
735
  }
736

737
_end:
2,147,483,647✔
738
  if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
UNCOV
739
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
740
  }
741
  return code;
2,147,483,647✔
742
}
743

744
// todo refactor. SResultRow has direct pointer in miainfo
745
void finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
941,531,392✔
746
                        SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
747
  SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
941,531,392✔
748
  if (page == NULL) {
941,531,037✔
UNCOV
749
    qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
750
    T_LONG_JMP(pTaskInfo->env, terrno);
×
751
  }
752

753
  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);
941,531,037✔
754

755
  SqlFunctionCtx* pCtx = pSup->pCtx;
941,531,507✔
756
  SExprInfo*      pExprInfo = pSup->pExprInfo;
941,531,272✔
757
  const int32_t*  rowEntryOffset = pSup->rowEntryInfoOffset;
941,531,272✔
758

759
  doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset);
941,531,627✔
760
  if (pRow->numOfRows == 0) {
941,531,862✔
UNCOV
761
    releaseBufPage(pBuf, page);
×
UNCOV
762
    return;
×
763
  }
764

765
  int32_t size = pBlock->info.capacity;
941,531,862✔
766
  while (pBlock->info.rows + pRow->numOfRows > size) {
941,764,760✔
767
    size = size * 1.25;
232,898✔
768
  }
769

770
  int32_t code = blockDataEnsureCapacity(pBlock, size);
941,531,862✔
771
  if (TAOS_FAILED(code)) {
941,531,862✔
UNCOV
772
    releaseBufPage(pBuf, page);
×
UNCOV
773
    qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
×
UNCOV
774
    T_LONG_JMP(pTaskInfo->env, code);
×
775
  }
776

777
  code = copyResultrowToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
941,531,862✔
778
  if (TAOS_FAILED(code)) {
941,530,744✔
UNCOV
779
    releaseBufPage(pBuf, page);
×
780
    qError("%s copy result row to datablock failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
×
781
    T_LONG_JMP(pTaskInfo->env, code);
×
782
  }
783

784
  releaseBufPage(pBuf, page);
941,530,744✔
785
  pBlock->info.rows += pRow->numOfRows;
941,529,156✔
786
}
787

788
void doCopyToSDataBlockByHash(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
2,147,483,647✔
789
                              SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t threshold, bool ignoreGroup) {
790
  int32_t         code = TSDB_CODE_SUCCESS;
2,147,483,647✔
791
  int32_t         lino = 0;
2,147,483,647✔
792
  SExprInfo*      pExprInfo = pSup->pExprInfo;
2,147,483,647✔
793
  int32_t         numOfExprs = pSup->numOfExprs;
2,147,483,647✔
794
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
2,147,483,647✔
795
  SqlFunctionCtx* pCtx = pSup->pCtx;
2,147,483,647✔
796

797
  size_t  keyLen = 0;
2,147,483,647✔
798
  int32_t numOfRows = tSimpleHashGetSize(pHashmap);
2,147,483,647✔
799

800
  // begin from last iter
801
  void*   pData = pGroupResInfo->dataPos;
2,147,483,647✔
802
  int32_t iter = pGroupResInfo->iter;
2,147,483,647✔
803
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
2,147,483,647✔
804
    void*               key = tSimpleHashGetKey(pData, &keyLen);
2,147,483,647✔
805
    SResultRowPosition* pos = pData;
2,147,483,647✔
806
    uint64_t            groupId = calcGroupId((char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t));
2,147,483,647✔
807

808
    SFilePage* page = getBufPage(pBuf, pos->pageId);
2,147,483,647✔
809
    if (page == NULL) {
2,147,483,647✔
810
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
811
      T_LONG_JMP(pTaskInfo->env, terrno);
×
812
    }
813

814
    SResultRow* pRow = (SResultRow*)((char*)page + pos->offset);
2,147,483,647✔
815

816
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
2,147,483,647✔
817

818
    // no results, continue to check the next one
819
    if (pRow->numOfRows == 0) {
2,147,483,647✔
UNCOV
820
      pGroupResInfo->index += 1;
×
UNCOV
821
      pGroupResInfo->iter = iter;
×
UNCOV
822
      pGroupResInfo->dataPos = pData;
×
823

UNCOV
824
      releaseBufPage(pBuf, page);
×
UNCOV
825
      continue;
×
826
    }
827

828
    if (!ignoreGroup) {
2,147,483,647✔
829
      if (pBlock->info.id.groupId == 0) {
2,147,483,647✔
830
        pBlock->info.id.groupId = groupId;
2,133,459,709✔
831
      } else {
832
        // current value belongs to different group, it can't be packed into one datablock
833
        if (pBlock->info.id.groupId != groupId) {
2,127,324,314✔
834
          releaseBufPage(pBuf, page);
2,127,324,962✔
835
          break;
2,127,321,010✔
836
        }
837
      }
838
    }
839

840
    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
2,147,483,647✔
841
      uint32_t newSize = pBlock->info.rows + pRow->numOfRows + ((numOfRows - iter) > 1 ? 1 : 0);
×
842
      code = blockDataEnsureCapacity(pBlock, newSize);
×
UNCOV
843
      QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
844
      qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", newSize,
×
845
             pBlock->info.capacity, GET_TASKID(pTaskInfo));
846
      // todo set the pOperator->resultInfo size
847
    }
848

849
    pGroupResInfo->index += 1;
2,147,483,647✔
850
    pGroupResInfo->iter = iter;
2,147,483,647✔
851
    pGroupResInfo->dataPos = pData;
2,147,483,647✔
852

853
    code = copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
2,147,483,647✔
854
    releaseBufPage(pBuf, page);
2,147,483,647✔
855
    QUERY_CHECK_CODE(code, lino, _end);
2,147,483,647✔
856
    pBlock->info.rows += pRow->numOfRows;
2,147,483,647✔
857
    if (pBlock->info.rows >= threshold) {
2,147,483,647✔
858
      break;
16,520✔
859
    }
860
  }
861

862
  qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
2,147,483,647✔
863
         pBlock->info.id.groupId);
864
  pBlock->info.dataLoad = 1;
2,147,483,647✔
865
  code = blockDataUpdateTsWindow(pBlock, 0);
2,147,483,647✔
866
  QUERY_CHECK_CODE(code, lino, _end);
2,147,483,647✔
867

868
_end:
2,147,483,647✔
869
  if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
UNCOV
870
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
871
    T_LONG_JMP(pTaskInfo->env, code);
×
872
  }
873
}
2,147,483,647✔
874

875
void doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
123,479,920✔
876
                        SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup, STrueForInfo *pTrueForInfo) {
877
  int32_t         code = TSDB_CODE_SUCCESS;
123,479,920✔
878
  int32_t         lino = 0;
123,479,920✔
879
  SExprInfo*      pExprInfo = pSup->pExprInfo;
123,479,920✔
880
  int32_t         numOfExprs = pSup->numOfExprs;
123,483,244✔
881
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
123,486,102✔
882
  SqlFunctionCtx* pCtx = pSup->pCtx;
123,480,235✔
883

884
  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
123,478,644✔
885

886
  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
2,147,483,647✔
887
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
2,147,483,647✔
888
    SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);
2,147,483,647✔
889
    if (page == NULL) {
2,147,483,647✔
UNCOV
890
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
UNCOV
891
      T_LONG_JMP(pTaskInfo->env, terrno);
×
892
    }
893

894
    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
2,147,483,647✔
895

896
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
2,147,483,647✔
897

898
    // no results, continue to check the next one
899
    if (pRow->numOfRows == 0) {
2,147,483,647✔
900
      pGroupResInfo->index += 1;
119,111✔
901
      releaseBufPage(pBuf, page);
119,111✔
902
      continue;
119,111✔
903
    }
904
    // skip the window which is less than the windowMinSize
905
    if (!isTrueForSatisfied(pTrueForInfo, pRow->win.skey, pRow->win.ekey, pRow->nOrigRows)) {
2,147,483,647✔
906
      qDebug("skip small window, groupId: %" PRId64 ", skey: %" PRId64 ", ekey: %" PRId64 ", nrows: %u", pPos->groupId,
45,136✔
907
             pRow->win.skey, pRow->win.ekey, pRow->nOrigRows);
908
      pGroupResInfo->index += 1;
45,136✔
909
      releaseBufPage(pBuf, page);
45,136✔
910
      continue;
45,136✔
911
    }
912

913
    if (!ignoreGroup) {
2,147,483,647✔
914
      if (pBlock->info.id.groupId == 0) {
2,147,483,647✔
915
        pBlock->info.id.groupId = pPos->groupId;
2,147,483,647✔
916
      } else {
917
        // current value belongs to different group, it can't be packed into one datablock
918
        if (pBlock->info.id.groupId != pPos->groupId) {
2,052,481,351✔
919
          releaseBufPage(pBuf, page);
19,644,922✔
920
          break;
19,645,682✔
921
        }
922
      }
923
    }
924

925
    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
2,147,483,647✔
UNCOV
926
      uint32_t newSize = pBlock->info.rows + pRow->numOfRows + ((numOfRows - i) > 1 ? 1 : 0);
×
UNCOV
927
      code = blockDataEnsureCapacity(pBlock, newSize);
×
UNCOV
928
      QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
929
      qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", newSize,
×
930
             pBlock->info.capacity, GET_TASKID(pTaskInfo));
931
      // todo set the pOperator->resultInfo size
932
    }
933

934
    pGroupResInfo->index += 1;
2,147,483,647✔
935
    code = copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
2,147,483,647✔
936
    releaseBufPage(pBuf, page);
2,147,483,647✔
937
    QUERY_CHECK_CODE(code, lino, _end);
2,147,483,647✔
938

939
    pBlock->info.rows += pRow->numOfRows;
2,147,483,647✔
940
    if (pBlock->info.rows >= threshold) {
2,147,483,647✔
941
      break;
16,711,913✔
942
    }
943
  }
944

945
  qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
123,327,163✔
946
         pBlock->info.id.groupId);
947
  pBlock->info.dataLoad = 1;
123,341,599✔
948
  code = blockDataUpdateTsWindow(pBlock, 0);
123,486,224✔
949
  QUERY_CHECK_CODE(code, lino, _end);
123,484,111✔
950

951
_end:
123,484,111✔
952
  if (code != TSDB_CODE_SUCCESS) {
123,484,111✔
UNCOV
953
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
954
    T_LONG_JMP(pTaskInfo->env, code);
×
955
  }
956
}
123,484,111✔
957

958
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
145,741,954✔
959
                            SDiskbasedBuf* pBuf) {
960
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
145,741,954✔
961
  SSDataBlock*   pBlock = pbInfo->pRes;
145,746,847✔
962

963
  // set output datablock version
964
  pBlock->info.version = pTaskInfo->version;
145,745,634✔
965

966
  blockDataCleanup(pBlock);
145,739,333✔
967
  if (!hasRemainResults(pGroupResInfo)) {
145,746,988✔
968
    return;
22,694,450✔
969
  }
970

971
  // clear the existed group id
972
  pBlock->info.id.groupId = 0;
123,043,425✔
973
  if (!pbInfo->mergeResultBlock) {
123,043,944✔
974
    doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold,
49,735,665✔
975
                       false, getTrueForInfo(pOperator));
976
  } else {
977
    while (hasRemainResults(pGroupResInfo)) {
136,667,808✔
978
      doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold,
73,304,743✔
979
                         true, getTrueForInfo(pOperator));
980
      if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
73,305,192✔
981
        break;
9,940,618✔
982
      }
983

984
      // clearing group id to continue to merge data that belong to different groups
985
      pBlock->info.id.groupId = 0;
63,364,574✔
986
    }
987

988
    // clear the group id info in SSDataBlock, since the client does not need it
989
    pBlock->info.id.groupId = 0;
73,305,289✔
990
  }
991
}
992

993
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
427,662,029✔
994
  for (int32_t i = 0; i < numOfExprs; ++i) {
1,706,191,769✔
995
    SExprInfo* pExprInfo = &pExpr[i];
1,278,465,670✔
996
    for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
2,147,483,647✔
997
      if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) {
1,457,720,138✔
998
        taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol);
1,218,149,259✔
999
      } else if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_VALUE) {
239,583,012✔
1000
        taosVariantDestroy(&pExprInfo->base.pParam[j].param);
141,142,228✔
1001
      }
1002
    }
1003

1004
    taosMemoryFree(pExprInfo->base.pParam);
1,278,550,285✔
1005
    taosMemoryFree(pExprInfo->pExpr);
1,278,581,718✔
1006
  }
1007
}
427,726,099✔
1008

1009
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, int64_t* defaultBufsz) {
304,970,455✔
1010
  *defaultPgsz = 4096;
304,970,455✔
1011
  uint32_t last = *defaultPgsz;
304,989,838✔
1012
  while (*defaultPgsz < rowSize * 4) {
368,062,917✔
1013
    *defaultPgsz <<= 1u;
63,213,412✔
1014
    if (*defaultPgsz < last) {
63,186,610✔
UNCOV
1015
      return TSDB_CODE_INVALID_PARA;
×
1016
    }
1017
    last = *defaultPgsz;
63,195,619✔
1018
  }
1019

1020
  // The default buffer for each operator in query is 10MB.
1021
  // at least four pages need to be in buffer
1022
  // TODO: make this variable to be configurable.
1023
  *defaultBufsz = 4096 * 2560;
304,840,967✔
1024
  if ((*defaultBufsz) <= (*defaultPgsz)) {
304,826,409✔
UNCOV
1025
    (*defaultBufsz) = (*defaultPgsz) * 4;
×
UNCOV
1026
    if (*defaultBufsz < ((int64_t)(*defaultPgsz)) * 4) {
×
UNCOV
1027
      return TSDB_CODE_INVALID_PARA;
×
1028
    }
1029
  }
1030

1031
  return 0;
304,899,878✔
1032
}
1033

1034
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
659,672,958✔
1035
  if (numOfRows == 0) {
659,672,958✔
UNCOV
1036
    numOfRows = 4096;
×
1037
  }
1038

1039
  pResultInfo->capacity = numOfRows;
659,672,958✔
1040
  pResultInfo->threshold = numOfRows * 0.75;
659,792,864✔
1041

1042
  if (pResultInfo->threshold == 0) {
659,632,122✔
1043
    pResultInfo->threshold = numOfRows;
2,289,602✔
1044
  }
1045
  pResultInfo->totalRows = 0;
659,605,798✔
1046
}
659,535,387✔
1047

1048
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
286,996,571✔
1049
  pInfo->pRes = pBlock;
286,996,571✔
1050
  initResultRowInfo(&pInfo->resultRowInfo);
287,039,595✔
1051
}
286,899,989✔
1052

1053
void destroySqlFunctionCtx(SqlFunctionCtx* pCtx, SExprInfo* pExpr, int32_t numOfOutput) {
1,248,627,438✔
1054
  if (pCtx == NULL) {
1,248,627,438✔
1055
    return;
793,154,464✔
1056
  }
1057

1058
  for (int32_t i = 0; i < numOfOutput; ++i) {
1,720,500,165✔
1059
    if (pExpr != NULL) {
1,265,051,158✔
1060
      SExprInfo* pExprInfo = &pExpr[i];
1,265,006,076✔
1061
      for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
2,147,483,647✔
1062
        if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_VALUE) {
1,443,660,459✔
1063
          colDataDestroy(pCtx[i].input.pData[j]);
139,215,017✔
1064
          taosMemoryFree(pCtx[i].input.pData[j]);
139,218,453✔
1065
          taosMemoryFree(pCtx[i].input.pColumnDataAgg[j]);
139,207,423✔
1066
        }
1067
      }
1068
    }
1069
    for (int32_t j = 0; j < pCtx[i].numOfParams; ++j) {
2,147,483,647✔
1070
      taosVariantDestroy(&pCtx[i].param[j].param);
1,443,703,498✔
1071
    }
1072

1073
    if(pCtx[i].fpSet.cleanup) {
1,265,110,920✔
1074
      pCtx[i].fpSet.cleanup(&pCtx[i]);
1,130,653✔
1075
    }
1076

1077
    taosMemoryFreeClear(pCtx[i].subsidiaries.pCtx);
1,265,125,812✔
1078
    taosMemoryFreeClear(pCtx[i].subsidiaries.buf);
1,265,153,942✔
1079
    taosMemoryFree(pCtx[i].input.pData);
1,265,114,509✔
1080
    taosMemoryFree(pCtx[i].input.pColumnDataAgg);
1,265,068,764✔
1081

1082
    if (pCtx[i].udfName != NULL) {
1,265,069,335✔
1083
      taosMemoryFree(pCtx[i].udfName);
36,532✔
1084
    }
1085
  }
1086

1087
  taosMemoryFreeClear(pCtx);
455,449,007✔
1088
  return;
455,448,588✔
1089
}
1090

1091
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr, SFunctionStateStore* pStore) {
451,200,933✔
1092
  pSup->pExprInfo = pExprInfo;
451,200,933✔
1093
  pSup->numOfExprs = numOfExpr;
451,230,991✔
1094
  if (pSup->pExprInfo != NULL) {
451,195,926✔
1095
    pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset, pStore);
349,257,731✔
1096
    if (pSup->pCtx == NULL) {
349,149,367✔
1097
      return terrno;
46✔
1098
    }
1099
  }
1100

1101
  return TSDB_CODE_SUCCESS;
451,040,022✔
1102
}
1103

1104
void checkIndefRowsFuncs(SExprSupp* pSup) {
2,115✔
1105
  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
8,460✔
1106
    if (fmIsIndefiniteRowsFunc(pSup->pCtx[i].functionId)) {
6,345✔
UNCOV
1107
      pSup->hasIndefRowsFunc = true;
×
UNCOV
1108
      break;
×
1109
    }
1110
  }
1111
}
2,115✔
1112

1113
void cleanupExprSupp(SExprSupp* pSupp) {
1,130,474,724✔
1114
  destroySqlFunctionCtx(pSupp->pCtx, pSupp->pExprInfo, pSupp->numOfExprs);
1,130,474,724✔
1115
  if (pSupp->pExprInfo != NULL) {
1,130,436,530✔
1116
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
392,291,575✔
1117
    taosMemoryFreeClear(pSupp->pExprInfo);
392,302,487✔
1118
  }
1119

1120
  if (pSupp->pFilterInfo != NULL) {
1,130,435,941✔
1121
    filterFreeInfo(pSupp->pFilterInfo);
99,318,342✔
1122
    pSupp->pFilterInfo = NULL;
99,313,322✔
1123
  }
1124

1125
  taosMemoryFree(pSupp->rowEntryInfoOffset);
1,130,478,549✔
1126
  memset(pSupp, 0, sizeof(SExprSupp));
1,130,496,588✔
1127
}
1,130,496,588✔
1128

1129
void cleanupExprSuppWithoutFilter(SExprSupp* pSupp) {
117,941,372✔
1130
  destroySqlFunctionCtx(pSupp->pCtx, pSupp->pExprInfo, pSupp->numOfExprs);
117,941,372✔
1131
  if (pSupp->pExprInfo != NULL) {
117,938,000✔
1132
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
33,249,418✔
1133
    taosMemoryFreeClear(pSupp->pExprInfo);
33,249,504✔
1134
  }
1135

1136
  taosMemoryFreeClear(pSupp->rowEntryInfoOffset);
117,942,776✔
1137
  pSupp->numOfExprs = 0;
117,938,983✔
1138
  pSupp->hasWindowOrGroup = false;
117,953,321✔
1139
  pSupp->pCtx = NULL;
117,937,207✔
1140
}
117,953,241✔
1141

1142
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
290,648,641✔
1143
  blockDataDestroy(pInfo->pRes);
290,648,641✔
1144
  pInfo->pRes = NULL;
290,653,828✔
1145
}
290,635,057✔
1146

1147
bool groupbyTbname(SNodeList* pGroupList) {
253,889,388✔
1148
  bool   bytbname = false;
253,889,388✔
1149
  SNode* pNode = NULL;
253,889,388✔
1150
  FOREACH(pNode, pGroupList) {
261,503,670✔
1151
    if (pNode->type == QUERY_NODE_FUNCTION) {
40,491,386✔
1152
      bytbname = (strcmp(((struct SFunctionNode*)pNode)->functionName, "tbname") == 0);
32,877,104✔
1153
      break;
32,876,050✔
1154
    }
1155
  }
1156
  return bytbname;
253,846,442✔
1157
}
1158

1159
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* pTask, SReadHandle* readHandle) {
377,591,580✔
1160
  switch (pNode->type) {
377,591,580✔
1161
    case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
559,429✔
1162
      SInserterParam* pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
559,429✔
1163
      if (NULL == pInserterParam) {
559,429✔
UNCOV
1164
        return terrno;
×
1165
      }
1166
      pInserterParam->readHandle = readHandle;
559,429✔
1167

1168
      *pParam = pInserterParam;
559,429✔
1169
      break;
559,429✔
1170
    }
1171
    case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
1,878,146✔
1172
      SDeleterParam* pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam));
1,878,146✔
1173
      if (NULL == pDeleterParam) {
1,803,830✔
UNCOV
1174
        return terrno;
×
1175
      }
1176

1177
      SArray* pInfoList = NULL;
1,803,830✔
1178
      int32_t code = getTableListInfo(pTask, &pInfoList);
1,803,830✔
1179
      if (code != TSDB_CODE_SUCCESS || pInfoList == NULL) {
1,801,509✔
UNCOV
1180
        taosMemoryFree(pDeleterParam);
×
UNCOV
1181
        return code;
×
1182
      }
1183

1184
      STableListInfo* pTableListInfo = taosArrayGetP(pInfoList, 0);
1,802,664✔
1185
      taosArrayDestroy(pInfoList);
1,803,247✔
1186

1187
      pDeleterParam->suid = tableListGetSuid(pTableListInfo);
1,803,247✔
1188

1189
      // TODO extract uid list
1190
      int32_t numOfTables = 0;
1,801,498✔
1191
      code = tableListGetSize(pTableListInfo, &numOfTables);
1,801,498✔
1192
      if (code != TSDB_CODE_SUCCESS) {
1,802,092✔
UNCOV
1193
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
1194
        taosMemoryFree(pDeleterParam);
×
1195
        return code;
×
1196
      }
1197

1198
      pDeleterParam->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t));
1,802,092✔
1199
      if (NULL == pDeleterParam->pUidList) {
1,803,830✔
UNCOV
1200
        taosMemoryFree(pDeleterParam);
×
UNCOV
1201
        return terrno;
×
1202
      }
1203

1204
      for (int32_t i = 0; i < numOfTables; ++i) {
3,850,848✔
1205
        STableKeyInfo* pTable = tableListGetInfo(pTableListInfo, i);
2,047,018✔
1206
        if (!pTable) {
2,047,018✔
UNCOV
1207
          taosArrayDestroy(pDeleterParam->pUidList);
×
UNCOV
1208
          taosMemoryFree(pDeleterParam);
×
UNCOV
1209
          return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1210
        }
1211
        void* tmp = taosArrayPush(pDeleterParam->pUidList, &pTable->uid);
2,047,018✔
1212
        if (!tmp) {
2,047,018✔
UNCOV
1213
          taosArrayDestroy(pDeleterParam->pUidList);
×
UNCOV
1214
          taosMemoryFree(pDeleterParam);
×
UNCOV
1215
          return terrno;
×
1216
        }
1217
      }
1218

1219
      *pParam = pDeleterParam;
1,803,830✔
1220
      break;
1,803,830✔
1221
    }
1222
    default:
375,188,661✔
1223
      break;
375,188,661✔
1224
  }
1225

1226
  return TSDB_CODE_SUCCESS;
377,623,563✔
1227
}
1228

UNCOV
1229
void streamOpReleaseState(SOperatorInfo* pOperator) {
×
UNCOV
1230
  SOperatorInfo* downstream = pOperator->pDownstream[0];
×
1231
  if (downstream->fpSet.releaseStreamStateFn) {
×
1232
    downstream->fpSet.releaseStreamStateFn(downstream);
×
1233
  }
UNCOV
1234
}
×
1235

UNCOV
1236
void streamOpReloadState(SOperatorInfo* pOperator) {
×
UNCOV
1237
  SOperatorInfo* downstream = pOperator->pDownstream[0];
×
1238
  if (downstream->fpSet.reloadStreamStateFn) {
×
1239
    downstream->fpSet.reloadStreamStateFn(downstream);
×
1240
  }
UNCOV
1241
}
×
1242

1243
void freeOperatorParamImpl(SOperatorParam* pParam, SOperatorParamType type) {
164,292,605✔
1244
  int32_t childrenNum = taosArrayGetSize(pParam->pChildren);
164,292,605✔
1245
  for (int32_t i = 0; i < childrenNum; ++i) {
168,386,453✔
1246
    SOperatorParam* pChild = taosArrayGetP(pParam->pChildren, i);
4,092,675✔
1247
    freeOperatorParam(pChild, type);
4,092,675✔
1248
  }
1249

1250
  taosArrayDestroy(pParam->pChildren);
164,293,778✔
1251
  pParam->pChildren = NULL;
164,298,763✔
1252

1253
  taosMemoryFreeClear(pParam->value);
164,302,161✔
1254

1255
  taosMemoryFree(pParam);
164,311,316✔
1256
}
164,284,688✔
1257

1258
void freeExchangeGetBasicOperatorParam(void* pParam) {
29,759,551✔
1259
  SExchangeOperatorBasicParam* pBasic = (SExchangeOperatorBasicParam*)pParam;
29,759,551✔
1260
  if (pBasic->uidList) {
29,759,551✔
1261
    taosArrayDestroy(pBasic->uidList);
25,541,095✔
1262
    pBasic->uidList = NULL;
25,541,095✔
1263
  }
1264
  if (pBasic->orgTbInfo) {
29,759,551✔
1265
    taosArrayDestroy(pBasic->orgTbInfo->colMap);
12,320,620✔
1266
    taosMemoryFreeClear(pBasic->orgTbInfo);
12,320,620✔
1267
  }
1268
  if (pBasic->batchOrgTbInfo) {
29,759,551✔
1269
    taosArrayDestroyEx(pBasic->batchOrgTbInfo, destroySOrgTbInfo);
6,505,763✔
1270
    pBasic->batchOrgTbInfo = NULL;
6,505,763✔
1271
  }
1272
  if (pBasic->tagList) {
29,759,551✔
1273
    taosArrayDestroyEx(pBasic->tagList, destroyTagVal);
1,918,410✔
1274
    pBasic->tagList = NULL;
1,918,410✔
1275
  }
1276
}
29,759,551✔
1277

1278
void freeExchangeGetOperatorParam(SOperatorParam* pParam) {
27,280,770✔
1279
  SExchangeOperatorParam* pExcParam = (SExchangeOperatorParam*)pParam->value;
27,280,770✔
1280
  if (pExcParam->multiParams) {
27,280,770✔
1281
    SExchangeOperatorBatchParam* pExcBatch = (SExchangeOperatorBatchParam*)pParam->value;
4,278,174✔
1282
    tSimpleHashSetFreeFp(pExcBatch->pBatchs, freeExchangeGetBasicOperatorParam);
4,278,174✔
1283
    tSimpleHashCleanup(pExcBatch->pBatchs);
4,278,174✔
1284
  } else {
1285
    freeExchangeGetBasicOperatorParam(&pExcParam->basic);
23,002,596✔
1286
  }
1287

1288
  freeOperatorParamImpl(pParam, OP_GET_PARAM);
27,280,770✔
1289
}
27,280,770✔
1290

UNCOV
1291
void freeExchangeNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1292

1293
void freeGroupCacheGetOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_GET_PARAM); }
7,818,642✔
1294

UNCOV
1295
void freeGroupCacheNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1296

1297
void freeMergeJoinGetOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_GET_PARAM); }
3,909,321✔
1298

UNCOV
1299
void freeMergeJoinNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1300

1301
void freeTagScanGetOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_GET_PARAM); }
7,118,678✔
1302

1303
void freeMergeGetOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_GET_PARAM); }
10,514,906✔
1304

UNCOV
1305
void freeDynQueryCtrlGetOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_GET_PARAM); }
×
1306

UNCOV
1307
void freeDynQueryCtrlNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1308

UNCOV
1309
void freeInterpFuncGetOperatorParam(SOperatorParam* pParam) {
×
UNCOV
1310
  freeOperatorParamImpl(pParam, OP_GET_PARAM);
×
UNCOV
1311
}
×
1312

UNCOV
1313
void freeInterpFuncNotifyOperatorParam(SOperatorParam* pParam) {
×
UNCOV
1314
  freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM);
×
UNCOV
1315
}
×
1316

1317
void freeTableScanGetOperatorParam(SOperatorParam* pParam) {
100,849,190✔
1318
  STableScanOperatorParam* pTableScanParam =
100,849,190✔
1319
    (STableScanOperatorParam*)pParam->value;
1320
  taosArrayDestroy(pTableScanParam->pUidList);
100,865,480✔
1321
  if (pTableScanParam->pOrgTbInfo) {
100,856,821✔
1322
    taosArrayDestroy(pTableScanParam->pOrgTbInfo->colMap);
84,321,552✔
1323
    taosMemoryFreeClear(pTableScanParam->pOrgTbInfo);
84,321,552✔
1324
  }
1325
  if (pTableScanParam->pBatchTbInfo) {
100,864,443✔
1326
    for (int32_t i = 0;
5,445,119✔
1327
      i < taosArrayGetSize(pTableScanParam->pBatchTbInfo); ++i) {
14,444,988✔
1328
      SOrgTbInfo* pOrgTbInfo =
1329
        (SOrgTbInfo*)taosArrayGet(pTableScanParam->pBatchTbInfo, i);
8,999,869✔
1330
      taosArrayDestroy(pOrgTbInfo->colMap);
8,999,869✔
1331
    }
1332
    taosArrayDestroy(pTableScanParam->pBatchTbInfo);
5,445,119✔
1333
    pTableScanParam->pBatchTbInfo = NULL;
5,445,119✔
1334
  }
1335
  if (pTableScanParam->pTagList) {
100,871,526✔
1336
    for (int32_t i = 0;
1,918,410✔
1337
      i < taosArrayGetSize(pTableScanParam->pTagList); ++i) {
12,533,916✔
1338
      STagVal* pTagVal =
1339
        (STagVal*)taosArrayGet(pTableScanParam->pTagList, i);
10,615,506✔
1340
      if (IS_VAR_DATA_TYPE(pTagVal->type)) {
10,615,506✔
1341
        taosMemoryFreeClear(pTagVal->pData);
4,641,390✔
1342
      }
1343
    }
1344
    taosArrayDestroy(pTableScanParam->pTagList);
1,918,410✔
1345
    pTableScanParam->pTagList = NULL;
1,918,410✔
1346
  }
1347
  freeOperatorParamImpl(pParam, OP_GET_PARAM);
100,871,818✔
1348
}
100,844,974✔
1349

UNCOV
1350
void freeTableScanNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1351

UNCOV
1352
void freeTagScanNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1353

UNCOV
1354
void freeMergeNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1355

1356
void freeOpParamItem(void* pItem) {
17,855,286✔
1357
  SOperatorParam* pParam = *(SOperatorParam**)pItem;
17,855,286✔
1358
  pParam->reUse = false;
17,855,286✔
1359
  freeOperatorParam(pParam, OP_GET_PARAM);
17,855,286✔
1360
}
17,855,286✔
1361

1362
static void destroyRefColIdGroupParam(void* info) {
8,327✔
1363
  SRefColIdGroup* pGroup = (SRefColIdGroup*)info;
8,327✔
1364
  if (pGroup && pGroup->pSlotIdList) {
8,327✔
1365
    taosArrayDestroy(pGroup->pSlotIdList);
8,327✔
1366
    pGroup->pSlotIdList = NULL;
8,327✔
1367
  }
1368
}
8,327✔
1369

1370
void freeExternalWindowGetOperatorParam(SOperatorParam* pParam) {
1,258,073✔
1371
  SExternalWindowOperatorParam *pExtParam = (SExternalWindowOperatorParam*)pParam->value;
1,258,073✔
1372
  taosArrayDestroy(pExtParam->ExtWins);
1,258,073✔
1373
  for (int32_t i = 0; i < taosArrayGetSize(pParam->pChildren); i++) {
1,258,073✔
UNCOV
1374
    SOperatorParam* pChild = *(SOperatorParam**)taosArrayGet(pParam->pChildren, i);
×
UNCOV
1375
    pChild->reUse = false;
×
1376
  }
1377
  freeOperatorParamImpl(pParam, OP_GET_PARAM);
1,258,073✔
1378
}
1,258,073✔
1379

1380
void freeVirtualTableScanGetOperatorParam(SOperatorParam* pParam) {
5,534,666✔
1381
  SVTableScanOperatorParam* pVTableScanParam = (SVTableScanOperatorParam*)pParam->value;
5,534,666✔
1382
  taosArrayDestroyEx(pVTableScanParam->pOpParamArray, freeOpParamItem);
5,534,666✔
1383
  if (pVTableScanParam->pRefColGroups) {
5,534,666✔
1384
    taosArrayDestroyEx(pVTableScanParam->pRefColGroups, destroyRefColIdGroupParam);
5,132✔
1385
    pVTableScanParam->pRefColGroups = NULL;
5,132✔
1386
  }
1387
  freeOpParamItem(&pVTableScanParam->pTagScanOp);
5,534,666✔
1388
  freeOperatorParamImpl(pParam, OP_GET_PARAM);
5,534,666✔
1389
}
5,534,666✔
1390

UNCOV
1391
void freeVTableScanNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1392

UNCOV
1393
void freeExternalWindowNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1394

1395
void freeOperatorParam(SOperatorParam* pParam, SOperatorParamType type) {
865,132,275✔
1396
  if (NULL == pParam || pParam->reUse) {
865,132,275✔
1397
    return;
700,838,061✔
1398
  }
1399

1400
  switch (pParam->opType) {
164,300,199✔
1401
    case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
27,280,770✔
1402
      type == OP_GET_PARAM ? freeExchangeGetOperatorParam(pParam) : freeExchangeNotifyOperatorParam(pParam);
27,280,770✔
1403
      break;
27,280,770✔
1404
    case QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE:
7,818,642✔
1405
      type == OP_GET_PARAM ? freeGroupCacheGetOperatorParam(pParam) : freeGroupCacheNotifyOperatorParam(pParam);
7,818,642✔
1406
      break;
7,818,642✔
1407
    case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
3,909,321✔
1408
      type == OP_GET_PARAM ? freeMergeJoinGetOperatorParam(pParam) : freeMergeJoinNotifyOperatorParam(pParam);
3,909,321✔
1409
      break;
3,909,321✔
1410
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
100,863,536✔
1411
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
1412
      type == OP_GET_PARAM ? freeTableScanGetOperatorParam(pParam) : freeTableScanNotifyOperatorParam(pParam);
100,863,536✔
1413
      break;
100,850,779✔
1414
    case QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN:
5,534,666✔
1415
      type == OP_GET_PARAM ? freeVirtualTableScanGetOperatorParam(pParam) : freeVTableScanNotifyOperatorParam(pParam);
5,534,666✔
1416
      break;
5,534,666✔
1417
    case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
7,118,678✔
1418
      type == OP_GET_PARAM ? freeTagScanGetOperatorParam(pParam) : freeTagScanNotifyOperatorParam(pParam);
7,118,678✔
1419
      break;
7,118,678✔
1420
    case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG:
10,514,285✔
1421
    case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
1422
    case QUERY_NODE_PHYSICAL_PLAN_MERGE:
1423
    case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL:
1424
    case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL:
1425
      type == OP_GET_PARAM ? freeMergeGetOperatorParam(pParam) : freeMergeNotifyOperatorParam(pParam);
10,514,285✔
1426
      break;
10,513,433✔
1427
    case QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW:
1,258,073✔
1428
      type == OP_GET_PARAM ? freeExternalWindowGetOperatorParam(pParam) : freeExternalWindowNotifyOperatorParam(pParam);
1,258,073✔
1429
      break;
1,258,073✔
UNCOV
1430
    case QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL:
×
UNCOV
1431
      type == OP_GET_PARAM ? freeDynQueryCtrlGetOperatorParam(pParam) : freeDynQueryCtrlNotifyOperatorParam(pParam);
×
UNCOV
1432
      break;
×
UNCOV
1433
    case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:
×
UNCOV
1434
      type == OP_GET_PARAM ? freeInterpFuncGetOperatorParam(pParam) : freeInterpFuncNotifyOperatorParam(pParam);
×
UNCOV
1435
      break;
×
1436
    default:
18,552✔
1437
      qError("%s unsupported op %d param, param type %d, param:%p value:%p children:%p reuse:%d",
18,552✔
1438
             __func__, pParam->opType, type, pParam, pParam->value, pParam->pChildren, pParam->reUse);
UNCOV
1439
      break;
×
1440
  }
1441
}
1442

1443
void freeResetOperatorParams(struct SOperatorInfo* pOperator, SOperatorParamType type, bool allFree) {
1,778,940,683✔
1444
  SOperatorParam**  ppParam = NULL;
1,778,940,683✔
1445
  SOperatorParam*** pppDownstramParam = NULL;
1,778,940,683✔
1446
  switch (type) {
1,778,940,683✔
1447
    case OP_GET_PARAM:
953,129,247✔
1448
      ppParam = &pOperator->pOperatorGetParam;
953,129,247✔
1449
      pppDownstramParam = &pOperator->pDownstreamGetParams;
953,132,633✔
1450
      break;
953,142,393✔
1451
    case OP_NOTIFY_PARAM:
825,889,191✔
1452
      ppParam = &pOperator->pOperatorNotifyParam;
825,889,191✔
1453
      pppDownstramParam = &pOperator->pDownstreamNotifyParams;
825,886,856✔
1454
      break;
825,895,489✔
UNCOV
1455
    default:
×
UNCOV
1456
      return;
×
1457
  }
1458

1459
  if (*ppParam) {
1,779,037,882✔
1460
    qDebug("%s free self param, operator:%s type:%d paramType:%d param:%p", __func__, pOperator->name,
9,566,515✔
1461
           pOperator->operatorType, type, *ppParam);
1462
    freeOperatorParam(*ppParam, type);
9,566,515✔
1463
    *ppParam = NULL;
9,566,515✔
1464
  }
1465

1466
  if (*pppDownstramParam) {
1,778,918,947✔
1467
    for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
158,908,200✔
1468
      if ((*pppDownstramParam)[i]) {
28,632,149✔
UNCOV
1469
        qDebug("%s free downstream param, operator:%s type:%d idx:%d downstream:%s paramType:%d param:%p", __func__,
×
1470
               pOperator->name, pOperator->operatorType, i, pOperator->pDownstream[i]->name, type,
1471
               (*pppDownstramParam)[i]);
UNCOV
1472
        freeOperatorParam((*pppDownstramParam)[i], type);
×
UNCOV
1473
        (*pppDownstramParam)[i] = NULL;
×
1474
      }
1475
    }
1476
    if (allFree) {
130,277,470✔
1477
      taosMemoryFreeClear(*pppDownstramParam);
26,585,964✔
1478
    }
1479
  }
1480
}
1481

1482
FORCE_INLINE int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOperator, int32_t idx, bool clearParam,
997,619,351✔
1483
                                                    SSDataBlock** pResBlock) {
1484
  QRY_PARAM_CHECK(pResBlock);
997,619,351✔
1485

1486
  int32_t code = 0;
997,700,403✔
1487
  if (pOperator->pDownstreamGetParams && pOperator->pDownstreamGetParams[idx]) {
997,700,403✔
1488
    qDebug("DynOp: op %s start to get block from downstream %s", pOperator->name, pOperator->pDownstream[idx]->name);
19,181,440✔
1489
    code = pOperator->pDownstream[idx]->fpSet.getNextExtFn(pOperator->pDownstream[idx],
38,362,880✔
1490
                                                           pOperator->pDownstreamGetParams[idx], pResBlock);
19,181,440✔
1491
    if (clearParam && (code == 0)) {
19,181,440✔
1492
      qDebug("%s clear downstream param, operator:%s type:%d idx:%d downstream:%s param:%p", __func__,
7,299,857✔
1493
             pOperator->name, pOperator->operatorType, idx, pOperator->pDownstream[idx]->name,
1494
             pOperator->pDownstreamGetParams[idx]);
1495
      freeOperatorParam(pOperator->pDownstreamGetParams[idx], OP_GET_PARAM);
7,299,857✔
1496
      pOperator->pDownstreamGetParams[idx] = NULL;
7,299,857✔
1497
    }
1498

1499
    if (code) {
19,181,440✔
1500
      qError("failed to get next data block from upstream at %s, line:%d code:%s", __func__, __LINE__, tstrerror(code));
×
1501
    }
1502
    return code;
19,181,440✔
1503
  }
1504

1505
  code = pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx], pResBlock);
978,491,451✔
1506
  if (code) {
975,892,196✔
UNCOV
1507
    qError("failed to get next data block from upstream at %s, %d code:%s", __func__, __LINE__, tstrerror(code));
×
1508
  }
1509
  return code;
975,842,153✔
1510
}
1511

1512
bool compareVal(const char* v, const SStateKeys* pKey) {
2,147,483,647✔
1513
  if (IS_VAR_DATA_TYPE(pKey->type)) {
2,147,483,647✔
1514
    if (IS_STR_DATA_BLOB(pKey->type)) {
533,525✔
UNCOV
1515
      if (blobDataLen(v) != blobDataLen(pKey->pData)) {
×
UNCOV
1516
        return false;
×
1517
      } else {
UNCOV
1518
        return memcmp(blobDataVal(v), blobDataVal(pKey->pData), blobDataLen(v)) == 0;
×
1519
      }
1520
    } else {
1521
      if (varDataLen(v) != varDataLen(pKey->pData)) {
533,525✔
UNCOV
1522
        return false;
×
1523
      } else {
1524
        return memcmp(varDataVal(v), varDataVal(pKey->pData), varDataLen(v)) == 0;
533,525✔
1525
      }
1526
    }
1527
  } else {
1528
    return memcmp(pKey->pData, v, pKey->bytes) == 0;
2,147,483,647✔
1529
  }
1530
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc