• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.55
/source/libs/executor/src/executorInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "os.h"
20
#include "querynodes.h"
21
#include "tfill.h"
22
#include "tname.h"
23

24
#include "tdatablock.h"
25
#include "tmsg.h"
26
#include "ttime.h"
27

28
#include "executorInt.h"
29
#include "index.h"
30
#include "operator.h"
31
#include "query.h"
32
#include "querytask.h"
33
#include "storageapi.h"
34
#include "tcompare.h"
35
#include "thash.h"
36
#include "ttypes.h"
37

38
#define SET_REVERSE_SCAN_FLAG(runtime)    ((runtime)->scanFlag = REVERSE_SCAN)
39
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
40

41
#if 0
42
static UNUSED_FUNC void *u_malloc (size_t __size) {
43
  uint32_t v = taosRand();
44

45
  if (v % 1000 <= 0) {
46
    return NULL;
47
  } else {
48
    return taosMemoryMalloc(__size);
49
  }
50
}
51

52
static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
53
  uint32_t v = taosRand();
54
  if (v % 1000 <= 0) {
55
    return NULL;
56
  } else {
57
    return taosMemoryCalloc(num, __size);
58
  }
59
}
60

61
static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
62
  uint32_t v = taosRand();
63
  if (v % 5 <= 1) {
64
    return NULL;
65
  } else {
66
    return taosMemoryRealloc(p, __size);
67
  }
68
}
69

70
#define calloc  u_calloc
71
#define malloc  u_malloc
72
#define realloc u_realloc
73
#endif
74

75
static int32_t setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
76

77
static int32_t initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
78
static void    doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
79

80
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
81
                                   bool createDummyCol);
82
static void    doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
83
                                  SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup,
84
                                  int64_t minWindowSize);
85

86
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
211,493,297✔
87
  SFilePage* pData = NULL;
211,493,297✔
88

89
  // in the first scan, new space needed for results
90
  int32_t pageId = -1;
211,493,297✔
91
  if (*currentPageId == -1) {
211,493,297✔
92
    pData = getNewBufPage(pResultBuf, &pageId);
5,228,438✔
93
    if (pData == NULL) {
5,227,870!
UNCOV
94
      qError("failed to get buffer, code:%s", tstrerror(terrno));
×
95
      return NULL;
×
96
    }
97
    pData->num = sizeof(SFilePage);
5,227,920✔
98
  } else {
99
    pData = getBufPage(pResultBuf, *currentPageId);
206,264,859✔
100
    if (pData == NULL) {
206,316,692!
101
      qError("failed to get buffer, code:%s", tstrerror(terrno));
×
102
      return NULL;
×
103
    }
104

105
    pageId = *currentPageId;
206,316,692✔
106

107
    if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
206,316,692✔
108
      // release current page first, and prepare the next one
109
      releaseBufPage(pResultBuf, pData);
14,396,217✔
110

111
      pData = getNewBufPage(pResultBuf, &pageId);
14,573,659✔
112
      if (pData == NULL) {
14,583,537!
113
        qError("failed to get buffer, code:%s", tstrerror(terrno));
×
114
        return NULL;
×
115
      }
116
      pData->num = sizeof(SFilePage);
14,583,537✔
117
    }
118
  }
119

120
  if (pData == NULL) {
211,648,719!
121
    return NULL;
×
122
  }
123

124
  setBufPageDirty(pData, true);
211,648,719✔
125

126
  // set the number of rows in current disk page
127
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
211,462,310✔
128

129
  memset((char*)pResultRow, 0, interBufSize);
211,462,310✔
130
  pResultRow->pageId = pageId;
211,462,310✔
131
  pResultRow->offset = (int32_t)pData->num;
211,462,310✔
132

133
  *currentPageId = pageId;
211,462,310✔
134
  pData->num += interBufSize;
211,462,310✔
135
  return pResultRow;
211,462,310✔
136
}
137

138
/**
139
 * the struct of key in hash table
140
 * +----------+---------------+
141
 * | group id |   key data    |
142
 * | 8 bytes  | actual length |
143
 * +----------+---------------+
144
 */
145
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
260,186,485✔
146
                                   int32_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
147
                                   bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup) {
148
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
260,186,485✔
149
  if (!keepGroup) {
260,186,485✔
150
    *(uint64_t*)pSup->keyBuf = calcGroupId(pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
16,445,117✔
151
  }
152

153
  SResultRowPosition* p1 =
154
      (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
260,410,689✔
155

156
  SResultRow* pResult = NULL;
260,263,284✔
157

158
  // in case of repeat scan/reverse scan, no new time window added.
159
  if (isIntervalQuery) {
260,263,284✔
160
    if (p1 != NULL) {  // the *p1 may be NULL in case of sliding+offset exists.
239,337,017✔
161
      pResult = getResultRowByPos(pResultBuf, p1, true);
42,234,886✔
162
      if (pResult == NULL) {
42,234,886!
163
        pTaskInfo->code = terrno;
×
164
        return NULL;
×
165
      }
166

167
      if (pResult->pageId != p1->pageId || pResult->offset != p1->offset) {
42,234,886!
168
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
270✔
169
        pTaskInfo->code = terrno;
×
170
        return NULL;
×
171
      }
172
    }
173
  } else {
174
    // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
175
    // pResultRowInfo object.
176
    if (p1 != NULL) {
20,926,267✔
177
      // todo
178
      pResult = getResultRowByPos(pResultBuf, p1, true);
5,906,990✔
179
      if (NULL == pResult) {
5,906,990!
180
        pTaskInfo->code = terrno;
×
181
        return NULL;
×
182
      }
183

184
      if (pResult->pageId != p1->pageId || pResult->offset != p1->offset) {
5,906,990!
185
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
186
        pTaskInfo->code = terrno;
×
187
        return NULL;
10,973✔
188
      }
189
    }
190
  }
191

192
  // 1. close current opened time window
193
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
260,020,999✔
194
    SResultRowPosition pos = pResultRowInfo->cur;
210,494,232✔
195
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
210,494,232✔
196
    if (pPage == NULL) {
210,003,031!
197
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
198
      pTaskInfo->code = terrno;
×
199
      return NULL;
×
200
    }
201
    releaseBufPage(pResultBuf, pPage);
210,003,031✔
202
  }
203

204
  // allocate a new buffer page
205
  if (pResult == NULL) {
259,223,199✔
206
    pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
211,209,221✔
207
    if (pResult == NULL) {
211,447,873!
208
      pTaskInfo->code = terrno;
×
209
      return NULL;
×
210
    }
211

212
    // add a new result set for a new group
213
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
211,447,873✔
214
    int32_t code = tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
211,447,873✔
215
                                  sizeof(SResultRowPosition));
216
    if (code != TSDB_CODE_SUCCESS) {
212,968,167!
217
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
218
      pTaskInfo->code = code;
×
219
      return NULL;
×
220
    }
221
  }
222

223
  // 2. set the new time window to be the new active time window
224
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
261,071,601✔
225

226
  // too many time window in query
227
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
521,524,294!
228
      tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
260,992,295✔
229
    pTaskInfo->code = TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW;
×
230
    return NULL;
×
231
  }
232

233
  return pResult;
260,531,999✔
234
}
235

236
//  query_range_start, query_range_end, window_duration, window_start, window_end
237
int32_t initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
2,472,771✔
238
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
2,472,771✔
239
  pColData->info.bytes = sizeof(int64_t);
2,472,771✔
240

241
  int32_t code = colInfoDataEnsureCapacity(pColData, 5, false);
2,472,771✔
242
  if (code != TSDB_CODE_SUCCESS) {
2,475,882!
243
    return code;
×
244
  }
245
  colDataSetInt64(pColData, 0, &pQueryWindow->skey);
2,475,882✔
246
  colDataSetInt64(pColData, 1, &pQueryWindow->ekey);
2,475,882✔
247

248
  int64_t interval = 0;
2,475,882✔
249
  colDataSetInt64(pColData, 2, &interval);  // this value may be variable in case of 'n' and 'y'.
250
  colDataSetInt64(pColData, 3, &pQueryWindow->skey);
2,475,882✔
251
  colDataSetInt64(pColData, 4, &pQueryWindow->ekey);
2,475,882✔
252
  return TSDB_CODE_SUCCESS;
2,475,882✔
253
}
254

255
static int32_t doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
1,630,644✔
256
  int32_t         code = TSDB_CODE_SUCCESS;
1,630,644✔
257
  int32_t         lino = 0;
1,630,644✔
258
  SqlFunctionCtx* pCtx = pExprSup->pCtx;
1,630,644✔
259
  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
3,261,290✔
260
    pCtx[i].order = order;
1,630,648✔
261
    pCtx[i].input.numOfRows = pBlock->info.rows;
1,630,648✔
262
    code = setBlockSMAInfo(&pCtx[i], &pExprSup->pExprInfo[i], pBlock);
1,630,648✔
263
    QUERY_CHECK_CODE(code, lino, _end);
1,630,646!
264
    pCtx[i].pSrcBlock = pBlock;
1,630,646✔
265
    pCtx[i].scanFlag = scanFlag;
1,630,646✔
266
  }
267

268
_end:
1,630,642✔
269
  if (code != TSDB_CODE_SUCCESS) {
1,630,642!
270
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
271
  }
272
  return code;
1,630,642✔
273
}
274

275
int32_t setInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
24,435,895✔
276
                          bool createDummyCol) {
277
  if (pBlock->pBlockAgg != NULL) {
24,435,895✔
278
    return doSetInputDataBlockInfo(pExprSup, pBlock, order, scanFlag);
1,630,644✔
279
  } else {
280
    return doSetInputDataBlock(pExprSup, pBlock, order, scanFlag, createDummyCol);
22,805,251✔
281
  }
282
}
283

284
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
×
285
                                             int32_t numOfRows) {
286
  int32_t          code = TSDB_CODE_SUCCESS;
×
287
  int32_t          lino = 0;
×
288
  SColumnInfoData* pColInfo = NULL;
×
289
  if (pInput->pData[paramIndex] == NULL) {
×
290
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
×
291
    QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
×
292

293
    // Set the correct column info (data type and bytes)
294
    pColInfo->info.type = pFuncParam->param.nType;
×
295
    pColInfo->info.bytes = pFuncParam->param.nLen;
×
296

297
    pInput->pData[paramIndex] = pColInfo;
×
298
  } else {
299
    pColInfo = pInput->pData[paramIndex];
×
300
  }
301

302
  code = colInfoDataEnsureCapacity(pColInfo, numOfRows, false);
×
303
  QUERY_CHECK_CODE(code, lino, _end);
×
304

305
  int8_t type = pFuncParam->param.nType;
×
306
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
×
307
    int64_t v = pFuncParam->param.i;
×
308
    for (int32_t i = 0; i < numOfRows; ++i) {
×
309
      colDataSetInt64(pColInfo, i, &v);
×
310
    }
311
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
×
312
    double v = pFuncParam->param.d;
×
313
    for (int32_t i = 0; i < numOfRows; ++i) {
×
314
      colDataSetDouble(pColInfo, i, &v);
×
315
    }
316
  } else if (type == TSDB_DATA_TYPE_VARCHAR || type == TSDB_DATA_TYPE_GEOMETRY) {
×
317
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
×
318
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
×
319

320
    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
×
321
    for (int32_t i = 0; i < numOfRows; ++i) {
×
322
      code = colDataSetVal(pColInfo, i, tmp, false);
×
323
      QUERY_CHECK_CODE(code, lino, _end);
×
324
    }
325
    taosMemoryFree(tmp);
×
326
  }
327

328
_end:
×
329
  if (code != TSDB_CODE_SUCCESS) {
×
330
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
331
  }
332
  return code;
×
333
}
334

335
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
22,804,911✔
336
                                   bool createDummyCol) {
337
  int32_t         code = TSDB_CODE_SUCCESS;
22,804,911✔
338
  int32_t         lino = 0;
22,804,911✔
339
  SqlFunctionCtx* pCtx = pExprSup->pCtx;
22,804,911✔
340

341
  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
81,781,880✔
342
    pCtx[i].order = order;
58,977,447✔
343
    pCtx[i].input.numOfRows = pBlock->info.rows;
58,977,447✔
344

345
    pCtx[i].pSrcBlock = pBlock;
58,977,447✔
346
    pCtx[i].scanFlag = scanFlag;
58,977,447✔
347

348
    SInputColumnInfoData* pInput = &pCtx[i].input;
58,977,447✔
349
    pInput->uid = pBlock->info.id.uid;
58,977,447✔
350
    pInput->colDataSMAIsSet = false;
58,977,447✔
351

352
    SExprInfo* pOneExpr = &pExprSup->pExprInfo[i];
58,977,447✔
353
    bool       hasPk = pOneExpr->pExpr->nodeType == QUERY_NODE_FUNCTION && pOneExpr->pExpr->_function.pFunctNode->hasPk;
58,977,447✔
354
    pCtx[i].hasPrimaryKey = hasPk;
58,977,447✔
355

356
    int16_t tsParamIdx = (!hasPk) ? pOneExpr->base.numOfParams - 1 : pOneExpr->base.numOfParams - 2;
58,977,447✔
357
    int16_t pkParamIdx = pOneExpr->base.numOfParams - 1;
58,977,447✔
358

359
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
123,117,094✔
360
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
64,140,125✔
361
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
64,140,125✔
362
        int32_t slotId = pFuncParam->pCol->slotId;
56,306,674✔
363
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
56,306,674✔
364
        pInput->totalRows = pBlock->info.rows;
56,306,018✔
365
        pInput->numOfRows = pBlock->info.rows;
56,306,018✔
366
        pInput->startRowIndex = 0;
56,306,018✔
367
        pInput->blankFill = pBlock->info.blankFill;
56,306,018✔
368

369
        // NOTE: the last parameter is the primary timestamp column
370
        // todo: refactor this
371

372
        if (fmIsImplicitTsFunc(pCtx[i].functionId) && (j == tsParamIdx)) {
56,306,018✔
373
          pInput->pPTS = pInput->pData[j];  // in case of merge function, this is not always the ts column data.
7,473,763✔
374
        }
375
        if (hasPk && (j == pkParamIdx)) {
56,306,196✔
376
          pInput->pPrimaryKey = pInput->pData[j];
30,342✔
377
        }
378
        QUERY_CHECK_CONDITION((pInput->pData[j] != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
56,306,196!
379
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
7,833,451✔
380
        // todo avoid case: top(k, 12), 12 is the value parameter.
381
        // sum(11), 11 is also the value parameter.
382
        if (createDummyCol && pOneExpr->base.numOfParams == 1) {
6,668,632!
383
          pInput->totalRows = pBlock->info.rows;
×
384
          pInput->numOfRows = pBlock->info.rows;
×
385
          pInput->startRowIndex = 0;
×
386
          pInput->blankFill = pBlock->info.blankFill;
×
387

388
          code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
×
389
          QUERY_CHECK_CODE(code, lino, _end);
×
390
        }
391
      }
392
    }
393
  }
394

395
_end:
22,804,433✔
396
  if (code != TSDB_CODE_SUCCESS) {
22,804,433!
397
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
398
  }
399
  return code;
22,806,447✔
400
}
401

402
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
872,240,881✔
403
  struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
872,240,881✔
404

405
  // in case of timestamp column, always generated results.
406
  int32_t functionId = pCtx->functionId;
872,240,881✔
407
  if (functionId == -1) {
872,240,881✔
408
    return false;
34,695,731✔
409
  }
410

411
  if (pCtx->scanFlag == PRE_SCAN) {
837,545,150✔
412
    return fmIsRepeatScanFunc(pCtx->functionId);
1,106,839✔
413
  }
414

415
  if (isRowEntryCompleted(pResInfo)) {
836,438,311!
416
    return false;
×
417
  }
418

419
  return true;
836,438,311✔
420
}
421

422
static int32_t doCreateConstantValColumnSMAInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
1,329,696✔
423
                                                int32_t paramIndex, int32_t numOfRows) {
424
  if (pInput->pData[paramIndex] == NULL) {
1,329,696✔
425
    pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData));
18!
426
    if (pInput->pData[paramIndex] == NULL) {
18!
427
      return terrno;
×
428
    }
429

430
    // Set the correct column info (data type and bytes)
431
    pInput->pData[paramIndex]->info.type = type;
18✔
432
    pInput->pData[paramIndex]->info.bytes = tDataTypes[type].bytes;
18✔
433
  }
434

435
  SColumnDataAgg* da = NULL;
1,329,696✔
436
  if (pInput->pColumnDataAgg[paramIndex] == NULL) {
1,329,696✔
437
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
18!
438
    if (!da) {
18!
439
      return terrno;
×
440
    }
441
    pInput->pColumnDataAgg[paramIndex] = da;
18✔
442
  } else {
443
    da = pInput->pColumnDataAgg[paramIndex];
1,329,678✔
444
  }
445

446
  if (type == TSDB_DATA_TYPE_BIGINT) {
1,329,696!
447
    int64_t v = pFuncParam->param.i;
1,329,696✔
448
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
1,329,696✔
449
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
×
450
    double v = pFuncParam->param.d;
×
451
    *da = (SColumnDataAgg){.numOfNull = 0};
×
452

453
    *(double*)&da->min = v;
×
454
    *(double*)&da->max = v;
×
455
    *(double*)&da->sum = v * numOfRows;
×
456
  } else if (type == TSDB_DATA_TYPE_BOOL) {  // todo validate this data type
×
457
    bool v = pFuncParam->param.i;
×
458

459
    *da = (SColumnDataAgg){.numOfNull = 0};
×
460
    *(bool*)&da->min = 0;
×
461
    *(bool*)&da->max = v;
×
462
    *(bool*)&da->sum = v * numOfRows;
×
463
  } else if (type == TSDB_DATA_TYPE_TIMESTAMP) {
×
464
    // do nothing
465
  } else {
466
    qError("invalid constant type for sma info");
×
467
  }
468

469
  return TSDB_CODE_SUCCESS;
1,329,696✔
470
}
471

472
int32_t setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
1,630,648✔
473
  int32_t code = TSDB_CODE_SUCCESS;
1,630,648✔
474
  int32_t lino = 0;
1,630,648✔
475
  int32_t numOfRows = pBlock->info.rows;
1,630,648✔
476

477
  SInputColumnInfoData* pInput = &pCtx->input;
1,630,648✔
478
  pInput->numOfRows = numOfRows;
1,630,648✔
479
  pInput->totalRows = numOfRows;
1,630,648✔
480

481
  if (pBlock->pBlockAgg != NULL) {
1,630,648!
482
    pInput->colDataSMAIsSet = true;
1,630,648✔
483

484
    for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
4,590,992✔
485
      SFunctParam* pFuncParam = &pExprInfo->base.pParam[j];
2,960,344✔
486

487
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
2,960,344✔
488
        int32_t slotId = pFuncParam->pCol->slotId;
1,630,648✔
489
        pInput->pColumnDataAgg[j] = &pBlock->pBlockAgg[slotId];
1,630,648✔
490
        if (pInput->pColumnDataAgg[j]->colId == -1) {
1,630,648✔
491
          pInput->colDataSMAIsSet = false;
1,716✔
492
        }
493

494
        // Here we set the column info data since the data type for each column data is required, but
495
        // the data in the corresponding SColumnInfoData will not be used.
496
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
1,630,648✔
497
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
1,329,696!
498
        code = doCreateConstantValColumnSMAInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows);
1,329,696✔
499
        QUERY_CHECK_CODE(code, lino, _end);
1,329,696!
500
      }
501
    }
502
  } else {
UNCOV
503
    pInput->colDataSMAIsSet = false;
×
504
  }
505

506
_end:
1,630,648✔
507
  if (code != TSDB_CODE_SUCCESS) {
1,630,648!
508
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
509
  }
510
  return code;
1,630,648✔
511
}
512

513
/////////////////////////////////////////////////////////////////////////////////////////////
514
STimeWindow getAlignQueryTimeWindow(const SInterval* pInterval, int64_t key) {
33,899,704✔
515
  STimeWindow win = {0};
33,899,704✔
516
  win.skey = taosTimeTruncate(key, pInterval);
33,899,704✔
517

518
  /*
519
   * if the realSkey > INT64_MAX - pInterval->interval, the query duration between
520
   * realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges.
521
   */
522
  win.ekey = taosTimeGetIntervalEnd(win.skey, pInterval);
33,901,380✔
523
  if (win.ekey < win.skey) {
33,887,317✔
524
    win.ekey = INT64_MAX;
2,066✔
525
  }
526

527
  return win;
33,887,317✔
528
}
529

530
int32_t setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput,
407,381,934✔
531
                            int32_t* rowEntryInfoOffset) {
532
  bool init = false;
407,381,934✔
533
  for (int32_t i = 0; i < numOfOutput; ++i) {
1,554,922,000✔
534
    pCtx[i].resultInfo = getResultEntryInfo(pResult, i, rowEntryInfoOffset);
1,149,128,241✔
535
    if (init) {
1,149,191,108✔
536
      continue;
114,385,646✔
537
    }
538

539
    struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
1,034,805,462✔
540
    if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
1,034,805,462!
541
      continue;
×
542
    }
543

544
    if (pCtx[i].isPseudoFunc) {
1,034,805,462✔
545
      continue;
270,997,581✔
546
    }
547

548
    if (!pResInfo->initialized) {
763,807,881✔
549
      if (pCtx[i].functionId != -1) {
720,822,539✔
550
        int32_t code = pCtx[i].fpSet.init(&pCtx[i], pResInfo);
697,540,421✔
551
        if (code != TSDB_CODE_SUCCESS && fmIsUserDefinedFunc(pCtx[i].functionId)) {
697,836,663!
552
          pResInfo->initialized = false;
1,947,284✔
553
          qError("failed to initialize udf, funcId:%d error:%s", pCtx[i].functionId, tstrerror(code));
1,947,284!
554
          return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
×
555
        } else if (code != TSDB_CODE_SUCCESS) {
695,889,379!
556
          qError("failed to initialize function context, funcId:%d error:%s", pCtx[i].functionId, tstrerror(code));
×
557
          return code;
×
558
        }
559
      } else {
560
        pResInfo->initialized = true;
23,282,118✔
561
      }
562
    } else {
563
      init = true;
42,985,342✔
564
    }
565
  }
566
  return TSDB_CODE_SUCCESS;
405,793,759✔
567
}
568

569
void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
75,847,278✔
570
  for (int32_t i = 0; i < numOfOutput; ++i) {
192,703,070✔
571
    SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
116,855,792✔
572
    if (pResInfo == NULL) {
116,855,792!
573
      continue;
×
574
    }
575

576
    pResInfo->initialized = false;
116,855,792✔
577
    pResInfo->numOfRes = 0;
116,855,792✔
578
    pResInfo->isNullRes = 0;
116,855,792✔
579
    pResInfo->complete = false;
116,855,792✔
580
  }
581
}
75,847,278✔
582

583
int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
35,275,902✔
584
  int32_t code = TSDB_CODE_SUCCESS;
35,275,902✔
585
  int32_t lino = 0;
35,275,902✔
586
  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
35,275,902✔
587
    return TSDB_CODE_SUCCESS;
30,471,912✔
588
  }
589

590
  SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
4,803,990✔
591
  SColumnInfoData*   p = NULL;
4,803,884✔
592

593
  code = filterSetDataFromSlotId(pFilterInfo, &param1);
4,803,884✔
594
  QUERY_CHECK_CODE(code, lino, _err);
4,803,686!
595

596
  int32_t status = 0;
4,803,686✔
597
  code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
4,803,686✔
598
  QUERY_CHECK_CODE(code, lino, _err);
4,803,829!
599

600
  code = extractQualifiedTupleByFilterResult(pBlock, p, status);
4,803,829✔
601
  QUERY_CHECK_CODE(code, lino, _err);
4,803,727!
602

603
  if (pColMatchInfo != NULL) {
4,803,727✔
604
    size_t size = taosArrayGetSize(pColMatchInfo->pList);
3,769,968✔
605
    for (int32_t i = 0; i < size; ++i) {
3,774,819✔
606
      SColMatchItem* pInfo = taosArrayGet(pColMatchInfo->pList, i);
3,772,776✔
607
      QUERY_CHECK_NULL(pInfo, code, lino, _err, terrno);
3,772,749!
608
      if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
3,772,749✔
609
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->dstSlotId);
3,768,044✔
610
        QUERY_CHECK_NULL(pColData, code, lino, _err, terrno);
3,767,933!
611
        if (pColData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
3,768,066✔
612
          code = blockDataUpdateTsWindow(pBlock, pInfo->dstSlotId);
3,767,973✔
613
          QUERY_CHECK_CODE(code, lino, _err);
3,768,036!
614
          break;
3,768,036✔
615
        }
616
      }
617
    }
618
  }
619
  code = blockDataCheck(pBlock);
4,803,838✔
620
  QUERY_CHECK_CODE(code, lino, _err);
4,803,725!
621
_err:
4,803,725✔
622
  if (code != TSDB_CODE_SUCCESS) {
4,803,725!
UNCOV
623
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
624
  }
625
  colDataDestroy(p);
4,803,725✔
626
  taosMemoryFree(p);
4,804,112!
627
  return code;
4,804,208✔
628
}
629

630
int32_t extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status) {
4,803,849✔
631
  int32_t code = TSDB_CODE_SUCCESS;
4,803,849✔
632
  int8_t* pIndicator = (int8_t*)p->pData;
4,803,849✔
633
  if (status == FILTER_RESULT_ALL_QUALIFIED) {
4,803,849✔
634
    // here nothing needs to be done
635
  } else if (status == FILTER_RESULT_NONE_QUALIFIED) {
1,158,374✔
636
    code = trimDataBlock(pBlock, pBlock->info.rows, NULL);
493,967✔
637
    pBlock->info.rows = 0;
493,804✔
638
  } else if (status == FILTER_RESULT_PARTIAL_QUALIFIED) {
664,407!
639
    code = trimDataBlock(pBlock, pBlock->info.rows, (bool*)pIndicator);
664,485✔
640
  } else {
641
    qError("unknown filter result type: %d", status);
×
642
  }
643
  return code;
4,803,936✔
644
}
645

646
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) {
360,000,764✔
647
  bool returnNotNull = false;
360,000,764✔
648
  for (int32_t j = 0; j < numOfExprs; ++j) {
1,302,529,048✔
649
    SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
942,126,279✔
650
    if (!isRowEntryInitialized(pResInfo)) {
942,528,284✔
651
      continue;
219,655,677✔
652
    } else {
653
    }
654

655
    if (pRow->numOfRows < pResInfo->numOfRes) {
722,872,607✔
656
      pRow->numOfRows = pResInfo->numOfRes;
355,975,550✔
657
    }
658

659
    if (pCtx[j].isNotNullFunc) {
722,872,607✔
660
      returnNotNull = true;
206,274,065✔
661
    }
662
  }
663
  // if all expr skips all blocks, e.g. all null inputs for max function, output one row in final result.
664
  //  except for first/last, which require not null output, output no rows
665
  if (pRow->numOfRows == 0 && !returnNotNull) {
360,402,769✔
666
    pRow->numOfRows = 1;
28,947✔
667
  }
668
}
360,402,769✔
669

670
int32_t copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
347,702,441✔
671
                                 SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
672
  int32_t code = TSDB_CODE_SUCCESS;
347,702,441✔
673
  int32_t lino = 0;
347,702,441✔
674
  for (int32_t j = 0; j < numOfExprs; ++j) {
1,217,181,351✔
675
    int32_t slotId = pExprInfo[j].base.resSchema.slotId;
870,117,707✔
676

677
    pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
870,117,707✔
678
    if (pCtx[j].fpSet.finalize) {
869,768,261✔
679
      if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_group_key") == 0 ||
525,869,894✔
680
          strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_group_const_value") == 0) {
444,160,060✔
681
        // for groupkey along with functions that output multiple lines(e.g. Histogram)
682
        // need to match groupkey result for each output row of that function.
683
        if (pCtx[j].resultInfo->numOfRes != 0) {
81,710,827!
684
          pCtx[j].resultInfo->numOfRes = pRow->numOfRows;
82,926,265✔
685
        }
686
      }
687

688
      code = blockDataEnsureCapacity(pBlock, pBlock->info.rows + pCtx[j].resultInfo->numOfRes);
525,869,894✔
689
      QUERY_CHECK_CODE(code, lino, _end);
525,821,591!
690

691
      code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
525,821,591✔
692
      if (TSDB_CODE_SUCCESS != code) {
527,637,834!
693
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
×
694
        QUERY_CHECK_CODE(code, lino, _end);
×
695
      }
696
    } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
343,898,367✔
697
      // do nothing
698
    } else {
699
      // expand the result into multiple rows. E.g., _wstart, top(k, 20)
700
      // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
701
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
221,581,136✔
702
      QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
220,992,565!
703
      char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
221,131,879✔
704
      for (int32_t k = 0; k < pRow->numOfRows; ++k) {
448,155,596✔
705
        code = colDataSetValOrCover(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
226,840,434✔
706
        QUERY_CHECK_CODE(code, lino, _end);
227,023,717!
707
      }
708
    }
709
  }
710

711
_end:
347,063,644✔
712
  if (code != TSDB_CODE_SUCCESS) {
347,063,644!
713
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
714
  }
715
  return code;
348,319,271✔
716
}
717

718
// todo refactor. SResultRow has direct pointer in miainfo
719
void finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
62,687,759✔
720
                        SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
721
  SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
62,687,759✔
722
  if (page == NULL) {
62,691,039!
723
    qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
724
    T_LONG_JMP(pTaskInfo->env, terrno);
×
725
  }
726

727
  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);
62,691,039✔
728

729
  SqlFunctionCtx* pCtx = pSup->pCtx;
62,691,039✔
730
  SExprInfo*      pExprInfo = pSup->pExprInfo;
62,691,039✔
731
  const int32_t*  rowEntryOffset = pSup->rowEntryInfoOffset;
62,691,039✔
732

733
  doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset);
62,691,039✔
734
  if (pRow->numOfRows == 0) {
62,684,973!
UNCOV
735
    releaseBufPage(pBuf, page);
×
736
    return;
×
737
  }
738

739
  int32_t size = pBlock->info.capacity;
62,685,058✔
740
  while (pBlock->info.rows + pRow->numOfRows > size) {
62,762,784✔
741
    size = size * 1.25;
77,726✔
742
  }
743

744
  int32_t code = blockDataEnsureCapacity(pBlock, size);
62,685,058✔
745
  if (TAOS_FAILED(code)) {
62,686,713!
746
    releaseBufPage(pBuf, page);
×
747
    qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
×
748
    T_LONG_JMP(pTaskInfo->env, code);
×
749
  }
750

751
  code = copyResultrowToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
62,686,713✔
752
  if (TAOS_FAILED(code)) {
62,672,515!
753
    releaseBufPage(pBuf, page);
×
754
    qError("%s copy result row to datablock failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
×
755
    T_LONG_JMP(pTaskInfo->env, code);
×
756
  }
757

758
  releaseBufPage(pBuf, page);
62,672,515✔
759
  pBlock->info.rows += pRow->numOfRows;
62,676,615✔
760
}
761

762
void doCopyToSDataBlockByHash(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
3,315,667✔
763
                              SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t threshold, bool ignoreGroup) {
764
  int32_t         code = TSDB_CODE_SUCCESS;
3,315,667✔
765
  int32_t         lino = 0;
3,315,667✔
766
  SExprInfo*      pExprInfo = pSup->pExprInfo;
3,315,667✔
767
  int32_t         numOfExprs = pSup->numOfExprs;
3,315,667✔
768
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
3,315,667✔
769
  SqlFunctionCtx* pCtx = pSup->pCtx;
3,315,667✔
770

771
  size_t  keyLen = 0;
3,315,667✔
772
  int32_t numOfRows = tSimpleHashGetSize(pHashmap);
3,315,667✔
773

774
  // begin from last iter
775
  void*   pData = pGroupResInfo->dataPos;
3,312,361✔
776
  int32_t iter = pGroupResInfo->iter;
3,312,361✔
777
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
13,914,074✔
778
    void*               key = tSimpleHashGetKey(pData, &keyLen);
13,604,431✔
779
    SResultRowPosition* pos = pData;
13,604,431✔
780
    uint64_t            groupId = *(uint64_t*)key;
13,604,431✔
781

782
    SFilePage* page = getBufPage(pBuf, pos->pageId);
13,604,431✔
783
    if (page == NULL) {
13,591,709!
784
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
785
      T_LONG_JMP(pTaskInfo->env, terrno);
×
786
    }
787

788
    SResultRow* pRow = (SResultRow*)((char*)page + pos->offset);
13,591,709✔
789

790
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
13,591,709✔
791

792
    // no results, continue to check the next one
793
    if (pRow->numOfRows == 0) {
13,567,886!
794
      pGroupResInfo->index += 1;
×
795
      pGroupResInfo->iter = iter;
×
796
      pGroupResInfo->dataPos = pData;
×
797

798
      releaseBufPage(pBuf, page);
×
799
      continue;
×
800
    }
801

802
    if (!ignoreGroup) {
13,567,886✔
803
      if (pBlock->info.id.groupId == 0) {
6,008,395✔
804
        pBlock->info.id.groupId = groupId;
3,036,858✔
805
      } else {
806
        // current value belongs to different group, it can't be packed into one datablock
807
        if (pBlock->info.id.groupId != groupId) {
2,971,537!
808
          releaseBufPage(pBuf, page);
2,989,655✔
809
          break;
2,989,721✔
810
        }
811
      }
812
    }
813

814
    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
10,578,231!
815
      uint32_t newSize = pBlock->info.rows + pRow->numOfRows + ((numOfRows - iter) > 1 ? 1 : 0);
×
816
      code = blockDataEnsureCapacity(pBlock, newSize);
×
817
      QUERY_CHECK_CODE(code, lino, _end);
×
818
      qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", newSize,
×
819
             pBlock->info.capacity, GET_TASKID(pTaskInfo));
820
      // todo set the pOperator->resultInfo size
821
    }
822

823
    pGroupResInfo->index += 1;
10,578,231✔
824
    pGroupResInfo->iter = iter;
10,578,231✔
825
    pGroupResInfo->dataPos = pData;
10,578,231✔
826

827
    code = copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
10,578,231✔
828
    releaseBufPage(pBuf, page);
10,602,476✔
829
    QUERY_CHECK_CODE(code, lino, _end);
10,601,817!
830
    pBlock->info.rows += pRow->numOfRows;
10,601,817✔
831
    if (pBlock->info.rows >= threshold) {
10,601,817✔
832
      break;
104✔
833
    }
834
  }
835

836
  qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
3,294,509✔
837
         pBlock->info.id.groupId);
838
  pBlock->info.dataLoad = 1;
3,294,513✔
839
  code = blockDataUpdateTsWindow(pBlock, 0);
3,294,513✔
840
  QUERY_CHECK_CODE(code, lino, _end);
3,293,008!
841

842
_end:
3,293,008✔
843
  if (code != TSDB_CODE_SUCCESS) {
3,293,008!
844
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
845
    T_LONG_JMP(pTaskInfo->env, code);
×
846
  }
847
}
3,293,008✔
848

849
void doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
3,221,534✔
850
                        SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup, int64_t minWindowSize) {
851
  int32_t         code = TSDB_CODE_SUCCESS;
3,221,534✔
852
  int32_t         lino = 0;
3,221,534✔
853
  SExprInfo*      pExprInfo = pSup->pExprInfo;
3,221,534✔
854
  int32_t         numOfExprs = pSup->numOfExprs;
3,221,534✔
855
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
3,221,534✔
856
  SqlFunctionCtx* pCtx = pSup->pCtx;
3,221,534✔
857

858
  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
3,221,534✔
859

860
  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
201,969,016✔
861
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
199,278,720✔
862
    SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);
199,270,837✔
863
    if (page == NULL) {
198,821,307!
864
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
865
      T_LONG_JMP(pTaskInfo->env, terrno);
×
866
    }
867

868
    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
198,821,307✔
869

870
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
198,821,307✔
871

872
    // no results, continue to check the next one
873
    if (pRow->numOfRows == 0) {
198,168,251!
874
      pGroupResInfo->index += 1;
×
875
      releaseBufPage(pBuf, page);
×
876
      continue;
806✔
877
    }
878
    // skip the window which is less than the windowMinSize
879
    if (pRow->win.ekey - pRow->win.skey < minWindowSize) {
198,261,337✔
880
      qDebug("skip small window, groupId: %" PRId64 ", windowSize: %" PRId64 ", minWindowSize: %" PRId64, pPos->groupId,
20✔
881
             pRow->win.ekey - pRow->win.skey, minWindowSize);
882
      pGroupResInfo->index += 1;
20✔
883
      releaseBufPage(pBuf, page);
20✔
884
      continue;
20✔
885
    }
886

887
    if (!ignoreGroup) {
198,261,317✔
888
      if (pBlock->info.id.groupId == 0) {
132,527,072✔
889
        pBlock->info.id.groupId = pPos->groupId;
117,165,411✔
890
      } else {
891
        // current value belongs to different group, it can't be packed into one datablock
892
        if (pBlock->info.id.groupId != pPos->groupId) {
15,361,661✔
893
          releaseBufPage(pBuf, page);
124,058✔
894
          break;
124,053✔
895
        }
896
      }
897
    }
898

899
    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
198,137,259✔
900
      uint32_t newSize = pBlock->info.rows + pRow->numOfRows + ((numOfRows - i) > 1 ? 1 : 0);
812✔
901
      code = blockDataEnsureCapacity(pBlock, newSize);
812✔
902
      QUERY_CHECK_CODE(code, lino, _end);
812!
903
      qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", newSize,
812!
904
             pBlock->info.capacity, GET_TASKID(pTaskInfo));
905
      // todo set the pOperator->resultInfo size
906
    }
907

908
    pGroupResInfo->index += 1;
198,137,259✔
909
    code = copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
198,137,259✔
910
    releaseBufPage(pBuf, page);
199,453,117✔
911
    QUERY_CHECK_CODE(code, lino, _end);
199,128,482!
912

913
    pBlock->info.rows += pRow->numOfRows;
199,128,482✔
914
    if (pBlock->info.rows >= threshold) {
199,128,482✔
915
      break;
382,412✔
916
    }
917
  }
918

919
  qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
3,196,761✔
920
         pBlock->info.id.groupId);
921
  pBlock->info.dataLoad = 1;
3,196,765✔
922
  code = blockDataUpdateTsWindow(pBlock, 0);
3,196,765✔
923
  QUERY_CHECK_CODE(code, lino, _end);
3,221,815!
924

925
_end:
3,221,815✔
926
  if (code != TSDB_CODE_SUCCESS) {
3,221,815!
927
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
928
    T_LONG_JMP(pTaskInfo->env, code);
×
929
  }
930
}
3,221,815✔
931

932
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
4,160,621✔
933
                            SDiskbasedBuf* pBuf) {
934
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
4,160,621✔
935
  SSDataBlock*   pBlock = pbInfo->pRes;
4,160,621✔
936

937
  // set output datablock version
938
  pBlock->info.version = pTaskInfo->version;
4,160,621✔
939

940
  blockDataCleanup(pBlock);
4,160,621✔
941
  if (!hasRemainResults(pGroupResInfo)) {
4,161,389✔
942
    return;
938,662✔
943
  }
944

945
  // clear the existed group id
946
  pBlock->info.id.groupId = 0;
3,222,475✔
947
  if (!pbInfo->mergeResultBlock) {
3,222,475✔
948
    doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold,
2,112,704✔
949
                       false, getMinWindowSize(pOperator));
950
  } else {
951
    while (hasRemainResults(pGroupResInfo)) {
2,034,868✔
952
      doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold,
1,109,571✔
953
                         true, getMinWindowSize(pOperator));
954
      if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
1,109,614✔
955
        break;
184,517✔
956
      }
957

958
      // clearing group id to continue to merge data that belong to different groups
959
      pBlock->info.id.groupId = 0;
925,097✔
960
    }
961

962
    // clear the group id info in SSDataBlock, since the client does not need it
963
    pBlock->info.id.groupId = 0;
1,109,604✔
964
  }
965
}
966

967
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
10,140,763✔
968
  for (int32_t i = 0; i < numOfExprs; ++i) {
39,165,666✔
969
    SExprInfo* pExprInfo = &pExpr[i];
29,023,864✔
970
    for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
59,549,504✔
971
      if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) {
30,524,678✔
972
        taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol);
27,604,919!
973
      } else if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_VALUE) {
2,919,759✔
974
        taosVariantDestroy(&pExprInfo->base.pParam[j].param);
2,270,531✔
975
      }
976
    }
977

978
    taosMemoryFree(pExprInfo->base.pParam);
29,024,826!
979
    taosMemoryFree(pExprInfo->pExpr);
29,025,655!
980
  }
981
}
10,141,802✔
982

983
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, int64_t* defaultBufsz) {
6,736,819✔
984
  *defaultPgsz = 4096;
6,736,819✔
985
  uint32_t last = *defaultPgsz;
6,736,819✔
986
  while (*defaultPgsz < rowSize * 4) {
8,119,318✔
987
    *defaultPgsz <<= 1u;
1,382,499✔
988
    if (*defaultPgsz < last) {
1,382,499!
989
      return TSDB_CODE_INVALID_PARA;
×
990
    }
991
    last = *defaultPgsz;
1,382,499✔
992
  }
993

994
  // The default buffer for each operator in query is 10MB.
995
  // at least four pages need to be in buffer
996
  // TODO: make this variable to be configurable.
997
  *defaultBufsz = 4096 * 2560;
6,736,819✔
998
  if ((*defaultBufsz) <= (*defaultPgsz)) {
6,736,819!
999
    (*defaultBufsz) = (*defaultPgsz) * 4;
×
1000
    if (*defaultBufsz < ((int64_t)(*defaultPgsz)) * 4) {
×
1001
      return TSDB_CODE_INVALID_PARA;
×
1002
    }
1003
  }
1004

1005
  return 0;
6,736,819✔
1006
}
1007

1008
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
15,517,995✔
1009
  if (numOfRows == 0) {
15,517,995!
1010
    numOfRows = 4096;
×
1011
  }
1012

1013
  pResultInfo->capacity = numOfRows;
15,517,995✔
1014
  pResultInfo->threshold = numOfRows * 0.75;
15,517,995✔
1015

1016
  if (pResultInfo->threshold == 0) {
15,517,995✔
1017
    pResultInfo->threshold = numOfRows;
4,001✔
1018
  }
1019
}
15,517,995✔
1020

1021
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
6,717,590✔
1022
  pInfo->pRes = pBlock;
6,717,590✔
1023
  initResultRowInfo(&pInfo->resultRowInfo);
6,717,590✔
1024
}
6,717,708✔
1025

1026
static void destroySqlFunctionCtx(SqlFunctionCtx* pCtx, SExprInfo* pExpr, int32_t numOfOutput) {
30,115,354✔
1027
  if (pCtx == NULL) {
30,115,354✔
1028
    return;
19,129,094✔
1029
  }
1030

1031
  for (int32_t i = 0; i < numOfOutput; ++i) {
39,747,179✔
1032
    if (pExpr != NULL) {
28,758,945!
1033
      SExprInfo* pExprInfo = &pExpr[i];
28,759,405✔
1034
      for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
59,019,196✔
1035
        if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_VALUE) {
30,259,463✔
1036
          taosMemoryFree(pCtx[i].input.pData[j]);
2,266,568!
1037
          taosMemoryFree(pCtx[i].input.pColumnDataAgg[j]);
2,266,550✔
1038
        }
1039
      }
1040
    }
1041
    for (int32_t j = 0; j < pCtx[i].numOfParams; ++j) {
59,017,891✔
1042
      taosVariantDestroy(&pCtx[i].param[j].param);
30,259,948✔
1043
    }
1044

1045
    taosMemoryFreeClear(pCtx[i].subsidiaries.pCtx);
28,757,943!
1046
    taosMemoryFreeClear(pCtx[i].subsidiaries.buf);
28,758,057✔
1047
    taosMemoryFree(pCtx[i].input.pData);
28,758,055✔
1048
    taosMemoryFree(pCtx[i].input.pColumnDataAgg);
28,761,056!
1049

1050
    if (pCtx[i].udfName != NULL) {
28,760,652✔
1051
      taosMemoryFree(pCtx[i].udfName);
32!
1052
    }
1053
  }
1054

1055
  taosMemoryFreeClear(pCtx);
10,988,234!
1056
  return;
10,988,326✔
1057
}
1058

1059
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr, SFunctionStateStore* pStore) {
10,611,865✔
1060
  pSup->pExprInfo = pExprInfo;
10,611,865✔
1061
  pSup->numOfExprs = numOfExpr;
10,611,865✔
1062
  if (pSup->pExprInfo != NULL) {
10,611,865✔
1063
    pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset, pStore);
8,249,866✔
1064
    if (pSup->pCtx == NULL) {
8,245,254!
1065
      return terrno;
×
1066
    }
1067
  }
1068

1069
  return TSDB_CODE_SUCCESS;
10,611,858✔
1070
}
1071

1072
void cleanupExprSupp(SExprSupp* pSupp) {
30,115,314✔
1073
  destroySqlFunctionCtx(pSupp->pCtx, pSupp->pExprInfo, pSupp->numOfExprs);
30,115,314✔
1074
  if (pSupp->pExprInfo != NULL) {
30,115,106✔
1075
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
10,136,084✔
1076
    taosMemoryFreeClear(pSupp->pExprInfo);
10,136,042!
1077
  }
1078

1079
  if (pSupp->pFilterInfo != NULL) {
30,115,007✔
1080
    filterFreeInfo(pSupp->pFilterInfo);
2,841,572✔
1081
    pSupp->pFilterInfo = NULL;
2,841,586✔
1082
  }
1083

1084
  taosMemoryFree(pSupp->rowEntryInfoOffset);
30,115,021!
1085
}
30,115,056✔
1086

1087
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
6,746,104✔
1088
  blockDataDestroy(pInfo->pRes);
6,746,104✔
1089
  pInfo->pRes = NULL;
6,747,196✔
1090
}
6,747,196✔
1091

1092
bool groupbyTbname(SNodeList* pGroupList) {
4,471,808✔
1093
  bool bytbname = false;
4,471,808✔
1094
  SNode*pNode = NULL;
4,471,808✔
1095
  FOREACH(pNode, pGroupList) {
4,547,846✔
1096
    if (pNode->type == QUERY_NODE_FUNCTION) {
334,124✔
1097
      bytbname = (strcmp(((struct SFunctionNode*)pNode)->functionName, "tbname") == 0);
258,086✔
1098
      break;
258,086✔
1099
    }
1100
  }
1101
  return bytbname;
4,471,808✔
1102
}
1103

1104
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* pTask, SReadHandle* readHandle) {
8,113,002✔
1105
  switch (pNode->type) {
8,113,002✔
1106
    case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
254✔
1107
      SInserterParam* pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
254!
1108
      if (NULL == pInserterParam) {
254!
1109
        return terrno;
×
1110
      }
1111
      pInserterParam->readHandle = readHandle;
254✔
1112

1113
      *pParam = pInserterParam;
254✔
1114
      break;
254✔
1115
    }
1116
    case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
67,229✔
1117
      SDeleterParam* pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam));
67,229!
1118
      if (NULL == pDeleterParam) {
67,235!
1119
        return terrno;
×
1120
      }
1121

1122
      SArray* pInfoList = NULL;
67,235✔
1123
      int32_t code = getTableListInfo(pTask, &pInfoList);
67,235✔
1124
      if (code != TSDB_CODE_SUCCESS || pInfoList == NULL) {
67,234!
1125
        taosMemoryFree(pDeleterParam);
2!
1126
        return code;
×
1127
      }
1128

1129
      STableListInfo* pTableListInfo = taosArrayGetP(pInfoList, 0);
67,232✔
1130
      taosArrayDestroy(pInfoList);
67,224✔
1131

1132
      pDeleterParam->suid = tableListGetSuid(pTableListInfo);
67,236✔
1133

1134
      // TODO extract uid list
1135
      int32_t numOfTables = 0;
67,232✔
1136
      code = tableListGetSize(pTableListInfo, &numOfTables);
67,232✔
1137
      if (code != TSDB_CODE_SUCCESS) {
67,232!
1138
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1139
        taosMemoryFree(pDeleterParam);
×
1140
        return code;
×
1141
      }
1142

1143
      pDeleterParam->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t));
67,232✔
1144
      if (NULL == pDeleterParam->pUidList) {
67,237✔
1145
        taosMemoryFree(pDeleterParam);
5!
1146
        return terrno;
×
1147
      }
1148

1149
      for (int32_t i = 0; i < numOfTables; ++i) {
136,485✔
1150
        STableKeyInfo* pTable = tableListGetInfo(pTableListInfo, i);
69,252✔
1151
        if (!pTable) {
69,248!
1152
          taosArrayDestroy(pDeleterParam->pUidList);
×
1153
          taosMemoryFree(pDeleterParam);
×
1154
          return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1155
        }
1156
        void*          tmp = taosArrayPush(pDeleterParam->pUidList, &pTable->uid);
69,248✔
1157
        if (!tmp) {
69,253!
1158
          taosArrayDestroy(pDeleterParam->pUidList);
×
1159
          taosMemoryFree(pDeleterParam);
×
1160
          return terrno;
×
1161
        }
1162
      }
1163

1164
      *pParam = pDeleterParam;
67,233✔
1165
      break;
67,233✔
1166
    }
1167
    default:
8,045,519✔
1168
      break;
8,045,519✔
1169
  }
1170

1171
  return TSDB_CODE_SUCCESS;
8,113,006✔
1172
}
1173

1174
void streamOpReleaseState(SOperatorInfo* pOperator) {
369✔
1175
  SOperatorInfo* downstream = pOperator->pDownstream[0];
369✔
1176
  if (downstream->fpSet.releaseStreamStateFn) {
369!
1177
    downstream->fpSet.releaseStreamStateFn(downstream);
369✔
1178
  }
1179
}
369✔
1180

1181
void streamOpReloadState(SOperatorInfo* pOperator) {
369✔
1182
  SOperatorInfo* downstream = pOperator->pDownstream[0];
369✔
1183
  if (downstream->fpSet.reloadStreamStateFn) {
369!
1184
    downstream->fpSet.reloadStreamStateFn(downstream);
369✔
1185
  }
1186
}
369✔
1187

1188
void freeOperatorParamImpl(SOperatorParam* pParam, SOperatorParamType type) {
65,725✔
1189
  int32_t childrenNum = taosArrayGetSize(pParam->pChildren);
65,725✔
1190
  for (int32_t i = 0; i < childrenNum; ++i) {
65,725!
1191
    SOperatorParam* pChild = taosArrayGetP(pParam->pChildren, i);
×
1192
    freeOperatorParam(pChild, type);
×
1193
  }
1194

1195
  taosArrayDestroy(pParam->pChildren);
65,725✔
1196

1197
  taosMemoryFree(pParam->value);
65,725!
1198

1199
  taosMemoryFree(pParam);
65,725!
1200
}
65,725✔
1201

1202
void freeExchangeGetBasicOperatorParam(void* pParam) {
974✔
1203
  SExchangeOperatorBasicParam* pBasic = (SExchangeOperatorBasicParam*)pParam;
974✔
1204
  taosArrayDestroy(pBasic->uidList);
974✔
1205
  if (pBasic->colMap) {
974!
UNCOV
1206
    taosArrayDestroy(pBasic->colMap->colMap);
×
UNCOV
1207
    taosMemoryFreeClear(pBasic->colMap);
×
1208
  }
1209
}
974✔
1210

1211
void freeExchangeGetOperatorParam(SOperatorParam* pParam) {
605✔
1212
  SExchangeOperatorParam* pExcParam = (SExchangeOperatorParam*)pParam->value;
605✔
1213
  if (pExcParam->multiParams) {
605✔
1214
    SExchangeOperatorBatchParam* pExcBatch = (SExchangeOperatorBatchParam*)pParam->value;
557✔
1215
    tSimpleHashCleanup(pExcBatch->pBatchs);
557✔
1216
  } else {
1217
    freeExchangeGetBasicOperatorParam(&pExcParam->basic);
48✔
1218
  }
1219

1220
  freeOperatorParamImpl(pParam, OP_GET_PARAM);
605✔
1221
}
605✔
1222

1223
void freeExchangeNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1224

1225
void freeGroupCacheGetOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_GET_PARAM); }
42,082✔
1226

1227
void freeGroupCacheNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1228

1229
void freeMergeJoinGetOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_GET_PARAM); }
21,041✔
1230

UNCOV
1231
void freeMergeJoinNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1232

1233
void freeTableScanGetOperatorParam(SOperatorParam* pParam) {
1,997✔
1234
  STableScanOperatorParam* pTableScanParam = (STableScanOperatorParam*)pParam->value;
1,997✔
1235
  taosArrayDestroy(pTableScanParam->pUidList);
1,997✔
1236
  if (pTableScanParam->pOrgTbInfo) {
1,997!
UNCOV
1237
    taosArrayDestroy(pTableScanParam->pOrgTbInfo->colMap);
×
UNCOV
1238
    taosMemoryFreeClear(pTableScanParam->pOrgTbInfo);
×
1239
  }
1240
  freeOperatorParamImpl(pParam, OP_GET_PARAM);
1,997✔
1241
}
1,997✔
1242

UNCOV
1243
void freeTableScanNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1244

UNCOV
1245
void freeOpParamItem(void* pItem) {
×
UNCOV
1246
  SOperatorParam* pParam = *(SOperatorParam**)pItem;
×
UNCOV
1247
  pParam->reUse = false;
×
UNCOV
1248
  freeOperatorParam(pParam, OP_GET_PARAM);
×
UNCOV
1249
}
×
1250

UNCOV
1251
void freeVirtualTableScanGetOperatorParam(SOperatorParam* pParam) {
×
UNCOV
1252
  SVTableScanOperatorParam* pVTableScanParam = (SVTableScanOperatorParam*)pParam->value;
×
UNCOV
1253
  taosArrayDestroyEx(pVTableScanParam->pOpParamArray, freeOpParamItem);
×
UNCOV
1254
  freeOperatorParamImpl(pParam, OP_GET_PARAM);
×
1255
}
×
1256

1257
void freeVTableScanNotifyOperatorParam(SOperatorParam* pParam) { freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM); }
×
1258

1259
void freeOperatorParam(SOperatorParam* pParam, SOperatorParamType type) {
5,495,993✔
1260
  if (NULL == pParam || pParam->reUse) {
5,495,993!
1261
    return;
5,430,268✔
1262
  }
1263

1264
  switch (pParam->opType) {
65,725!
1265
    case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
605✔
1266
      type == OP_GET_PARAM ? freeExchangeGetOperatorParam(pParam) : freeExchangeNotifyOperatorParam(pParam);
605!
1267
      break;
605✔
1268
    case QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE:
42,082✔
1269
      type == OP_GET_PARAM ? freeGroupCacheGetOperatorParam(pParam) : freeGroupCacheNotifyOperatorParam(pParam);
42,082!
1270
      break;
42,082✔
1271
    case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
21,041✔
1272
      type == OP_GET_PARAM ? freeMergeJoinGetOperatorParam(pParam) : freeMergeJoinNotifyOperatorParam(pParam);
21,041!
1273
      break;
21,041✔
1274
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
1,997✔
1275
      type == OP_GET_PARAM ? freeTableScanGetOperatorParam(pParam) : freeTableScanNotifyOperatorParam(pParam);
1,997!
1276
      break;
1,997✔
UNCOV
1277
    case QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN:
×
UNCOV
1278
      type == OP_GET_PARAM ? freeVirtualTableScanGetOperatorParam(pParam) : freeVTableScanNotifyOperatorParam(pParam);
×
UNCOV
1279
      break;
×
UNCOV
1280
    default:
×
UNCOV
1281
      qError("unsupported op %d param, type %d", pParam->opType, type);
×
UNCOV
1282
      break;
×
1283
  }
1284
}
1285

1286
void freeResetOperatorParams(struct SOperatorInfo* pOperator, SOperatorParamType type, bool allFree) {
38,709,986✔
1287
  SOperatorParam**  ppParam = NULL;
38,709,986✔
1288
  SOperatorParam*** pppDownstramParam = NULL;
38,709,986✔
1289
  switch (type) {
38,709,986!
1290
    case OP_GET_PARAM:
19,368,280✔
1291
      ppParam = &pOperator->pOperatorGetParam;
19,368,280✔
1292
      pppDownstramParam = &pOperator->pDownstreamGetParams;
19,368,280✔
1293
      break;
19,368,280✔
1294
    case OP_NOTIFY_PARAM:
19,346,068✔
1295
      ppParam = &pOperator->pOperatorNotifyParam;
19,346,068✔
1296
      pppDownstramParam = &pOperator->pDownstreamNotifyParams;
19,346,068✔
1297
      break;
19,346,068✔
UNCOV
1298
    default:
×
UNCOV
1299
      return;
×
1300
  }
1301

1302
  if (*ppParam) {
38,714,348✔
1303
    freeOperatorParam(*ppParam, type);
21,041✔
1304
    *ppParam = NULL;
21,041✔
1305
  }
1306

1307
  if (*pppDownstramParam) {
38,714,348✔
1308
    for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
64,751✔
1309
      if ((*pppDownstramParam)[i]) {
42,082!
1310
        freeOperatorParam((*pppDownstramParam)[i], type);
×
UNCOV
1311
        (*pppDownstramParam)[i] = NULL;
×
1312
      }
1313
    }
1314
    if (allFree) {
22,669✔
1315
      taosMemoryFreeClear(*pppDownstramParam);
1,895!
1316
    }
1317
  }
1318
}
1319

1320
FORCE_INLINE int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOperator, int32_t idx, bool clearParam,
31,604,663✔
1321
                                                    SSDataBlock** pResBlock) {
1322
  QRY_PARAM_CHECK(pResBlock);
31,604,663!
1323

1324
  int32_t code = 0;
31,604,663✔
1325
  if (pOperator->pDownstreamGetParams && pOperator->pDownstreamGetParams[idx]) {
31,604,663!
1326
    qDebug("DynOp: op %s start to get block from downstream %s", pOperator->name, pOperator->pDownstream[idx]->name);
63,522✔
1327
    code = pOperator->pDownstream[idx]->fpSet.getNextExtFn(pOperator->pDownstream[idx],
63,522✔
1328
                                                           pOperator->pDownstreamGetParams[idx], pResBlock);
63,522✔
1329
    if (clearParam && (code == 0)) {
63,522!
UNCOV
1330
      freeOperatorParam(pOperator->pDownstreamGetParams[idx], OP_GET_PARAM);
×
UNCOV
1331
      pOperator->pDownstreamGetParams[idx] = NULL;
×
1332
    }
1333

1334
    if (code) {
63,522!
UNCOV
1335
      qError("failed to get next data block from upstream at %s, line:%d code:%s", __func__, __LINE__, tstrerror(code));
×
1336
    }
1337
    return code;
63,522✔
1338
  }
1339

1340
  code = pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx], pResBlock);
31,541,141✔
1341
  if (code) {
31,549,621!
UNCOV
1342
    qError("failed to get next data block from upstream at %s, %d code:%s", __func__, __LINE__, tstrerror(code));
×
1343
  }
1344
  return code;
31,550,438✔
1345
}
1346

1347
bool compareVal(const char* v, const SStateKeys* pKey) {
18,241,474✔
1348
  if (IS_VAR_DATA_TYPE(pKey->type)) {
18,241,474!
1349
    if (varDataLen(v) != varDataLen(pKey->pData)) {
2,825!
UNCOV
1350
      return false;
×
1351
    } else {
1352
      return memcmp(varDataVal(v), varDataVal(pKey->pData), varDataLen(v)) == 0;
2,825✔
1353
    }
1354
  } else {
1355
    return memcmp(pKey->pData, v, pKey->bytes) == 0;
18,238,649✔
1356
  }
1357
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc