• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.66
/source/libs/executor/src/projectoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "taoserror.h"
22
#include "tdatablock.h"
23

24
typedef struct SProjectOperatorInfo {
25
  SOptrBasicInfo binfo;
26
  SAggSupporter  aggSup;
27
  SArray*        pPseudoColInfo;
28
  SLimitInfo     limitInfo;
29
  bool           mergeDataBlocks;
30
  SSDataBlock*   pFinalRes;
31
  bool           inputIgnoreGroup;
32
  bool           outputIgnoreGroup;
33
} SProjectOperatorInfo;
34

35
typedef struct SIndefOperatorInfo {
36
  SOptrBasicInfo binfo;
37
  SAggSupporter  aggSup;
38
  SArray*        pPseudoColInfo;
39
  SExprSupp      scalarSup;
40
  uint64_t       groupId;
41
  SSDataBlock*   pNextGroupRes;
42
} SIndefOperatorInfo;
43

44
static int32_t      doGenerateSourceData(SOperatorInfo* pOperator);
45
static int32_t      doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
46
static int32_t      doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
47
static int32_t      setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList);
48
static int32_t      setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup,
49
                                            int32_t stage, int32_t numOfExprs);
50

51
static void destroyProjectOperatorInfo(void* param) {
1,795,161✔
52
  if (NULL == param) {
1,795,161!
53
    return;
×
54
  }
55

56
  SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param;
1,795,161✔
57
  cleanupBasicInfo(&pInfo->binfo);
1,795,161✔
58
  cleanupAggSup(&pInfo->aggSup);
1,795,346✔
59
  taosArrayDestroy(pInfo->pPseudoColInfo);
1,795,343✔
60

61
  blockDataDestroy(pInfo->pFinalRes);
1,795,286✔
62
  taosMemoryFreeClear(param);
1,795,370!
63
}
64

65
static void destroyIndefinitOperatorInfo(void* param) {
117,404✔
66
  SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param;
117,404✔
67
  if (pInfo == NULL) {
117,404!
68
    return;
×
69
  }
70

71
  cleanupBasicInfo(&pInfo->binfo);
117,404✔
72
  taosArrayDestroy(pInfo->pPseudoColInfo);
117,404✔
73
  cleanupAggSup(&pInfo->aggSup);
117,404✔
74
  cleanupExprSupp(&pInfo->scalarSup);
117,404✔
75

76
  taosMemoryFreeClear(param);
117,404!
77
}
78

79
/*
 * Forward a "release stream state" request to the first downstream operator.
 *
 * The project operator can be created without a downstream (see the NULL check
 * in createProjectOperatorInfo), so the downstream slot is only read when
 * numOfDownstream says it exists; otherwise the call is a no-op.
 */
void streamOperatorReleaseState(SOperatorInfo* pOperator) {
  if (pOperator == NULL || pOperator->numOfDownstream <= 0) {
    return;
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream != NULL && downstream->fpSet.releaseStreamStateFn) {
    downstream->fpSet.releaseStreamStateFn(downstream);
  }
}
1,051✔
85

86
/*
 * Forward a "reload stream state" request to the first downstream operator.
 *
 * The project operator can be created without a downstream (see the NULL check
 * in createProjectOperatorInfo), so the downstream slot is only read when
 * numOfDownstream says it exists; otherwise the call is a no-op.
 */
void streamOperatorReloadState(SOperatorInfo* pOperator) {
  if (pOperator == NULL || pOperator->numOfDownstream <= 0) {
    return;
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream != NULL && downstream->fpSet.reloadStreamStateFn) {
    downstream->fpSet.reloadStreamStateFn(downstream);
  }
}
1,051✔
92

93
/*
 * Build a project operator for the given physical plan node.
 *
 * downstream    optional input operator; it is attached only when non-NULL
 * pProjPhyNode  plan node supplying the output block descriptor, limit/slimit,
 *               projections and filter conditions
 * pTaskInfo     owning task; on failure its code field is set to the returned error
 * pOptrInfo     out: receives the new operator on success
 *
 * Returns TSDB_CODE_SUCCESS on success. On failure the partially built info is
 * destroyed, destroyOperatorAndDownstreams() is called on the operator and the
 * downstream, and the error code is returned.
 */
int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo,
                                  SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pOperator == NULL || pInfo == NULL) {
    code = terrno;
    goto _error;
  }

  pOperator->exprSupp.hasWindowOrGroup = false;
  pOperator->pTaskInfo = pTaskInfo;

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pProjPhyNode->node.pOutputDataBlockDesc);
  TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno);

  initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo);

  // From here on the result block is reachable through pInfo, so the error path releases it.
  pInfo->binfo.pRes = pResBlock;
  pInfo->pFinalRes = NULL;

  code = createOneDataBlock(pResBlock, false, &pInfo->pFinalRes);
  TSDB_CHECK_CODE(code, lino, _error);

  pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder;
  pInfo->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup;
  pInfo->outputIgnoreGroup = pProjPhyNode->ignoreGroupId;

  // Merging is never enabled for stream/queue execution or when group ids must be kept.
  const bool streamOrQueue =
      (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) || (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
  pInfo->mergeDataBlocks = (!streamOrQueue && pProjPhyNode->ignoreGroupId) ? pProjPhyNode->mergeDataBlock : false;

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

  // Cap the row capacity so that one result block stays within 2MB.
  const int32_t maxBlockBytes = 2 * 1024 * 1024;
  int32_t       rowsPerBlock = 4096;
  if (rowsPerBlock * pResBlock->info.rowSize > maxBlockBytes) {
    rowsPerBlock = maxBlockBytes / pResBlock->info.rowSize;
  }

  initResultSizeInfo(&pOperator->resultInfo, rowsPerBlock);

  SExprInfo* pExprInfo = NULL;
  int32_t    numOfExprs = 0;
  code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfExprs);
  TSDB_CHECK_CODE(code, lino, _error);

  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfExprs, keyBufSize, pTaskInfo->id.str,
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
  TSDB_CHECK_CODE(code, lino, _error);

  initBasicInfo(&pInfo->binfo, pResBlock);

  code = setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExprs);
  TSDB_CHECK_CODE(code, lino, _error);

  code = filterInitFromNode((SNode*)pProjPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  TSDB_CHECK_CODE(code, lino, _error);

  code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfExprs, &pInfo->pPseudoColInfo);
  TSDB_CHECK_CODE(code, lino, _error);

  setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
  setOperatorStreamStateFn(pOperator, streamOperatorReleaseState, streamOperatorReloadState);

  if (downstream != NULL) {
    code = appendDownstream(pOperator, &downstream, 1);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  if (pInfo != NULL) {
    destroyProjectOperatorInfo(pInfo);
  }
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
187

188
/*
 * Apply the group-level offset (remainGroupOffset) to an incoming block.
 *
 * Returns PROJECT_RETRIEVE_CONTINUE when the block belongs to a group that is
 * still being skipped, and PROJECT_RETRIEVE_DONE when the block should be
 * processed. Updates remainGroupOffset and currentGroupId along the way.
 */
static int32_t discardGroupDataBlock(SSDataBlock* pBlock, SLimitInfo* pLimitInfo) {
  if (pLimitInfo->remainGroupOffset <= 0) {
    return PROJECT_RETRIEVE_DONE;
  }

  // Still inside the first skipped group, or another block of the group being skipped.
  if (pLimitInfo->currentGroupId == 0 || pLimitInfo->currentGroupId == pBlock->info.id.groupId) {
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
    return PROJECT_RETRIEVE_CONTINUE;
  }

  // A block of a different group: one more group has been skipped completely.
  pLimitInfo->remainGroupOffset -= 1;
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;

  if (pLimitInfo->remainGroupOffset > 0) {
    return PROJECT_RETRIEVE_CONTINUE;
  }

  // Offset exhausted: this block is the first one to keep.
  pLimitInfo->currentGroupId = 0;
  return PROJECT_RETRIEVE_DONE;
}
210

211
/*
 * Update group bookkeeping for a block that survived the group offset.
 *
 * Returns TSDB_CODE_INVALID_PARA when remainGroupOffset is neither 0 nor -1,
 * PROJECT_RETRIEVE_DONE (after marking the operator completed) when the number
 * of output groups exceeds a non-negative slimit, and PROJECT_RETRIEVE_CONTINUE
 * otherwise. When a new non-zero group starts, the per-group limit state is
 * reset through resetLimitInfoForNextGroup().
 */
static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, SOperatorInfo* pOperator) {
  if (pLimitInfo->remainGroupOffset != 0 && pLimitInfo->remainGroupOffset != -1) {
    qError("project failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_INVALID_PARA;
  }

  const bool hasGroupId = (0 != pBlock->info.id.groupId);

  // Another block of the group that is already being output: nothing to update.
  if (hasGroupId && pLimitInfo->currentGroupId == pBlock->info.id.groupId) {
    return PROJECT_RETRIEVE_CONTINUE;
  }

  if (hasGroupId) {
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
    pLimitInfo->numOfOutputGroups += 1;
  } else {
    pLimitInfo->numOfOutputGroups = 1;
  }

  if ((pLimitInfo->slimit.limit >= 0) && (pLimitInfo->slimit.limit < pLimitInfo->numOfOutputGroups)) {
    setOperatorCompleted(pOperator);
    return PROJECT_RETRIEVE_DONE;
  }

  // Start the row limit/offset accounting afresh for the new group.
  if (hasGroupId) {
    resetLimitInfoForNextGroup(pLimitInfo);
  }

  return PROJECT_RETRIEVE_CONTINUE;
}
243

244
// todo refactor
245
static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SSDataBlock* pBlock,
5,568,683✔
246
                                    SOperatorInfo* pOperator) {
247
  // set current group id
248
  pLimitInfo->currentGroupId = groupId;
5,568,683✔
249
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pOperator->pTaskInfo);
5,568,683✔
250
  if (pBlock->info.rows == 0 && 0 != pLimitInfo->limit.limit) {
5,568,976✔
251
    return PROJECT_RETRIEVE_CONTINUE;
18,307✔
252
  } else {
253
    if (limitReached && (pLimitInfo->slimit.limit >= 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
5,550,669✔
254
      setOperatorCompleted(pOperator);
118✔
255
    } else if (limitReached && groupId == 0) {
5,550,551✔
256
      setOperatorCompleted(pOperator);
106,022✔
257
    }
258
  }
259

260
  return PROJECT_RETRIEVE_DONE;
5,550,487✔
261
}
262

263
int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
4,922,518✔
264
  QRY_PARAM_CHECK(pResBlock);
4,922,518!
265

266
  SProjectOperatorInfo* pProjectInfo = pOperator->info;
4,922,518✔
267
  SOptrBasicInfo*       pInfo = &pProjectInfo->binfo;
4,922,518✔
268
  SExprSupp*            pSup = &pOperator->exprSupp;
4,922,518✔
269
  SSDataBlock*          pRes = pInfo->pRes;
4,922,518✔
270
  SSDataBlock*          pFinalRes = pProjectInfo->pFinalRes;
4,922,518✔
271
  int32_t               code = 0;
4,922,518✔
272
  int32_t               lino = 0;
4,922,518✔
273
  int64_t               st = 0;
4,922,518✔
274
  int32_t               order = pInfo->inputTsOrder;
4,922,518✔
275
  int32_t               scanFlag = 0;
4,922,518✔
276

277
  blockDataCleanup(pFinalRes);
4,922,518✔
278
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
4,922,728✔
279

280
  if (pOperator->status == OP_EXEC_DONE) {
4,922,728✔
281
    return code;
999,529✔
282
  }
283

284
  if (pOperator->cost.openCost == 0) {
3,923,199✔
285
    st = taosGetTimestampUs();
1,794,791✔
286
  }
287

288
  SOperatorInfo* downstream = pOperator->numOfDownstream > 0 ? pOperator->pDownstream[0] : NULL;
3,924,034✔
289
  SLimitInfo*    pLimitInfo = &pProjectInfo->limitInfo;
3,924,034✔
290

291
  if (downstream == NULL) {
3,924,034✔
292
    code = doGenerateSourceData(pOperator);
1,657✔
293
    QUERY_CHECK_CODE(code, lino, _end);
1,657!
294

295
    if (pProjectInfo->outputIgnoreGroup) {
1,657!
296
      pRes->info.id.groupId = 0;
1,657✔
297
    }
298

299
    *pResBlock = (pRes->info.rows > 0)? pRes:NULL;
1,657!
300
    return code;
1,657✔
301
  }
302

303
  while (1) {
3,428,554✔
304
    while (1) {
21,905✔
305
      blockDataCleanup(pRes);
7,372,836✔
306

307
      // The downstream exec may change the value of the newgroup, so use a local variable instead.
308
      SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
7,370,795✔
309
      if (pBlock == NULL) {
7,373,582✔
310
        qDebug("set op close, exec %d, status %d rows %" PRId64 , pTaskInfo->execModel, pOperator->status, pFinalRes->info.rows);
1,795,206✔
311
        setOperatorCompleted(pOperator);
1,795,208✔
312
        break;
1,795,257✔
313
      }
314
//      if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
315
//        qDebug("set status recv");
316
//        pOperator->status = OP_EXEC_RECV;
317
//      }
318

319
      // for stream interval
320
      if (pBlock->info.type == STREAM_RETRIEVE || pBlock->info.type == STREAM_DELETE_RESULT ||
5,578,376✔
321
          pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_CREATE_CHILD_TABLE ||
5,577,130!
322
          pBlock->info.type == STREAM_CHECKPOINT) {
5,573,141✔
323

324
        *pResBlock = pBlock;
5,866✔
325
        return code;
5,866✔
326
      }
327

328
      if (pProjectInfo->inputIgnoreGroup) {
5,572,510✔
329
        pBlock->info.id.groupId = 0;
454,643✔
330
      }
331

332
      int32_t status = discardGroupDataBlock(pBlock, pLimitInfo);
5,572,510✔
333
      if (status == PROJECT_RETRIEVE_CONTINUE) {
5,572,054✔
334
        continue;
3,598✔
335
      }
336

337
      (void) setInfoForNewGroup(pBlock, pLimitInfo, pOperator);
5,568,456✔
338
      if (pOperator->status == OP_EXEC_DONE) {
5,567,756✔
339
        break;
397✔
340
      }
341

342
      if (pProjectInfo->mergeDataBlocks) {
5,567,359✔
343
        pFinalRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;
4,381,849✔
344
      } else {
345
        pRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;
1,185,510✔
346
      }
347

348
      code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
5,567,359✔
349
      QUERY_CHECK_CODE(code, lino, _end);
5,567,925!
350

351
      code = blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
5,567,925✔
352
      QUERY_CHECK_CODE(code, lino, _end);
5,568,666!
353

354
      code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
5,568,666✔
355
                                   pProjectInfo->pPseudoColInfo);
356
      QUERY_CHECK_CODE(code, lino, _end);
5,567,974✔
357

358
      status = doIngroupLimitOffset(pLimitInfo, pBlock->info.id.groupId, pInfo->pRes, pOperator);
5,566,814✔
359
      if (status == PROJECT_RETRIEVE_CONTINUE) {
5,567,081✔
360
        continue;
18,307✔
361
      }
362

363
      break;
5,548,774✔
364
    }
365

366
    if (pProjectInfo->mergeDataBlocks) {
7,344,428✔
367
      if (pRes->info.rows > 0) {
5,385,445✔
368
        pFinalRes->info.id.groupId = 0;  // clear groupId
4,362,514✔
369
        pFinalRes->info.version = pRes->info.version;
4,362,514✔
370

371
        // continue merge data, ignore the group id
372
        code = blockDataMerge(pFinalRes, pRes);
4,362,514✔
373
        QUERY_CHECK_CODE(code, lino, _end);
4,362,822!
374

375
        if (pFinalRes->info.rows + pRes->info.rows <= pOperator->resultInfo.threshold && (pOperator->status != OP_EXEC_DONE)) {
4,362,822✔
376
          continue;
3,428,584✔
377
        }
378
      }
379

380
      // do apply filter
381
      code = doFilter(pFinalRes, pOperator->exprSupp.pFilterInfo, NULL);
1,957,169✔
382
      QUERY_CHECK_CODE(code, lino, _end);
1,957,367!
383

384
      // when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
385
      if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) {
1,957,367!
386
        qDebug("project return %" PRId64 " rows, status %d", pFinalRes->info.rows, pOperator->status);
1,957,397✔
387
        break;
1,957,364✔
388
      }
389
    } else {
390
      // do apply filter
391
      if (pRes->info.rows > 0) {
1,958,983✔
392
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
1,185,812✔
393
        QUERY_CHECK_CODE(code, lino, _end);
1,185,850!
394

395
        if (pRes->info.rows == 0) {
1,185,850!
396
          continue;
×
397
        }
398
      }
399

400
      // no results generated
401
      break;
1,959,021✔
402
    }
403
  }
404

405
  SSDataBlock* p = pProjectInfo->mergeDataBlocks ? pFinalRes : pRes;
3,916,385✔
406
  pOperator->resultInfo.totalRows += p->info.rows;
3,916,385✔
407
  p->info.dataLoad = 1;
3,916,385✔
408

409
  if (pOperator->cost.openCost == 0) {
3,916,385✔
410
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
1,791,173✔
411
  }
412

413
  if (pProjectInfo->outputIgnoreGroup) {
3,916,499✔
414
    p->info.id.groupId = 0;
3,782,955✔
415
  }
416

417
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
3,916,499✔
418
    printDataBlock(p, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
29,471✔
419
  }
420

421
  *pResBlock = (p->info.rows > 0)? p:NULL;
3,916,730✔
422

423
_end:
3,917,890✔
424
  if (code != TSDB_CODE_SUCCESS) {
3,917,890✔
425
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
1,160!
426
    pTaskInfo->code = code;
1,160✔
427
    T_LONG_JMP(pTaskInfo->env, code);
1,160!
428
  }
429
  return code;
3,916,730✔
430
}
431

432
int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
117,396✔
433
                                                 SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
434
  QRY_PARAM_CHECK(pOptrInfo);
117,396!
435
  int32_t code = 0;
117,396✔
436
  int32_t lino = 0;
117,396✔
437
  int32_t numOfRows = 4096;
117,396✔
438
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
117,396✔
439

440
  SIndefOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIndefOperatorInfo));
117,396!
441
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
117,403!
442
  if (pInfo == NULL || pOperator == NULL) {
117,401!
443
    code = terrno;
×
444
    goto _error;
×
445
  }
446

447
  pOperator->pTaskInfo = pTaskInfo;
117,401✔
448

449
  SExprSupp* pSup = &pOperator->exprSupp;
117,401✔
450
  pSup->hasWindowOrGroup = false;
117,401✔
451

452
  SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
117,401✔
453

454
  if (pPhyNode->pExprs != NULL) {
117,401✔
455
    int32_t    num = 0;
115✔
456
    SExprInfo* pSExpr = NULL;
115✔
457
    code = createExprInfo(pPhyNode->pExprs, NULL, &pSExpr, &num);
115✔
458
    QUERY_CHECK_CODE(code, lino, _error);
115!
459

460
    code = initExprSupp(&pInfo->scalarSup, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
115✔
461
    if (code != TSDB_CODE_SUCCESS) {
115!
462
      goto _error;
×
463
    }
464
  }
465

466
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->node.pOutputDataBlockDesc);
117,401✔
467
  TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno);
117,404!
468

469
  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
470
  int32_t TWOMB = 2 * 1024 * 1024;
117,404✔
471
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
117,404!
472
    numOfRows = TWOMB / pResBlock->info.rowSize;
×
473
  }
474

475
  initBasicInfo(&pInfo->binfo, pResBlock);
117,404✔
476
  initResultSizeInfo(&pOperator->resultInfo, numOfRows);
117,403✔
477
  code = blockDataEnsureCapacity(pResBlock, numOfRows);
117,404✔
478
  TSDB_CHECK_CODE(code, lino, _error);
117,401!
479

480
  int32_t    numOfExpr = 0;
117,401✔
481
  SExprInfo* pExprInfo = NULL;
117,401✔
482
  code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr);
117,401✔
483
  TSDB_CHECK_CODE(code, lino, _error);
117,398!
484

485
  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
117,398✔
486
                            pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
117,398✔
487
  TSDB_CHECK_CODE(code, lino, _error);
117,401!
488

489
  code = setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
117,401✔
490
  TSDB_CHECK_CODE(code, lino, _error);
117,395!
491

492
  code = filterInitFromNode((SNode*)pPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
117,395✔
493
  TSDB_CHECK_CODE(code, lino, _error);
117,395!
494

495
  pInfo->binfo.pRes = pResBlock;
117,395✔
496
  pInfo->binfo.inputTsOrder = pNode->inputTsOrder;
117,395✔
497
  pInfo->binfo.outputTsOrder = pNode->outputTsOrder;
117,395✔
498
  code = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr, &pInfo->pPseudoColInfo);
117,395✔
499
  TSDB_CHECK_CODE(code, lino, _error);
117,400!
500

501
  setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo,
117,400✔
502
                  pTaskInfo);
503
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo,
117,399✔
504
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
505

506
  code = appendDownstream(pOperator, &downstream, 1);
117,401✔
507
  if (code != TSDB_CODE_SUCCESS) {
117,403!
508
    goto _error;
×
509
  }
510

511
  *pOptrInfo = pOperator;
117,403✔
512
  return TSDB_CODE_SUCCESS;
117,403✔
513

514
_error:
×
515
  if (pInfo != NULL) destroyIndefinitOperatorInfo(pInfo);
×
516
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
517
  pTaskInfo->code = code;
×
518
  return code;
×
519
}
520

521
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream,
1,686,876✔
522
                              SExecTaskInfo* pTaskInfo) {
523
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
1,686,876✔
524
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
1,686,876✔
525
  SExprSupp*          pSup = &pOperator->exprSupp;
1,686,876✔
526

527
  int32_t order = pInfo->inputTsOrder;
1,686,876✔
528
  int32_t scanFlag = pBlock->info.scanFlag;
1,686,876✔
529
  int32_t code = TSDB_CODE_SUCCESS;
1,686,876✔
530

531
  // there is an scalar expression that needs to be calculated before apply the group aggregation.
532
  SExprSupp* pScalarSup = &pIndefInfo->scalarSup;
1,686,876✔
533
  if (pScalarSup->pExprInfo != NULL) {
1,686,876✔
534
    code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
538✔
535
                                 pIndefInfo->pPseudoColInfo);
536
    if (code != TSDB_CODE_SUCCESS) {
538!
537
      T_LONG_JMP(pTaskInfo->env, code);
×
538
    }
539
  }
540

541
  code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
1,686,876✔
542
  if (code) {
1,686,877!
543
    T_LONG_JMP(pTaskInfo->env, code);
×
544
  }
545

546
  code = blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
1,686,877✔
547
  if (code != TSDB_CODE_SUCCESS) {
1,686,877!
548
    T_LONG_JMP(pTaskInfo->env, code);
×
549
  }
550

551
  code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
1,686,877✔
552
                               pIndefInfo->pPseudoColInfo);
553
  if (code != TSDB_CODE_SUCCESS) {
1,686,877✔
554
    T_LONG_JMP(pTaskInfo->env, code);
39!
555
  }
556
}
1,686,838✔
557

558
/*
 * Produce the next output block of the indefinite-rows operator.
 *
 * Blocks are pulled from the first downstream operator and accumulated into the
 * result block until the row threshold is reached, the group id changes, or the
 * input ends. A block that starts a new group is parked in pNextGroupRes and is
 * processed first on the next round, after the function contexts are reset.
 *
 * pResBlock receives the result block when it holds rows, NULL otherwise.
 * On a filter error the code is logged, stored in pTaskInfo->code and
 * T_LONG_JMP is invoked on pTaskInfo->env.
 */
int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  SIndefOperatorInfo* pIndef = pOperator->info;
  SOptrBasicInfo*     pBasic = &pIndef->binfo;
  SExprSupp*          pExprSup = &pOperator->exprSupp;
  SSDataBlock*        pRes = pBasic->pRes;
  int64_t             startUs = 0;
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;

  blockDataCleanup(pRes);

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  if (pOperator->status == OP_EXEC_DONE) {
    return code;
  }

  if (pOperator->cost.openCost == 0) {
    startUs = taosGetTimestampUs();
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];

  for (;;) {
    // First handle the block that was parked when the previous group ended.
    if (pIndef->pNextGroupRes != NULL) {  // todo extract method
      for (int32_t idx = 0; idx < pExprSup->numOfExprs; ++idx) {
        SqlFunctionCtx*      pOneCtx = &pExprSup->pCtx[idx];
        SResultRowEntryInfo* pEntry = GET_RES_INFO(pOneCtx);

        pEntry->initialized = false;
        pOneCtx->pOutput = NULL;
      }

      doHandleDataBlock(pOperator, pIndef->pNextGroupRes, downstream, pTaskInfo);
      pIndef->pNextGroupRes = NULL;
    }

    // Keep pulling input while the result block is below the threshold.
    while (pBasic->pRes->info.rows < pOperator->resultInfo.threshold) {
      // The downstream exec may change the value of the newgroup, so use a local variable instead.
      SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
      if (pBlock == NULL) {
        setOperatorCompleted(pOperator);
        break;
      }
      pBasic->pRes->info.scanFlag = pBlock->info.scanFlag;

      if (pIndef->groupId == 0 && pBlock->info.id.groupId != 0) {
        pIndef->groupId = pBlock->info.id.groupId;  // this is the initial group result
      } else if (pIndef->groupId != pBlock->info.id.groupId) {
        // group changed: park the block and return what has been accumulated so far
        pIndef->groupId = pBlock->info.id.groupId;
        pIndef->pNextGroupRes = pBlock;
        break;
      }

      doHandleDataBlock(pOperator, pBlock, downstream, pTaskInfo);
    }

    code = doFilter(pBasic->pRes, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    size_t filtered = pBasic->pRes->info.rows;
    if (filtered > 0 || pOperator->status == OP_EXEC_DONE) {
      break;
    }

    // everything was filtered out and more input may follow: start over with an empty block
    blockDataCleanup(pBasic->pRes);
  }

  size_t rows = pBasic->pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;
  }

  if (rows > 0) {
    *pResBlock = pBasic->pRes;
  } else {
    *pResBlock = NULL;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
651

652
/*
 * Call the init callback of every function context that still needs it.
 *
 * Contexts are skipped when their result entry is already initialised, when the
 * function is a pseudo-column or scalar function, or when functionId is -1.
 *
 * Returns 0 on success or the first non-zero code returned by an init callback.
 */
int32_t initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
  int32_t code = TSDB_CODE_SUCCESS;

  for (int32_t idx = 0; idx < size; ++idx) {
    SqlFunctionCtx*             pOne = &pCtx[idx];
    struct SResultRowEntryInfo* pEntry = GET_RES_INFO(pOne);

    bool skip = isRowEntryInitialized(pEntry) || fmIsPseudoColumnFunc(pOne->functionId) || pOne->functionId == -1 ||
                fmIsScalarFunc(pOne->functionId);
    if (!skip) {
      code = pOne->fpSet.init(pOne, pOne->resultInfo);
      if (code) {
        return code;
      }
    }
  }

  return 0;
}
669

670
/*
671
 * The start of each column SResultRowEntryInfo is denoted by RowCellInfoOffset.
672
 * Note that in case of top/bottom query, the whole multiple rows of result is treated as only one row of results.
673
 * +------------+-----------------result column 1------------+------------------result column 2-----------+
674
 * | SResultRow | SResultRowEntryInfo | intermediate buffer1 | SResultRowEntryInfo | intermediate buffer 2|
675
 * +------------+--------------------------------------------+--------------------------------------------+
676
 *           offset[0]                                  offset[1]                                   offset[2]
677
 */
678
// TODO refactor: some function move away
679
/*
 * Bind every function context of the operator to its entry in a freshly
 * obtained result row, then initialize the contexts' output buffers.
 *
 * @param pOperator   operator whose exprSupp.pCtx / rowEntryInfoOffset are used
 * @param pInfo       basic info holding the SResultRowInfo that is (re)initialized
 * @param pSup        aggregate supporter providing the result buffer
 * @param stage       value stored into each context's scanFlag
 * @param numOfExprs  number of function contexts to bind
 * @return 0 on success, otherwise an error code
 */
int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
                                int32_t numOfExprs) {
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;

  SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
  initResultRowInfo(pResultRowInfo);

  int64_t     tid = 0;
  int64_t     groupId = 0;
  SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, (char*)&tid, sizeof(tid), true, groupId,
                                            pTaskInfo, false, pSup, true);
  if (pTaskInfo->code != 0) {
    return pTaskInfo->code;
  }

  if (pRow == NULL) {
    // No row and no recorded error: never report success here, since none of
    // the contexts below would have been bound to a result entry.
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  for (int32_t i = 0; i < numOfExprs; ++i) {
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowEntryInfoOffset);
    cleanupResultRowEntry(pEntry);

    pCtx[i].resultInfo = pEntry;
    pCtx[i].scanFlag = stage;
  }

  return initCtxOutputBuffer(pCtx, numOfExprs);
}
706

707
/*
 * Collect the indices of all pseudo column function contexts.
 *
 * @param pCtx       array of function contexts
 * @param numOfCols  number of entries in pCtx
 * @param pResList   [out] receives a newly created array of int32_t indices;
 *                   it is only assigned on success
 * @return 0 on success, otherwise terrno as set by the failing array call
 */
int32_t setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList) {
  QRY_PARAM_CHECK(pResList);
  SArray* pList = taosArrayInit(4, sizeof(int32_t));
  if (pList == NULL) {
    return terrno;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    if (fmIsPseudoColumnFunc(pCtx[i].functionId)) {
      void* px = taosArrayPush(pList, &i);
      if (px == NULL) {
        // capture the error first, then release the partially filled list so it is not leaked
        int32_t code = terrno;
        taosArrayDestroy(pList);
        return code;
      }
    }
  }

  *pResList = pList;
  return 0;
}
726

727
/*
 * Produce a single result row for a project operator whose expressions are
 * evaluated without reading rows from a source block: constant values are
 * written directly, and scalar functions are evaluated through scalarCalculate().
 * Any other expression kind is rejected with TSDB_CODE_OPS_NOT_SUPPORT.
 *
 * After the row is built it is passed through the operator's filter and
 * limit/offset handling, the operator is marked completed, and the open cost
 * is recorded if it has not been recorded yet.
 *
 * @param pOperator  the project operator
 * @return 0 on success, otherwise an error code
 */
int32_t doGenerateSourceData(SOperatorInfo* pOperator) {
  SProjectOperatorInfo* pProjectInfo = pOperator->info;

  SExprSupp*     pSup = &pOperator->exprSupp;
  SSDataBlock*   pRes = pProjectInfo->binfo.pRes;
  SExprInfo*     pExpr = pSup->pExprInfo;
  int64_t        st = taosGetTimestampUs();
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  int32_t code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
  if (code) {
    return code;
  }

  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
    int32_t outputSlotId = pExpr[k].base.resSchema.slotId;

    if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
      SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, outputSlotId);
      if (pColInfoData == NULL) {
        return terrno;
      }

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataSetNNULL(pColInfoData, 0, 1);
      } else {
        code = colDataSetVal(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
        if (code) {
          return code;
        }
      }
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
      SqlFunctionCtx* pfCtx = &pSup->pCtx[k];

      // UDF scalar functions will be calculated here, for example, select foo(n) from (select 1 n).
      // UDF aggregate functions will be handled in agg operator.
      if (fmIsScalarFunc(pfCtx->functionId)) {
        SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
        if (pBlockList == NULL) {
          return terrno;
        }

        void* px = taosArrayPush(pBlockList, &pRes);
        if (px == NULL) {
          code = terrno;
          taosArrayDestroy(pBlockList);
          return code;
        }

        SColumnInfoData* pResColData = taosArrayGet(pRes->pDataBlock, outputSlotId);
        if (pResColData == NULL) {
          code = terrno;
          taosArrayDestroy(pBlockList);
          return code;
        }

        SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};

        SScalarParam dest = {.columnData = &idata};
        code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
        if (code != TSDB_CODE_SUCCESS) {
          taosArrayDestroy(pBlockList);
          return code;
        }

        if (pRes->info.capacity <= 0) {
          qError("project failed at: %s:%d", __func__, __LINE__);
          code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        } else {
          code = colDataAssign(pResColData, &idata, dest.numOfRows, &pRes->info);
        }

        // the temporary column and the block list are released on both the success and the failure path
        colDataDestroy(&idata);
        taosArrayDestroy(pBlockList);
        if (code) {
          return code;
        }
      } else {
        return TSDB_CODE_OPS_NOT_SUPPORT;
      }
    } else {
      return TSDB_CODE_OPS_NOT_SUPPORT;
    }
  }

  pRes->info.rows = 1;
  code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
  if (code) {
    pTaskInfo->code = code;
    return code;
  }

  (void) doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator);

  pOperator->resultInfo.totalRows += pRes->info.rows;

  setOperatorCompleted(pOperator);
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  }

  return code;
}
827

828
/*
 * Point the output of the leading function contexts at the leading columns of
 * the result block, one per entry of pPseudoList. A NULL list means nothing is
 * bound. A missing column is only logged.
 *
 * NOTE(review): the context and the column are both selected by loop position,
 * not by the index values stored in pPseudoList -- confirm this is intended.
 */
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
  if (pPseudoList == NULL) {
    return;
  }

  size_t  numOfPseudo = taosArrayGetSize(pPseudoList);
  int32_t slot = 0;
  while (slot < numOfPseudo) {
    pCtx[slot].pOutput = taosArrayGet(pResult->pDataBlock, slot);
    if (pCtx[slot].pOutput == NULL) {
      qError("failed to get the output buf, ptr is null");
    }
    ++slot;
  }
}
837

838
/*
 * Evaluate the projection expressions in pExpr against pSrcBlock and write the
 * results into pResult.
 *
 * - If pSrcBlock is NULL, every expression must be a constant value; exactly
 *   one row is produced.
 * - If pResult == pSrcBlock, results are written starting at row 0 of the
 *   block (new columns are created in place) and info.rows is left unchanged.
 * - Otherwise results are written after the rows already present in pResult,
 *   and info.rows is increased by the number of rows produced.
 *
 * @param pExpr        array of numOfOutput expressions
 * @param pResult      destination block
 * @param pSrcBlock    source block, may be NULL (see above)
 * @param pCtx         function contexts, indexed in parallel with pExpr
 * @param numOfOutput  number of expressions / contexts
 * @param pPseudoList  list of pseudo column context indices, may be NULL
 * @return TSDB_CODE_SUCCESS on success, otherwise an error code. All temporary
 *         arrays and columns created here are released on every exit path.
 */
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
                              int32_t numOfOutput, SArray* pPseudoList) {
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
  pResult->info.dataLoad = 1;

  SArray* processByRowFunctionCtx = NULL;
  if (pSrcBlock == NULL) {
    for (int32_t k = 0; k < numOfOutput; ++k) {
      int32_t outputSlotId = pExpr[k].base.resSchema.slotId;

      if (pExpr[k].pExpr->nodeType != QUERY_NODE_VALUE) {
        qError("project failed at: %s:%d", __func__, __LINE__);
        code = TSDB_CODE_INVALID_PARA;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
      if (pColInfoData == NULL) {
        code = terrno;
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataSetNNULL(pColInfoData, 0, 1);
      } else {
        code = colDataSetVal(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }

    pResult->info.rows = 1;
    goto _exit;
  }

  if (pResult != pSrcBlock) {
    pResult->info.id.groupId = pSrcBlock->info.id.groupId;
    memcpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
  }

  // if the source equals to the destination, it is to create a new column as the result of scalar
  // function or some operators.
  bool createNewColModel = (pResult == pSrcBlock);
  if (createNewColModel) {
    code = blockDataEnsureCapacity(pResult, pResult->info.rows);
    if (code) {
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  int32_t numOfRows = 0;

  for (int32_t k = 0; k < numOfOutput; ++k) {
    int32_t               outputSlotId = pExpr[k].base.resSchema.slotId;
    SqlFunctionCtx*       pfCtx = &pCtx[k];
    SInputColumnInfoData* pInputData = &pfCtx->input;

    if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) {  // it is a project query
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
      if (pColInfoData == NULL) {
        code = terrno;
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      if (pResult->info.rows > 0 && !createNewColModel) {
        // append to the rows that are already in the result block
        int32_t ret = 0;

        if (pInputData->pData[0] == NULL) {
          int32_t slotId = pfCtx->param[0].pCol->slotId;

          SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
          if (pInput == NULL) {
            code = terrno;
            TSDB_CHECK_CODE(code, lino, _exit);
          }

          ret = colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInput,
                                pSrcBlock->info.rows);
        } else {
          ret = colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity,
                                pInputData->pData[0], pInputData->numOfRows);
        }

        if (ret < 0) {
          code = ret;
        }

        TSDB_CHECK_CODE(code, lino, _exit);
      } else {
        if (pInputData->pData[0] == NULL) {
          int32_t slotId = pfCtx->param[0].pCol->slotId;

          SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
          if (pInput == NULL) {
            code = terrno;
            TSDB_CHECK_CODE(code, lino, _exit);
          }

          code = colDataAssign(pColInfoData, pInput, pSrcBlock->info.rows, &pResult->info);
          numOfRows = pSrcBlock->info.rows;
        } else {
          code = colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
          numOfRows = pInputData->numOfRows;
        }

        TSDB_CHECK_CODE(code, lino, _exit);
      }
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
      if (pColInfoData == NULL) {
        code = terrno;
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      int32_t offset = createNewColModel ? 0 : pResult->info.rows;

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataSetNNULL(pColInfoData, offset, pSrcBlock->info.rows);
      } else {
        // repeat the constant once for every source row
        char* p = taosVariantGet(&pExpr[k].base.pParam[0].param, type);
        for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
          code = colDataSetVal(pColInfoData, i + offset, p, false);
          TSDB_CHECK_CODE(code, lino, _exit);
        }
      }

      numOfRows = pSrcBlock->info.rows;
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
      SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
      if (pBlockList == NULL) {
        code = terrno;
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      void* px = taosArrayPush(pBlockList, &pSrcBlock);
      if (px == NULL) {
        code = terrno;
        taosArrayDestroy(pBlockList);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
      if (pResColData == NULL) {
        code = terrno;
        taosArrayDestroy(pBlockList);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};

      SScalarParam dest = {.columnData = &idata};
      code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pBlockList);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
      if (pResult->info.capacity <= 0) {
        qError("project failed at: %s:%d", __func__, __LINE__);
        // release the computed column and the block list before bailing out
        colDataDestroy(&idata);
        taosArrayDestroy(pBlockList);
        code = TSDB_CODE_INVALID_PARA;
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      int32_t ret = colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
      if (ret < 0) {
        code = ret;
      }

      // both temporaries are released before the error check so a failed merge does not leak them
      colDataDestroy(&idata);
      taosArrayDestroy(pBlockList);
      TSDB_CHECK_CODE(code, lino, _exit);

      numOfRows = dest.numOfRows;
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
      // _rowts/_c0, not tbname column
      if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) {
        if (fmIsGroupIdFunc(pfCtx->functionId)) {
          SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
          TSDB_CHECK_NULL(pColInfoData, code, lino, _exit, terrno);
          code = colDataSetVal(pColInfoData, pResult->info.rows, (const char*)&pSrcBlock->info.id.groupId, false);
          TSDB_CHECK_CODE(code, lino, _exit);
        }
      } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx);
        code = pfCtx->fpSet.init(pfCtx, pResInfo);
        TSDB_CHECK_CODE(code, lino, _exit);
        pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
        if (pfCtx->pOutput == NULL) {
          code = terrno;
          TSDB_CHECK_CODE(code, lino, _exit);
        }

        pfCtx->offset = createNewColModel ? 0 : pResult->info.rows;  // set the start offset

        // set the timestamp(_rowts) output buffer
        if (taosArrayGetSize(pPseudoList) > 0) {
          int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
          if (outputColIndex == NULL) {
            code = terrno;
            TSDB_CHECK_CODE(code, lino, _exit);
          }

          pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
        }

        // link pDstBlock to set selectivity value
        if (pfCtx->subsidiaries.num > 0) {
          pfCtx->pDstBlock = pResult;
        }

        code = pfCtx->fpSet.process(pfCtx);
        if (code != TSDB_CODE_SUCCESS) {
          if (pCtx[k].fpSet.cleanup != NULL) {
            pCtx[k].fpSet.cleanup(&pCtx[k]);
          }
          TSDB_CHECK_CODE(code, lino, _exit);
        }

        numOfRows = pResInfo->numOfRes;
        if (fmIsProcessByRowFunc(pfCtx->functionId)) {
          // collect the context; all collected contexts are processed together after the loop
          if (NULL == processByRowFunctionCtx) {
            processByRowFunctionCtx = taosArrayInit(1, sizeof(SqlFunctionCtx*));
            if (!processByRowFunctionCtx) {
              code = terrno;
              TSDB_CHECK_CODE(code, lino, _exit);
            }
          }

          void* px = taosArrayPush(processByRowFunctionCtx, &pfCtx);
          if (px == NULL) {
            code = terrno;
            TSDB_CHECK_CODE(code, lino, _exit);
          }
        }
      } else if (fmIsAggFunc(pfCtx->functionId)) {
        // selective value output should be set during corresponding function execution
        if (fmIsSelectValueFunc(pfCtx->functionId)) {
          continue;
        }
        // _group_key function for "partition by tbname" + csum(col_name) query
        SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
        if (pOutput == NULL) {
          code = terrno;
          TSDB_CHECK_CODE(code, lino, _exit);
        }

        int32_t slotId = pfCtx->param[0].pCol->slotId;

        // todo handle the json tag
        SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
        if (pInput == NULL) {
          code = terrno;
          TSDB_CHECK_CODE(code, lino, _exit);
        }

        for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
          bool isNull = colDataIsNull_s(pInput, f);
          if (isNull) {
            colDataSetNULL(pOutput, pResult->info.rows + f);
          } else {
            char* data = colDataGetData(pInput, f);
            code = colDataSetVal(pOutput, pResult->info.rows + f, data, isNull);
            TSDB_CHECK_CODE(code, lino, _exit);
          }
        }

      } else {
        SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
        if (pBlockList == NULL) {
          code = terrno;
          TSDB_CHECK_CODE(code, lino, _exit);
        }

        void* px = taosArrayPush(pBlockList, &pSrcBlock);
        if (px == NULL) {
          code = terrno;
          taosArrayDestroy(pBlockList);
          TSDB_CHECK_CODE(code, lino, _exit);
        }

        SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
        if (pResColData == NULL) {
          // read terrno before the destroy call so the reported code is the one from taosArrayGet
          code = terrno;
          taosArrayDestroy(pBlockList);
          TSDB_CHECK_CODE(code, lino, _exit);
        }

        SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};

        SScalarParam dest = {.columnData = &idata};
        code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
        if (code != TSDB_CODE_SUCCESS) {
          taosArrayDestroy(pBlockList);
          TSDB_CHECK_CODE(code, lino, _exit);
        }

        int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
        if (pResult->info.capacity <= 0) {
          qError("project failed at: %s:%d", __func__, __LINE__);
          // release the computed column and the block list before bailing out
          colDataDestroy(&idata);
          taosArrayDestroy(pBlockList);
          code = TSDB_CODE_INVALID_PARA;
          TSDB_CHECK_CODE(code, lino, _exit);
        }
        int32_t ret = colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
        if (ret < 0) {
          code = ret;
        }

        colDataDestroy(&idata);

        numOfRows = dest.numOfRows;
        taosArrayDestroy(pBlockList);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    } else {
      // leave through _exit so that processByRowFunctionCtx is released
      code = TSDB_CODE_OPS_NOT_SUPPORT;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  if (processByRowFunctionCtx && taosArrayGetSize(processByRowFunctionCtx) > 0) {
    SqlFunctionCtx** pfCtx = taosArrayGet(processByRowFunctionCtx, 0);
    if (pfCtx == NULL) {
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = (*pfCtx)->fpSet.processFuncByRow(processByRowFunctionCtx);
    TSDB_CHECK_CODE(code, lino, _exit);
    numOfRows = (*pfCtx)->resultInfo->numOfRes;
  }

  if (!createNewColModel) {
    pResult->info.rows += numOfRows;
  }

_exit:
  if (processByRowFunctionCtx) {
    taosArrayDestroy(processByRowFunctionCtx);
  }
  if (code) {
    qError("project apply functions failed at: %s:%d", __func__, lino);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc