• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4103

17 May 2025 02:18AM UTC coverage: 63.264% (+0.4%) from 62.905%
#4103

push

travis-ci

web-flow
Merge pull request #31110 from taosdata/3.0

merge 3.0

158149 of 318142 branches covered (49.71%)

Branch coverage included in aggregate %.

3 of 5 new or added lines in 1 file covered. (60.0%)

1725 existing lines in 138 files now uncovered.

243642 of 316962 relevant lines covered (76.87%)

16346281.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.23
/source/libs/executor/src/projectoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "streaminterval.h"
22
#include "taoserror.h"
23
#include "tdatablock.h"
24

25
// Per-operator state of the project operator; doProjectOperation() reads it from SOperatorInfo.info.
typedef struct SProjectOperatorInfo {
26
  SOptrBasicInfo binfo;  // holds pRes (the working result block) and the input/output ts order
27
  SAggSupporter  aggSup;
28
  SArray*        pPseudoColInfo;  // filled by setRowTsColumnOutputInfo(); passed to projectApplyFunctions()
29
  SLimitInfo     limitInfo;  // initialized from the plan node's pLimit / pSlimit
30
  bool           mergeDataBlocks;  // when true, results are accumulated in pFinalRes before being returned
31
  SSDataBlock*   pFinalRes;  // accumulation block, created with createOneDataBlock() from the result block
32
  bool           inputIgnoreGroup;  // when true, the groupId of every input block is reset to 0
33
  bool           outputIgnoreGroup;  // when true, the groupId of the returned block is reset to 0
34
} SProjectOperatorInfo;
35

36
// Per-operator state of the indefinite-rows-function operator; read from SOperatorInfo.info.
typedef struct SIndefOperatorInfo {
37
  SOptrBasicInfo binfo;  // holds pRes (the result block) and the input/output ts order
38
  SAggSupporter  aggSup;
39
  SArray*        pPseudoColInfo;  // filled by setRowTsColumnOutputInfo(); passed to projectApplyFunctions()
40
  SExprSupp      scalarSup;  // optional scalar expressions applied to an input block before the functions
41
  uint64_t       groupId;  // group id of the blocks currently being processed
42
  SSDataBlock*   pNextGroupRes;  // first block of the next group, held back until the next round
43
} SIndefOperatorInfo;
44

45
// Forward declarations of file-local helpers that are referenced before their definitions.
static int32_t      doGenerateSourceData(SOperatorInfo* pOperator);
46
static int32_t      doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
47
static int32_t      doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
48
static int32_t      setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList);
49
static int32_t      setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup,
50
                                            int32_t stage, int32_t numOfExprs);
51

52
// Releases everything owned by a SProjectOperatorInfo and then the struct itself.
// `param` is the SProjectOperatorInfo pointer; NULL is accepted and ignored.
static void destroyProjectOperatorInfo(void* param) {
1,887,788✔
53
  if (NULL == param) {
1,887,788!
54
    return;
×
55
  }
56

57
  SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param;
1,887,788✔
58
  cleanupBasicInfo(&pInfo->binfo);
1,887,788✔
59
  cleanupAggSup(&pInfo->aggSup);
1,887,972✔
60
  taosArrayDestroy(pInfo->pPseudoColInfo);
1,888,012✔
61

62
  blockDataDestroy(pInfo->pFinalRes);
1,888,027✔
63
  // frees the struct last; only the local `param` copy is affected by the "clear"
  taosMemoryFreeClear(param);
1,888,006!
64
}
65

66
// Releases everything owned by a SIndefOperatorInfo and then the struct itself.
// `param` is the SIndefOperatorInfo pointer; NULL is accepted and ignored.
// pNextGroupRes is not released here.
static void destroyIndefinitOperatorInfo(void* param) {
118,648✔
67
  SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param;
118,648✔
68
  if (pInfo == NULL) {
118,648!
69
    return;
×
70
  }
71

72
  cleanupBasicInfo(&pInfo->binfo);
118,648✔
73
  taosArrayDestroy(pInfo->pPseudoColInfo);
118,650✔
74
  cleanupAggSup(&pInfo->aggSup);
118,650✔
75
  cleanupExprSupp(&pInfo->scalarSup);
118,650✔
76

77
  taosMemoryFreeClear(param);
118,650!
78
}
79

80
// Forwards a "release stream state" request to the first downstream operator, if that
// operator provides a releaseStreamStateFn. This operator keeps no stream state of its own here.
// NOTE(review): pDownstream[0] is dereferenced unconditionally — assumes at least one downstream; confirm.
void streamOperatorReleaseState(SOperatorInfo* pOperator) {
1,001✔
81
  SOperatorInfo* downstream = pOperator->pDownstream[0];
1,001✔
82
  if (downstream->fpSet.releaseStreamStateFn) {
1,001!
83
    downstream->fpSet.releaseStreamStateFn(downstream);
1,001✔
84
  }
85
}
1,001✔
86

87
// Forwards a "reload stream state" request to the first downstream operator, if that
// operator provides a reloadStreamStateFn.
// NOTE(review): pDownstream[0] is dereferenced unconditionally — assumes at least one downstream; confirm.
void streamOperatorReloadState(SOperatorInfo* pOperator) {
1,001✔
88
  SOperatorInfo* downstream = pOperator->pDownstream[0];
1,001✔
89
  if (downstream->fpSet.reloadStreamStateFn) {
1,001!
90
    downstream->fpSet.reloadStreamStateFn(downstream);
1,001✔
91
  }
92
}
1,001✔
93

94
// Creates a project operator for the physical plan node `pProjPhyNode`.
//
// downstream:   upstream data source of this operator; may be NULL (it is only appended when non-NULL).
// pProjPhyNode: plan node supplying the output block description, projections, conditions and limits.
// pTaskInfo:    owning task; on failure its `code` field is set to the returned error.
// pOptrInfo:    out parameter; receives the new operator on success.
//
// Returns TSDB_CODE_SUCCESS, or an error code after the `_error` cleanup has destroyed pInfo and
// called destroyOperatorAndDownstreams() on the operator and the downstream.
int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo,
1,886,073✔
95
                                  SOperatorInfo** pOptrInfo) {
96
  QRY_PARAM_CHECK(pOptrInfo);
1,886,073!
97

98
  int32_t code = TSDB_CODE_SUCCESS;
1,886,073✔
99
  SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
1,886,073!
100
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,886,615!
101
  if (pInfo == NULL || pOperator == NULL) {
1,886,766!
102
    code = terrno;
×
103
    goto _error;
×
104
  }
105

106
  pOperator->exprSupp.hasWindowOrGroup = false;
1,886,848✔
107
  pOperator->pTaskInfo = pTaskInfo;
1,886,848✔
108

109
  int32_t    lino = 0;
1,886,848✔
110

111
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pProjPhyNode->node.pOutputDataBlockDesc);
1,886,848✔
112
  TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno);
1,887,399!
113

114
  initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo);
1,887,399✔
115

116
  // hand the result block to pInfo right away so that the _error path releases it through pInfo
  pInfo->binfo.pRes = pResBlock;
1,887,583✔
117
  pInfo->pFinalRes = NULL;
1,887,583✔
118

119
  code = createOneDataBlock(pResBlock, false, &pInfo->pFinalRes);
1,887,583✔
120
  TSDB_CHECK_CODE(code, lino, _error);
1,887,027!
121

122
  pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder;
1,887,027✔
123
  pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder;
1,887,027✔
124
  pInfo->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup;
1,887,027✔
125
  pInfo->outputIgnoreGroup = pProjPhyNode->ignoreGroupId;
1,887,027✔
126

127
  // merging is disabled for the stream/queue exec models and whenever group ids must be kept;
  // otherwise the plan node decides
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM || pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
1,887,027✔
128
    pInfo->mergeDataBlocks = false;
4,894✔
129
  } else {
130
    if (!pProjPhyNode->ignoreGroupId) {
1,882,133✔
131
      pInfo->mergeDataBlocks = false;
35,117✔
132
    } else {
133
      pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock;
1,847,016✔
134
    }
135
  }
136

137
  int32_t numOfRows = 4096;
1,887,027✔
138
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
1,887,027✔
139

140
  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
  // NOTE(review): if info.rowSize is a 32-bit int, numOfRows * rowSize can overflow for very wide rows — confirm.
141
  int32_t TWOMB = 2 * 1024 * 1024;
1,887,027✔
142
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
1,887,027✔
143
    numOfRows = TWOMB / pResBlock->info.rowSize;
293,124✔
144
  }
145

146
  initResultSizeInfo(&pOperator->resultInfo, numOfRows);
1,887,027✔
147
  
148
  int32_t    numOfCols = 0;
1,887,039✔
149
  SExprInfo* pExprInfo = NULL;
1,887,039✔
150
  code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols);
1,887,039✔
151
  TSDB_CHECK_CODE(code, lino, _error);
1,887,516!
152
  // NOTE(review): presumably initAggSup takes ownership of pExprInfo (it is never freed in this function) — confirm.
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
1,887,516✔
153
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
1,887,516✔
154
  TSDB_CHECK_CODE(code, lino, _error);
1,887,514!
155

156
  initBasicInfo(&pInfo->binfo, pResBlock);
1,887,514✔
157
  code = setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
1,886,934✔
158
  TSDB_CHECK_CODE(code, lino, _error);
1,887,282!
159

160
  code = filterInitFromNode((SNode*)pProjPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
1,887,282✔
161
  TSDB_CHECK_CODE(code, lino, _error);
1,886,830!
162

163
  code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols, &pInfo->pPseudoColInfo);
1,886,830✔
164
  TSDB_CHECK_CODE(code, lino, _error);
1,887,603!
165

166
  code = initStreamFillOperatorColumnMapInfo(&pOperator->exprSupp, downstream);
1,887,603✔
167
  TSDB_CHECK_CODE(code, lino, _error);
1,886,139!
168

169
  setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo,
1,886,139✔
170
                  pTaskInfo);
171
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo,
1,886,149✔
172
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
173
  setOperatorStreamStateFn(pOperator, streamOperatorReleaseState, streamOperatorReloadState);
1,886,050✔
174

175
  // a project operator without downstream is allowed; doProjectOperation() then generates its own data
  if (NULL != downstream) {
1,886,597✔
176
    code = appendDownstream(pOperator, &downstream, 1);
1,885,010✔
177
    if (code != TSDB_CODE_SUCCESS) {
1,885,797!
178
      goto _error;
×
179
    }
180
  }
181

182
  *pOptrInfo = pOperator;
1,887,384✔
183
  return TSDB_CODE_SUCCESS;
1,887,384✔
184

185
_error:
×
186
  // NOTE(review): pInfo is destroyed here and pOperator (whose info may already point at pInfo) is destroyed
  // next — presumably destroyOperatorAndDownstreams() does not release pOperator->info again; confirm.
  if (pInfo != NULL) destroyProjectOperatorInfo(pInfo);
×
187
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
188
  pTaskInfo->code = code;
×
189
  return code;
×
190
}
191

192
// Applies the group offset (remainGroupOffset) to an incoming block.
// Returns PROJECT_RETRIEVE_CONTINUE when the block belongs to a group that is still being skipped
// (the caller then fetches the next block), or PROJECT_RETRIEVE_DONE when the block must be processed.
static int32_t discardGroupDataBlock(SSDataBlock* pBlock, SLimitInfo* pLimitInfo) {
3,852,479✔
193
  if (pLimitInfo->remainGroupOffset > 0) {
3,852,479✔
194
    // it is the first group
195
    if (pLimitInfo->currentGroupId == 0 || pLimitInfo->currentGroupId == pBlock->info.id.groupId) {
4,992✔
196
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
2,191✔
197
      return PROJECT_RETRIEVE_CONTINUE;
2,191✔
198
    // this condition is always true here: the equal case was handled by the branch above
    } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
2,801!
199
      // now it is the data from a new group
200
      pLimitInfo->remainGroupOffset -= 1;
2,801✔
201
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
2,801✔
202

203
      // ignore data block in current group
204
      if (pLimitInfo->remainGroupOffset > 0) {
2,801✔
205
        return PROJECT_RETRIEVE_CONTINUE;
2,588✔
206
      }
207

208
      // the offset is exhausted: reset the group id and let the caller process this block
      pLimitInfo->currentGroupId = 0;
213✔
209
    }
210
  }
211

212
  return PROJECT_RETRIEVE_DONE;
3,847,700✔
213
}
214

215
// Updates the group bookkeeping in `pLimitInfo` for an incoming block and enforces slimit.
// Marks the operator completed (and returns PROJECT_RETRIEVE_DONE) once the number of output
// groups exceeds slimit.limit; otherwise returns PROJECT_RETRIEVE_CONTINUE.
// NOTE(review): the sanity check below returns TSDB_CODE_INVALID_PARA through the same int32_t that
// otherwise carries PROJECT_RETRIEVE_* values, and the visible caller discards the return value.
static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, SOperatorInfo* pOperator) {
3,847,727✔
216
  // remainGroupOffset == 0
217
  // here check for a new group data, we need to handle the data of the previous group.
218
  if (!(pLimitInfo->remainGroupOffset == 0 || pLimitInfo->remainGroupOffset == -1)) {
3,847,727!
219
    qError("project failed at: %s:%d", __func__, __LINE__);
×
220
    return TSDB_CODE_INVALID_PARA;
×
221
  }
222

223
  bool newGroup = false;
3,847,727✔
224
  if (0 == pBlock->info.id.groupId) {
3,847,727✔
225
    // group id 0: the output is counted as a single group
    pLimitInfo->numOfOutputGroups = 1;
3,592,089✔
226
  } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
255,638✔
227
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
228,765✔
228
    pLimitInfo->numOfOutputGroups += 1;
228,765✔
229
    newGroup = true;
228,765✔
230
  } else {
231
    // same group as the previous block: nothing to update
    return PROJECT_RETRIEVE_CONTINUE;
26,873✔
232
  }
233

234
  // a negative slimit.limit disables the check
  if ((pLimitInfo->slimit.limit >= 0) && (pLimitInfo->slimit.limit < pLimitInfo->numOfOutputGroups)) {
3,820,854✔
235
    setOperatorCompleted(pOperator);
355✔
236
    return PROJECT_RETRIEVE_DONE;
355✔
237
  }
238

239
  // reset the value for a new group data
240
  // existing rows that belongs to previous group.
241
  if (newGroup) {
3,820,499✔
242
    resetLimitInfoForNextGroup(pLimitInfo);
228,410✔
243
  }
244

245
  return PROJECT_RETRIEVE_CONTINUE;
3,820,507✔
246
}
247

248
// todo refactor
249
// Applies limit/offset to the projected block `pBlock` of group `groupId`.
// Returns PROJECT_RETRIEVE_CONTINUE when the block ended up empty while limit.limit is non-zero
// (the caller fetches more input); otherwise returns PROJECT_RETRIEVE_DONE, marking the operator
// completed when the limit was reached and either slimit is satisfied or the group id is 0.
static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SSDataBlock* pBlock,
3,848,546✔
250
                                    SOperatorInfo* pOperator) {
251
  // set current group id
252
  pLimitInfo->currentGroupId = groupId;
3,848,546✔
253
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pOperator->pTaskInfo);
3,848,546✔
254
  if (pBlock->info.rows == 0 && 0 != pLimitInfo->limit.limit) {
3,848,728✔
255
    return PROJECT_RETRIEVE_CONTINUE;
18,286✔
256
  } else {
257
    if (limitReached && (pLimitInfo->slimit.limit >= 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
3,830,442✔
258
      setOperatorCompleted(pOperator);
113✔
259
    } else if (limitReached && groupId == 0) {
3,830,329✔
260
      setOperatorCompleted(pOperator);
94,312✔
261
    }
262
  }
263

264
  return PROJECT_RETRIEVE_DONE;
3,830,381✔
265
}
266

267
// "Get next" function of the project operator: produces the next projected result block.
//
// pOperator: the project operator (its info is a SProjectOperatorInfo).
// pResBlock: out parameter. On the normal path it receives the result block, or NULL when that
//            block has no rows. Certain stream block types are passed through unchanged.
//
// Returns `code` (0 on the success paths). Any failure reaches `_end`, which logs it, stores it in
// pTaskInfo->code and leaves through T_LONG_JMP, so an error code is not returned to the caller.
// NOTE(review): on the early OP_EXEC_DONE return *pResBlock is not assigned here — presumably
// QRY_PARAM_CHECK has already reset it; confirm.
int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
4,464,979✔
268
  QRY_PARAM_CHECK(pResBlock);
4,464,979!
269

270
  SProjectOperatorInfo* pProjectInfo = pOperator->info;
4,464,979✔
271
  SOptrBasicInfo*       pInfo = &pProjectInfo->binfo;
4,464,979✔
272
  SExprSupp*            pSup = &pOperator->exprSupp;
4,464,979✔
273
  SSDataBlock*          pRes = pInfo->pRes;
4,464,979✔
274
  SSDataBlock*          pFinalRes = pProjectInfo->pFinalRes;
4,464,979✔
275
  int32_t               code = 0;
4,464,979✔
276
  int32_t               lino = 0;
4,464,979✔
277
  int64_t               st = 0;
4,464,979✔
278
  int32_t               order = pInfo->inputTsOrder;
4,464,979✔
279
  int32_t               scanFlag = 0;
4,464,979✔
280

281
  blockDataCleanup(pFinalRes);
4,464,979✔
282
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
4,466,341✔
283

284
  if (pOperator->status == OP_EXEC_DONE) {
4,466,341✔
285
    return code;
1,062,752✔
286
  }
287

288
  // openCost == 0 is used as "not measured yet"; the elapsed time is stored further below
  if (pOperator->cost.openCost == 0) {
3,403,589✔
289
    st = taosGetTimestampUs();
1,888,813✔
290
  }
291

292
  SOperatorInfo* downstream = pOperator->numOfDownstream > 0 ? pOperator->pDownstream[0] : NULL;
3,404,475✔
293
  SLimitInfo*    pLimitInfo = &pProjectInfo->limitInfo;
3,404,475✔
294

295
  // no downstream: the operator produces its own data and returns immediately
  if (downstream == NULL) {
3,404,475✔
296
    code = doGenerateSourceData(pOperator);
1,681✔
297
    QUERY_CHECK_CODE(code, lino, _end);
1,681!
298

299
    if (pProjectInfo->outputIgnoreGroup) {
1,681!
300
      pRes->info.id.groupId = 0;
1,681✔
301
    }
302

303
    *pResBlock = (pRes->info.rows > 0)? pRes:NULL;
1,681!
304
    return code;
1,681✔
305
  }
306

307
  // outer loop: one iteration per projected block; repeats while merging or when a block was filtered out
  while (1) {
2,312,421✔
308
    // inner loop: fetch input blocks until one yields rows after group-offset/slimit/limit handling
    while (1) {
23,065✔
309
      blockDataCleanup(pRes);
5,738,280✔
310

311
      // The downstream exec may change the value of the newgroup, so use a local variable instead.
312
      SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
5,736,932✔
313
      if (pBlock == NULL) {
5,737,555✔
314
        qDebug("set op close, exec %d, status %d rows %" PRId64 , pTaskInfo->execModel, pOperator->status, pFinalRes->info.rows);
1,878,904✔
315
        setOperatorCompleted(pOperator);
1,878,907✔
316
        break;
1,879,061✔
317
      }
318
//      if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
319
//        qDebug("set status recv");
320
//        pOperator->status = OP_EXEC_RECV;
321
//      }
322

323
      // for stream interval
      // these block types are handed to the caller as-is, without projection
324
      if (pBlock->info.type == STREAM_RETRIEVE || pBlock->info.type == STREAM_DELETE_RESULT ||
3,858,651✔
325
          pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_CREATE_CHILD_TABLE ||
3,857,408!
326
          pBlock->info.type == STREAM_CHECKPOINT || pBlock->info.type == STREAM_NOTIFY_EVENT) {
3,853,597✔
327

328
        *pResBlock = pBlock;
5,665✔
329
        return code;
5,665✔
330
      }
331

332
      if (pProjectInfo->inputIgnoreGroup) {
3,852,986✔
333
        pBlock->info.id.groupId = 0;
456,622✔
334
      }
335

336
      // skip blocks of groups that fall inside the group offset
      int32_t status = discardGroupDataBlock(pBlock, pLimitInfo);
3,852,843✔
337
      if (status == PROJECT_RETRIEVE_CONTINUE) {
3,852,843✔
338
        continue;
4,779✔
339
      }
340

341
      // the return value is deliberately discarded; completion is detected through pOperator->status
      // NOTE(review): this also drops the TSDB_CODE_INVALID_PARA that setInfoForNewGroup can return — confirm intended.
      (void) setInfoForNewGroup(pBlock, pLimitInfo, pOperator);
3,848,064✔
342
      if (pOperator->status == OP_EXEC_DONE) {
3,847,943✔
343
        break;
355✔
344
      }
345

346
      if (pProjectInfo->mergeDataBlocks) {
3,847,588✔
347
        pFinalRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;
2,478,772✔
348
      } else {
349
        pRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;
1,368,816✔
350
      }
351

352
      code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
3,847,588✔
353
      QUERY_CHECK_CODE(code, lino, _end);
3,848,268!
354

355
      code = blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
3,848,268✔
356
      QUERY_CHECK_CODE(code, lino, _end);
3,848,327!
357

358
      code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
3,848,327✔
359
                                   pProjectInfo->pPseudoColInfo);
360
      QUERY_CHECK_CODE(code, lino, _end);
3,847,856✔
361

362
      status = doIngroupLimitOffset(pLimitInfo, pBlock->info.id.groupId, pInfo->pRes, pOperator);
3,847,041✔
363
      if (status == PROJECT_RETRIEVE_CONTINUE) {
3,847,105✔
364
        continue;
18,286✔
365
      }
366

367
      break;
3,828,819✔
368
    }
369

370
    if (pProjectInfo->mergeDataBlocks) {
5,708,235✔
371
      if (pRes->info.rows > 0) {
3,565,808✔
372
        pFinalRes->info.id.groupId = 0;  // clear groupId
2,459,470✔
373
        pFinalRes->info.version = pRes->info.version;
2,459,470✔
374

375
        // continue merge data, ignore the group id
376
        code = blockDataMerge(pFinalRes, pRes);
2,459,470✔
377
        QUERY_CHECK_CODE(code, lino, _end);
2,459,807✔
378

379
        // keep accumulating while below the threshold and the input is not exhausted
        // NOTE(review): if blockDataMerge already appended pRes's rows to pFinalRes, this sum counts them twice — confirm.
        if (pFinalRes->info.rows + pRes->info.rows <= pOperator->resultInfo.threshold && (pOperator->status != OP_EXEC_DONE)) {
2,459,806✔
380
          continue;
2,312,519✔
381
        }
382
      }
383

384
      // do apply filter
385
      code = doFilter(pFinalRes, pOperator->exprSupp.pFilterInfo, NULL);
1,253,625✔
386
      QUERY_CHECK_CODE(code, lino, _end);
1,253,896!
387

388
      // when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
389
      if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) {
1,253,896!
390
        qDebug("project return %" PRId64 " rows, status %d", pFinalRes->info.rows, pOperator->status);
1,253,994✔
391
        break;
1,253,856✔
392
      }
393
    } else {
394
      // do apply filter
395
      if (pRes->info.rows > 0) {
2,142,427✔
396
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
1,368,727✔
397
        QUERY_CHECK_CODE(code, lino, _end);
1,368,798!
398

399
        // everything was filtered out: go back and project the next input block
        if (pRes->info.rows == 0) {
1,368,798!
400
          continue;
×
401
        }
402
      }
403

404
      // no results generated
405
      break;
2,142,498✔
406
    }
407
  }
408

409
  SSDataBlock* p = pProjectInfo->mergeDataBlocks ? pFinalRes : pRes;
3,396,354✔
410
  pOperator->resultInfo.totalRows += p->info.rows;
3,396,354✔
411
  p->info.dataLoad = 1;
3,396,354✔
412

413
  if (pOperator->cost.openCost == 0) {
3,396,354✔
414
    // microseconds converted to milliseconds
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
1,884,320✔
415
  }
416

417
  if (pProjectInfo->outputIgnoreGroup) {
3,396,468✔
418
    p->info.id.groupId = 0;
3,189,903✔
419
  }
420

421
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
3,396,468✔
422
    printDataBlock(p, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
27,399✔
423
  }
424

425
  *pResBlock = (p->info.rows > 0)? p:NULL;
3,396,561✔
426

427
_end:
3,397,377✔
428
  if (code != TSDB_CODE_SUCCESS) {
3,397,377✔
429
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
816!
430
    pTaskInfo->code = code;
816✔
431
    T_LONG_JMP(pTaskInfo->env, code);
816!
432
  }
433
  return code;
3,396,561✔
434
}
435

436
// Creates an "indefinite rows function" operator for the physical plan node `pNode`
// (which is treated as a SIndefRowsFuncPhysiNode).
//
// downstream: upstream data source; appended unconditionally (unlike the project operator,
//             there is no NULL check here).
// pNode:      plan node supplying optional scalar expressions (pExprs), the functions (pFuncs),
//             the output block description and the conditions.
// pTaskInfo:  owning task; on failure its `code` field is set to the returned error.
// pOptrInfo:  out parameter; receives the new operator on success.
//
// Returns TSDB_CODE_SUCCESS, or an error code after the `_error` cleanup.
int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
118,649✔
437
                                                 SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
438
  QRY_PARAM_CHECK(pOptrInfo);
118,649!
439
  int32_t code = 0;
118,649✔
440
  int32_t lino = 0;
118,649✔
441
  int32_t numOfRows = 4096;
118,649✔
442
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
118,649✔
443

444
  SIndefOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIndefOperatorInfo));
118,649!
445
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
118,650!
446
  if (pInfo == NULL || pOperator == NULL) {
118,650!
447
    code = terrno;
×
448
    goto _error;
×
449
  }
450

451
  pOperator->pTaskInfo = pTaskInfo;
118,650✔
452

453
  SExprSupp* pSup = &pOperator->exprSupp;
118,650✔
454
  pSup->hasWindowOrGroup = false;
118,650✔
455

456
  SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
118,650✔
457

458
  // optional scalar expressions, evaluated on each input block before the functions are applied
  if (pPhyNode->pExprs != NULL) {
118,650✔
459
    int32_t    num = 0;
139✔
460
    SExprInfo* pSExpr = NULL;
139✔
461
    code = createExprInfo(pPhyNode->pExprs, NULL, &pSExpr, &num);
139✔
462
    QUERY_CHECK_CODE(code, lino, _error);
139!
463

464
    code = initExprSupp(&pInfo->scalarSup, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
139✔
465
    if (code != TSDB_CODE_SUCCESS) {
139!
466
      goto _error;
×
467
    }
468
  }
469

470
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->node.pOutputDataBlockDesc);
118,650✔
471
  TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno);
118,650!
472

473
  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
  // NOTE(review): if info.rowSize is a 32-bit int, numOfRows * rowSize can overflow for very wide rows — confirm.
474
  int32_t TWOMB = 2 * 1024 * 1024;
118,650✔
475
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
118,650!
476
    numOfRows = TWOMB / pResBlock->info.rowSize;
×
477
  }
478

479
  initBasicInfo(&pInfo->binfo, pResBlock);
118,650✔
480
  initResultSizeInfo(&pOperator->resultInfo, numOfRows);
118,650✔
481
  code = blockDataEnsureCapacity(pResBlock, numOfRows);
118,649✔
482
  TSDB_CHECK_CODE(code, lino, _error);
118,649!
483

484
  int32_t    numOfExpr = 0;
118,649✔
485
  SExprInfo* pExprInfo = NULL;
118,649✔
486
  code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr);
118,649✔
487
  TSDB_CHECK_CODE(code, lino, _error);
118,650!
488

489
  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
118,650✔
490
                            pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
118,650✔
491
  TSDB_CHECK_CODE(code, lino, _error);
118,647!
492

493
  code = setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
118,647✔
494
  TSDB_CHECK_CODE(code, lino, _error);
118,648!
495

496
  code = filterInitFromNode((SNode*)pPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
118,648✔
497
  TSDB_CHECK_CODE(code, lino, _error);
118,648!
498

499
  // NOTE(review): pResBlock is stored in pInfo only here; presumably initBasicInfo() above already
  // stored it, otherwise the earlier _error exits would not release it — confirm.
  pInfo->binfo.pRes = pResBlock;
118,648✔
500
  pInfo->binfo.inputTsOrder = pNode->inputTsOrder;
118,648✔
501
  pInfo->binfo.outputTsOrder = pNode->outputTsOrder;
118,648✔
502
  code = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr, &pInfo->pPseudoColInfo);
118,648✔
503
  TSDB_CHECK_CODE(code, lino, _error);
118,648!
504

505
  setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo,
118,648✔
506
                  pTaskInfo);
507
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo,
118,648✔
508
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
509

510
  code = appendDownstream(pOperator, &downstream, 1);
118,647✔
511
  if (code != TSDB_CODE_SUCCESS) {
118,650!
512
    goto _error;
×
513
  }
514

515
  *pOptrInfo = pOperator;
118,650✔
516
  return TSDB_CODE_SUCCESS;
118,650✔
517

518
_error:
×
519
  if (pInfo != NULL) destroyIndefinitOperatorInfo(pInfo);
×
520
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
521
  pTaskInfo->code = code;
×
522
  return code;
×
523
}
524

525
// Processes one input block for the indefinite-rows operator: evaluates the optional scalar
// expressions in place on `pBlock`, binds the block as function input, grows the result block
// and applies the functions, writing into pInfo->pRes.
// Does not return an error: every failure leaves through T_LONG_JMP(pTaskInfo->env, code).
// The `downstream` parameter is not used in the body.
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream,
853,429✔
526
                              SExecTaskInfo* pTaskInfo) {
527
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
853,429✔
528
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
853,429✔
529
  SExprSupp*          pSup = &pOperator->exprSupp;
853,429✔
530

531
  int32_t order = pInfo->inputTsOrder;
853,429✔
532
  int32_t scanFlag = pBlock->info.scanFlag;
853,429✔
533
  int32_t code = TSDB_CODE_SUCCESS;
853,429✔
534

535
  // there is an scalar expression that needs to be calculated before apply the group aggregation.
536
  SExprSupp* pScalarSup = &pIndefInfo->scalarSup;
853,429✔
537
  if (pScalarSup->pExprInfo != NULL) {
853,429✔
538
    // pBlock is both the source and the destination of the scalar expressions
    code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
656✔
539
                                 pIndefInfo->pPseudoColInfo);
540
    if (code != TSDB_CODE_SUCCESS) {
656!
541
      T_LONG_JMP(pTaskInfo->env, code);
×
542
    }
543
  }
544

545
  code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
853,429✔
546
  if (code) {
853,423!
547
    T_LONG_JMP(pTaskInfo->env, code);
×
548
  }
549

550
  code = blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
853,423✔
551
  if (code != TSDB_CODE_SUCCESS) {
853,424!
552
    T_LONG_JMP(pTaskInfo->env, code);
×
553
  }
554

555
  code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
853,424✔
556
                               pIndefInfo->pPseudoColInfo);
557
  if (code != TSDB_CODE_SUCCESS) {
853,433✔
558
    T_LONG_JMP(pTaskInfo->env, code);
58!
559
  }
560
}
853,375✔
561

562
// "Get next" function of the indefinite-rows operator: produces the next result block.
//
// Input blocks are consumed until the result reaches resultInfo.threshold rows, the group id
// changes (the first block of the new group is parked in pNextGroupRes for the next round), or
// the downstream is exhausted. The filter is then applied; an empty result triggers another round
// unless the operator is done.
//
// pResBlock: out parameter; receives the result block, or NULL when it has no rows.
// Returns `code`; a doFilter failure is logged, stored in pTaskInfo->code and raised through
// T_LONG_JMP rather than returned.
// NOTE(review): pDownstream[0] is read unconditionally — assumes a downstream always exists; confirm.
int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
290,041✔
563
  QRY_PARAM_CHECK(pResBlock);
290,041!
564
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
290,041✔
565
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
290,041✔
566
  SExprSupp*          pSup = &pOperator->exprSupp;
290,041✔
567
  int64_t             st = 0;
290,041✔
568
  int32_t             code = TSDB_CODE_SUCCESS;
290,041✔
569
  int32_t             lino = 0;
290,041✔
570
  SSDataBlock*        pRes = pInfo->pRes;
290,041✔
571

572
  blockDataCleanup(pRes);
290,041✔
573

574
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
290,043✔
575
  if (pOperator->status == OP_EXEC_DONE) {
290,043✔
576
    return code;
107,747✔
577
  }
578

579
  // openCost == 0 is used as "not measured yet"; the elapsed time is stored further below
  if (pOperator->cost.openCost == 0) {
182,296✔
580
    st = taosGetTimestampUs();
118,649✔
581
  }
582

583
  SOperatorInfo* downstream = pOperator->pDownstream[0];
182,296✔
584

585
  while (1) {
2,276✔
586
    // here we need to handle the existing group results
587
    if (pIndefInfo->pNextGroupRes != NULL) {  // todo extract method
184,572✔
588
      // a new group starts: reset every function context before feeding it the parked block
      for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
212,184✔
589
        SqlFunctionCtx* pCtx = &pSup->pCtx[k];
157,779✔
590

591
        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
157,779✔
592
        pResInfo->initialized = false;
157,779✔
593
        pCtx->pOutput = NULL;
157,779✔
594
      }
595

596
      doHandleDataBlock(pOperator, pIndefInfo->pNextGroupRes, downstream, pTaskInfo);
54,405✔
597
      pIndefInfo->pNextGroupRes = NULL;
54,405✔
598
    }
599

600
    if (pInfo->pRes->info.rows < pOperator->resultInfo.threshold) {
184,572✔
601
      while (1) {
787,453✔
602
        // The downstream exec may change the value of the newgroup, so use a local variable instead.
603
        SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
972,024✔
604
        if (pBlock == NULL) {
972,015✔
605
          setOperatorCompleted(pOperator);
118,365✔
606
          break;
118,365✔
607
        }
608
        pInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
853,650✔
609

610
        if (pIndefInfo->groupId == 0 && pBlock->info.id.groupId != 0) {
853,650✔
611
          pIndefInfo->groupId = pBlock->info.id.groupId;  // this is the initial group result
3,473✔
612
        } else {
613
          if (pIndefInfo->groupId != pBlock->info.id.groupId) {  // reset output buffer and computing status
850,177✔
614
            // park the block and stop, so the current group's rows are returned first
            pIndefInfo->groupId = pBlock->info.id.groupId;
54,632✔
615
            pIndefInfo->pNextGroupRes = pBlock;
54,632✔
616
            break;
54,632✔
617
          }
618
        }
619

620
        doHandleDataBlock(pOperator, pBlock, downstream, pTaskInfo);
799,018✔
621
        if (pInfo->pRes->info.rows >= pOperator->resultInfo.threshold) {
798,970✔
622
          break;
11,517✔
623
        }
624
      }
625
    }
626

627
    code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
184,515✔
628
    QUERY_CHECK_CODE(code, lino, _end);
184,513!
629

630
    size_t rows = pInfo->pRes->info.rows;
184,513✔
631
    if (rows > 0 || pOperator->status == OP_EXEC_DONE) {
184,513✔
632
      break;
633
    } else {
634
      // everything was filtered out and more input may follow: start another round
      blockDataCleanup(pInfo->pRes);
2,276✔
635
    }
636
  }
637

638
  size_t rows = pInfo->pRes->info.rows;
182,237✔
639
  pOperator->resultInfo.totalRows += rows;
182,237✔
640

641
  if (pOperator->cost.openCost == 0) {
182,237✔
642
    // microseconds converted to milliseconds
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
118,592✔
643
  }
644

645
  *pResBlock = (rows > 0) ? pInfo->pRes : NULL;
182,239✔
646

647
_end:
182,239✔
648
  if (code != TSDB_CODE_SUCCESS) {
182,239!
649
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
650
    pTaskInfo->code = code;
×
651
    T_LONG_JMP(pTaskInfo->env, code);
×
652
  }
653
  return code;
182,239✔
654
}
655

656
// Calls the init callback (fpSet.init) of each of the `size` function contexts in `pCtx`.
// A context is skipped when its result entry is already initialized, when its function is a
// pseudo-column or scalar function, or when its functionId is -1.
// Returns 0 on success, or the first non-zero code returned by an init callback.
int32_t initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
2,005,244✔
657
  int32_t code = TSDB_CODE_SUCCESS;
2,005,244✔
658
  for (int32_t j = 0; j < size; ++j) {
13,269,197✔
659
    struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
11,262,883✔
660
    if (isRowEntryInitialized(pResInfo) || fmIsPseudoColumnFunc(pCtx[j].functionId) || pCtx[j].functionId == -1 ||
12,047,645!
661
        fmIsScalarFunc(pCtx[j].functionId)) {
784,801✔
662
      continue;
11,085,192✔
663
    }
664

665
    code = pCtx[j].fpSet.init(&pCtx[j], pCtx[j].resultInfo);
179,590✔
666
    if (code) {
178,761!
667
      return code;
×
668
    }
669
  }
670

671
  return 0;
2,006,314✔
672
}
673

674
/*
675
 * The start of each column's SResultRowEntryInfo is denoted by rowEntryInfoOffset.
676
 * Note that in case of top/bottom query, the whole multiple rows of result is treated as only one row of results.
677
 * +------------+-----------------result column 1------------+------------------result column 2-----------+
678
 * | SResultRow | SResultRowEntryInfo | intermediate buffer1 | SResultRowEntryInfo | intermediate buffer 2|
679
 * +------------+--------------------------------------------+--------------------------------------------+
680
 *           offset[0]                                  offset[1]                                   offset[2]
681
 */
682
// TODO refactor: some function move away
683
int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
2,005,041✔
684
                             int32_t numOfExprs) {
685
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
2,005,041✔
686
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
2,005,041✔
687
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
2,005,041✔
688

689
  SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
2,005,041✔
690
  initResultRowInfo(pResultRowInfo);
2,005,041✔
691

692
  int64_t     tid = 0;
2,005,539✔
693
  int64_t     groupId = 0;
2,005,539✔
694
  SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, (char*)&tid, sizeof(tid), true, groupId,
2,005,539✔
695
                                            pTaskInfo, false, pSup, true);
696
  if (pRow == NULL || pTaskInfo->code != 0) {
2,006,240!
697
    return pTaskInfo->code;
×
698
  }
699

700
  for (int32_t i = 0; i < numOfExprs; ++i) {
13,268,761✔
701
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowEntryInfoOffset);
11,261,666✔
702
    cleanupResultRowEntry(pEntry);
11,262,521✔
703

704
    pCtx[i].resultInfo = pEntry;
11,262,521✔
705
    pCtx[i].scanFlag = stage;
11,262,521✔
706
  }
707

708
  return initCtxOutputBuffer(pCtx, numOfExprs);
2,007,095✔
709
}
710

711
/**
 * Collect the indices of all pseudo-column function contexts into a new array.
 *
 * @param pCtx       array of function contexts
 * @param numOfCols  number of entries in pCtx
 * @param pResList   out: newly created SArray of int32_t indices (possibly empty);
 *                   only assigned on success, the caller then owns the array
 * @return 0 on success, otherwise terrno from the failed array operation
 *         (QRY_PARAM_CHECK may also return early for an invalid pResList)
 */
int32_t setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList) {
  QRY_PARAM_CHECK(pResList);

  SArray* pList = taosArrayInit(4, sizeof(int32_t));
  if (pList == NULL) {
    return terrno;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    if (fmIsPseudoColumnFunc(pCtx[i].functionId)) {
      void* px = taosArrayPush(pList, &i);
      if (px == NULL) {
        // capture the error before releasing the partially filled list, which would otherwise leak
        int32_t code = terrno;
        taosArrayDestroy(pList);
        return code;
      }
    }
  }

  *pResList = pList;
  return 0;
}
730

731
/**
 * Produce the single output row of a project operator that has no upstream input.
 *
 * Every expression must be either a constant value or a scalar function; anything else
 * yields TSDB_CODE_OPS_NOT_SUPPORT. After the row is built the operator's filter and
 * limit/offset are applied, totalRows is updated and the operator is marked completed.
 *
 * @param pOperator  project operator; pOperator->info is its SProjectOperatorInfo
 * @return 0 on success, otherwise the first error code encountered. Only a doFilter()
 *         failure is additionally stored in pTaskInfo->code.
 */
int32_t doGenerateSourceData(SOperatorInfo* pOperator) {
  SProjectOperatorInfo* pProjectInfo = pOperator->info;

  SExprSupp*     pSup = &pOperator->exprSupp;
  SSDataBlock*   pRes = pProjectInfo->binfo.pRes;
  SExprInfo*     pExpr = pSup->pExprInfo;
  int64_t        st = taosGetTimestampUs();
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  int32_t code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
  if (code) {
    return code;
  }

  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
    int32_t outputSlotId = pExpr[k].base.resSchema.slotId;

    if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
      SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, outputSlotId);
      if (pColInfoData == NULL) {
        return terrno;
      }

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataSetNNULL(pColInfoData, 0, 1);
      } else {
        code = colDataSetVal(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
        if (code) {
          return code;
        }
      }
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
      SqlFunctionCtx* pfCtx = &pSup->pCtx[k];

      // UDF scalar functions will be calculated here, for example, select foo(n) from (select 1 n).
      // UDF aggregate functions will be handled in agg operator.
      if (!fmIsScalarFunc(pfCtx->functionId)) {
        return TSDB_CODE_OPS_NOT_SUPPORT;
      }

      SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
      if (pBlockList == NULL) {
        return terrno;
      }

      void* px = taosArrayPush(pBlockList, &pRes);
      if (px == NULL) {
        code = terrno;  // read terrno before the cleanup call
        taosArrayDestroy(pBlockList);
        return code;
      }

      SColumnInfoData* pResColData = taosArrayGet(pRes->pDataBlock, outputSlotId);
      if (pResColData == NULL) {
        code = terrno;
        taosArrayDestroy(pBlockList);
        return code;
      }

      // temporary column receiving the scalar result before it is copied to the output column
      SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};
      SScalarParam    dest = {.columnData = &idata};

      code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pBlockList);
        return code;
      }

      if (pRes->info.capacity <= 0) {
        qError("project failed at: %s:%d", __func__, __LINE__);
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      } else {
        code = colDataAssign(pResColData, &idata, dest.numOfRows, &pRes->info);
      }

      // release the temporaries on success and failure alike
      colDataDestroy(&idata);
      taosArrayDestroy(pBlockList);
      if (code) {
        return code;
      }
    } else {
      return TSDB_CODE_OPS_NOT_SUPPORT;
    }
  }

  pRes->info.rows = 1;
  code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
  if (code) {
    pTaskInfo->code = code;
    return code;
  }

  (void)doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator);

  pOperator->resultInfo.totalRows += pRes->info.rows;

  setOperatorCompleted(pOperator);
  if (pOperator->cost.openCost == 0) {
    // elapsed time is measured in microseconds and stored in milliseconds
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  }

  return code;
}
831

832
/**
 * Point the output of the leading function contexts at the leading columns of pResult.
 *
 * For each position below the size of pPseudoList, pCtx[position].pOutput is set to the
 * column at the same position in pResult->pDataBlock. A missing column is only logged.
 * Nothing is done when pPseudoList is NULL or empty.
 *
 * NOTE(review): only the size of pPseudoList is used here, not the indices stored in
 * it — confirm this is the intended mapping.
 */
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
  size_t total = 0;
  if (pPseudoList != NULL) {
    total = taosArrayGetSize(pPseudoList);
  }

  int32_t pos = 0;
  while (pos < total) {
    void* pCol = taosArrayGet(pResult->pDataBlock, pos);
    pCtx[pos].pOutput = pCol;
    if (pCol == NULL) {
      qError("failed to get the output buf, ptr is null");
    }
    ++pos;
  }
}
6,500,707✔
841

842
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
6,500,097✔
843
                              int32_t numOfOutput, SArray* pPseudoList) {
844
  int32_t lino = 0;
6,500,097✔
845
  int32_t code = TSDB_CODE_SUCCESS;
6,500,097✔
846
  setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
6,500,097✔
847
  pResult->info.dataLoad = 1;
6,500,952✔
848

849
  SArray* processByRowFunctionCtx = NULL;
6,500,952✔
850
  if (pSrcBlock == NULL) {
6,500,952!
851
    for (int32_t k = 0; k < numOfOutput; ++k) {
×
852
      int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
×
853

854
      if (pExpr[k].pExpr->nodeType != QUERY_NODE_VALUE) {
×
855
        qError("project failed at: %s:%d", __func__, __LINE__);
×
856
        code = TSDB_CODE_INVALID_PARA;
×
857
        TSDB_CHECK_CODE(code, lino, _exit);
×
858
      }
859
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
×
860
      if (pColInfoData == NULL) {
×
861
        code = terrno;
×
862
        TSDB_CHECK_CODE(code, lino, _exit);
×
863
      }
864

865
      int32_t type = pExpr[k].base.pParam[0].param.nType;
×
866
      if (TSDB_DATA_TYPE_NULL == type) {
×
867
        colDataSetNNULL(pColInfoData, 0, 1);
868
      } else {
869
        code = colDataSetVal(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
×
870
        TSDB_CHECK_CODE(code, lino, _exit);
×
871
      }
872
    }
873

874
    pResult->info.rows = 1;
×
875
    goto _exit;
×
876
  }
877

878
  if (pResult != pSrcBlock) {
6,500,952✔
879
    pResult->info.id.groupId = pSrcBlock->info.id.groupId;
5,293,783✔
880
    memcpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
5,293,783✔
881
    qTrace("%s, parName:%s,groupId:%"PRIu64, __FUNCTION__, pSrcBlock->info.parTbName, pResult->info.id.groupId);
5,293,783✔
882
  }
883

884
  // if the source equals to the destination, it is to create a new column as the result of scalar
885
  // function or some operators.
886
  bool createNewColModel = (pResult == pSrcBlock);
6,499,607✔
887
  if (createNewColModel) {
6,499,607✔
888
    code = blockDataEnsureCapacity(pResult, pResult->info.rows);
1,205,791✔
889
    if (code) {
1,206,238!
890
      TSDB_CHECK_CODE(code, lino, _exit);
×
891
    }
892
  }
893

894
  int32_t numOfRows = 0;
6,500,054✔
895

896
  for (int32_t k = 0; k < numOfOutput; ++k) {
29,073,567✔
897
    int32_t               outputSlotId = pExpr[k].base.resSchema.slotId;
22,689,468✔
898
    SqlFunctionCtx*       pfCtx = &pCtx[k];
22,689,468✔
899
    SInputColumnInfoData* pInputData = &pfCtx->input;
22,689,468✔
900

901
    if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) {  // it is a project query
22,689,468✔
902
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
16,237,647✔
903
      if (pColInfoData == NULL) {
16,236,927✔
904
        code = terrno;
169✔
905
        TSDB_CHECK_CODE(code, lino, _exit);
893!
906
      }
907

908
      if (pResult->info.rows > 0 && !createNewColModel) {
16,236,758!
909
        int32_t ret = 0;
×
910

911
        if (pInputData->pData[0] == NULL) {
×
912
          int32_t slotId = pfCtx->param[0].pCol->slotId;
×
913

914
          SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
×
915
          if (pInput == NULL) {
×
916
            code = terrno;
×
917
            TSDB_CHECK_CODE(code, lino, _exit);
×
918
          }
919

920
          ret = colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInput,
×
921
                                pSrcBlock->info.rows);
×
922
        } else {
923
          ret = colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity,
×
924
                                pInputData->pData[0], pInputData->numOfRows);
×
925
        }
926

927
        if (ret < 0) {
×
928
          code = ret;
×
929
        }
930

931
        TSDB_CHECK_CODE(code, lino, _exit);
×
932
      } else {
933
        if (pInputData->pData[0] == NULL) {
16,236,758✔
934
          int32_t slotId = pfCtx->param[0].pCol->slotId;
16,509✔
935

936
          SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
16,509✔
937
          if (pInput == NULL) {
16,509!
938
            code = terrno;
×
939
            TSDB_CHECK_CODE(code, lino, _exit);
×
940
          }
941

942
          code = colDataAssign(pColInfoData, pInput, pSrcBlock->info.rows, &pResult->info);
16,509✔
943
          numOfRows = pSrcBlock->info.rows;
16,509✔
944
        } else {
945
          code = colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
16,220,249✔
946
          numOfRows = pInputData->numOfRows;
16,219,913✔
947
        }
948

949
        TSDB_CHECK_CODE(code, lino, _exit);
16,236,422!
950
      }
951
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
6,451,821✔
952
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
477,583✔
953
      if (pColInfoData == NULL) {
477,680!
UNCOV
954
        code = terrno;
×
955
        TSDB_CHECK_CODE(code, lino, _exit);
×
956
      }
957

958
      int32_t offset = createNewColModel ? 0 : pResult->info.rows;
477,691!
959

960
      int32_t type = pExpr[k].base.pParam[0].param.nType;
477,691✔
961
      if (TSDB_DATA_TYPE_NULL == type) {
477,691✔
962
        colDataSetNNULL(pColInfoData, offset, pSrcBlock->info.rows);
22,686✔
963
      } else {
964
        char* p = taosVariantGet(&pExpr[k].base.pParam[0].param, type);
455,005✔
965
        for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
350,110,310✔
966
          code = colDataSetVal(pColInfoData, i + offset, p, false);
349,771,165✔
967
          TSDB_CHECK_CODE(code, lino, _exit);
349,655,325!
968
        }
969
      }
970

971
      numOfRows = pSrcBlock->info.rows;
361,831✔
972
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
5,974,238✔
973
      SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
508,108✔
974
      if (pBlockList == NULL) {
508,510!
975
        code = terrno;
×
976
        TSDB_CHECK_CODE(code, lino, _exit);
693!
977
      }
978

979
      void* px = taosArrayPush(pBlockList, &pSrcBlock);
508,414✔
980
      if (px == NULL) {
508,414!
981
        code = terrno;
×
982
        taosArrayDestroy(pBlockList);
×
983
        TSDB_CHECK_CODE(code, lino, _exit);
×
984
      }
985

986
      SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
508,414✔
987
      if (pResColData == NULL) {
508,336!
988
        code = terrno;
×
989
        taosArrayDestroy(pBlockList);
×
990
        TSDB_CHECK_CODE(code, lino, _exit);
×
991
      }
992

993
      SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
508,336✔
994

995
      SScalarParam dest = {.columnData = &idata};
508,336✔
996
      code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
508,336✔
997
      if (code != TSDB_CODE_SUCCESS) {
508,301✔
998
        taosArrayDestroy(pBlockList);
643✔
999
        TSDB_CHECK_CODE(code, lino, _exit);
693!
1000
      }
1001

1002
      int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
507,658✔
1003
      if (pResult->info.capacity <= 0) {
507,658!
1004
        qError("project failed at: %s:%d", __func__, __LINE__);
×
1005
        code = TSDB_CODE_INVALID_PARA;
×
1006
        TSDB_CHECK_CODE(code, lino, _exit);
×
1007
      }
1008

1009
      int32_t ret = colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
507,658✔
1010
      if (ret < 0) {
507,323!
1011
        code = ret;
×
1012
      }
1013

1014
      colDataDestroy(&idata);
507,323✔
1015
      TSDB_CHECK_CODE(code, lino, _exit);
507,645!
1016

1017
      numOfRows = dest.numOfRows;
507,645✔
1018
      taosArrayDestroy(pBlockList);
507,645✔
1019
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
5,466,130✔
1020
      // _rowts/_c0, not tbname column
1021
      if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) {
5,465,590!
1022
        if (fmIsGroupIdFunc(pfCtx->functionId)) {
×
1023
          SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
×
1024
          TSDB_CHECK_NULL(pColInfoData, code, lino, _exit, terrno);
×
1025
          code = colDataSetVal(pColInfoData, pResult->info.rows, (const char*)&pSrcBlock->info.id.groupId, false);
×
1026
          TSDB_CHECK_CODE(code, lino, _exit);
×
1027
        }
1028
      } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
5,465,291✔
1029
        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx);
853,801✔
1030
        code = pfCtx->fpSet.init(pfCtx, pResInfo);
853,801✔
1031
        TSDB_CHECK_CODE(code, lino, _exit);
853,803!
1032
        pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
853,803✔
1033
        if (pfCtx->pOutput == NULL) {
853,804!
1034
          code = terrno;
×
1035
          TSDB_CHECK_CODE(code, lino, _exit);
×
1036
        }
1037

1038
        pfCtx->offset = createNewColModel ? 0 : pResult->info.rows;  // set the start offset
853,804!
1039

1040
        // set the timestamp(_rowts) output buffer
1041
        if (taosArrayGetSize(pPseudoList) > 0) {
853,804!
1042
          int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
×
1043
          if (outputColIndex == NULL) {
×
1044
            code = terrno;
×
1045
            TSDB_CHECK_CODE(code, lino, _exit);
×
1046
          }
1047

1048
          pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
×
1049
        }
1050

1051
        // link pDstBlock to set selectivity value
1052
        if (pfCtx->subsidiaries.num > 0) {
853,805✔
1053
          pfCtx->pDstBlock = pResult;
620,184✔
1054
        }
1055

1056
        code = pfCtx->fpSet.process(pfCtx);
853,805✔
1057
        if (code != TSDB_CODE_SUCCESS) {
853,809✔
1058
          if (pCtx[k].fpSet.cleanup != NULL) {
27!
1059
            pCtx[k].fpSet.cleanup(&pCtx[k]);
×
1060
          }
1061
          TSDB_CHECK_CODE(code, lino, _exit);
27!
1062
        }
1063

1064
        numOfRows = pResInfo->numOfRes;
853,782✔
1065
        if (fmIsProcessByRowFunc(pfCtx->functionId)) {
853,782✔
1066
          if (NULL == processByRowFunctionCtx) {
412,944✔
1067
            processByRowFunctionCtx = taosArrayInit(1, sizeof(SqlFunctionCtx*));
412,249✔
1068
            if (!processByRowFunctionCtx) {
412,249!
1069
              code = terrno;
×
1070
              TSDB_CHECK_CODE(code, lino, _exit);
×
1071
            }
1072
          }
1073

1074
          void* px = taosArrayPush(processByRowFunctionCtx, &pfCtx);
412,507✔
1075
          if (px == NULL) {
412,507!
1076
            code = terrno;
×
1077
            TSDB_CHECK_CODE(code, lino, _exit);
×
1078
          }
1079
        }
1080
      } else if (fmIsAggFunc(pfCtx->functionId)) {
4,609,308✔
1081
        // selective value output should be set during corresponding function execution
1082
        if (fmIsSelectValueFunc(pfCtx->functionId)) {
989,723✔
1083
          continue;
621,596✔
1084
        }
1085
        // _group_key function for "partition by tbname" + csum(col_name) query
1086
        SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
368,128✔
1087
        if (pOutput == NULL) {
368,128!
1088
          code = terrno;
×
1089
          TSDB_CHECK_CODE(code, lino, _exit);
×
1090
        }
1091

1092
        int32_t          slotId = pfCtx->param[0].pCol->slotId;
368,128✔
1093

1094
        // todo handle the json tag
1095
        SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
368,128✔
1096
        if (pInput == NULL) {
368,128!
1097
          code = terrno;
×
1098
          TSDB_CHECK_CODE(code, lino, _exit);
×
1099
        }
1100

1101
        for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
26,590,561✔
1102
          bool isNull = colDataIsNull_s(pInput, f);
26,222,433✔
1103
          if (isNull) {
26,222,433✔
1104
            colDataSetNULL(pOutput, pResult->info.rows + f);
2,586,150✔
1105
          } else {
1106
            char* data = colDataGetData(pInput, f);
23,636,283!
1107
            code = colDataSetVal(pOutput, pResult->info.rows + f, data, isNull);
23,636,283✔
1108
            TSDB_CHECK_CODE(code, lino, _exit);
23,636,283!
1109
          }
1110
        }
1111

1112
      } else {
1113
        SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
3,617,292✔
1114
        if (pBlockList == NULL) {
3,624,027!
1115
          code = terrno;
×
1116
          TSDB_CHECK_CODE(code, lino, _exit);
173!
1117
        }
1118

1119
        void* px = taosArrayPush(pBlockList, &pSrcBlock);
3,622,829✔
1120
        if (px == NULL) {
3,622,829!
1121
          code = terrno;
×
1122
          TSDB_CHECK_CODE(code, lino, _exit);
×
1123
        }
1124

1125
        SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
3,622,829✔
1126
        if (pResColData == NULL) {
3,622,427!
1127
          taosArrayDestroy(pBlockList);
×
1128
          code = terrno;
×
1129
          TSDB_CHECK_CODE(code, lino, _exit);
×
1130
        }
1131

1132
        SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
3,622,427✔
1133

1134
        SScalarParam dest = {.columnData = &idata};
3,622,427✔
1135
        code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
3,622,427✔
1136
        if (code != TSDB_CODE_SUCCESS) {
3,620,976!
UNCOV
1137
          taosArrayDestroy(pBlockList);
×
1138
          TSDB_CHECK_CODE(code, lino, _exit);
173!
1139
        }
1140

1141
        int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
3,621,248✔
1142
        if (pResult->info.capacity <= 0) {
3,621,248!
1143
          qError("project failed at: %s:%d", __func__, __LINE__);
×
1144
          code = TSDB_CODE_INVALID_PARA;
×
1145
          TSDB_CHECK_CODE(code, lino, _exit);
×
1146
        }
1147
        int32_t ret = colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
3,621,248✔
1148
        if (ret < 0) {
3,618,948!
1149
          code = ret;
×
1150
        }
1151

1152
        colDataDestroy(&idata);
3,618,948✔
1153

1154
        numOfRows = dest.numOfRows;
3,624,408✔
1155
        taosArrayDestroy(pBlockList);
3,624,408✔
1156
        TSDB_CHECK_CODE(code, lino, _exit);
3,624,632!
1157
      }
1158
    } else {
1159
      return TSDB_CODE_OPS_NOT_SUPPORT;
540✔
1160
    }
1161
  }
1162

1163
  if (processByRowFunctionCtx && taosArrayGetSize(processByRowFunctionCtx) > 0){
6,384,099✔
1164
    SqlFunctionCtx** pfCtx = taosArrayGet(processByRowFunctionCtx, 0);
296,342✔
1165
    if (pfCtx == NULL) {
412,249!
1166
      code = terrno;
×
1167
      TSDB_CHECK_CODE(code, lino, _exit);
×
1168
    }
1169

1170
    code = (*pfCtx)->fpSet.processFuncByRow(processByRowFunctionCtx);
412,249✔
1171
    TSDB_CHECK_CODE(code, lino, _exit);
412,249✔
1172
    numOfRows = (*pfCtx)->resultInfo->numOfRes;
412,218✔
1173
  }
1174

1175
  if (!createNewColModel) {
6,499,975✔
1176
    pResult->info.rows += numOfRows;
5,293,259✔
1177
  }
1178

1179
_exit:
1,206,716✔
1180
  if(processByRowFunctionCtx) {
6,500,899✔
1181
    taosArrayDestroy(processByRowFunctionCtx);
412,249✔
1182
  }
1183
  if(code) {
6,500,899✔
1184
    qError("project apply functions failed at: %s:%d", __func__, lino);
924!
1185
  }
1186
  return code;
6,500,894✔
1187
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc