• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5059

17 May 2026 01:15AM UTC coverage: 73.443% (+0.06%) from 73.387%
#5059

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281870 of 383795 relevant lines covered (73.44%)

135516561.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.54
/source/libs/executor/src/projectoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "taoserror.h"
22
#include "tdatablock.h"
23

24
// Per-instance state of the project operator created by createProjectOperatorInfo().
typedef struct SProjectOperatorInfo {
25
  SOptrBasicInfo binfo;              // basic operator state; binfo.pRes is the block each projection round writes into
26
  SAggSupporter  aggSup;             // set up by initAggSup(), released by cleanupAggSup()
27
  SArray*        pPseudoColInfo;     // filled by setRowTsColumnOutputInfo(); destroyed with the operator
28
  SLimitInfo     limitInfo;          // limit/slimit bookkeeping initialised from the physical plan node
29
  bool           mergeDataBlocks;    // when true, per-round results are merged into pFinalRes before being returned
30
  SSDataBlock*   pFinalRes;          // merge target block, created from the result block layout
31
  bool           inputIgnoreGroup;   // when true, the group id of every incoming block is reset to 0
32
  bool           outputIgnoreGroup;  // when true, the group id of the returned block is reset to 0
33
} SProjectOperatorInfo;
34

35
// Per-instance state of the indefinite-rows function operator.
typedef struct SIndefOperatorInfo {
36
  SOptrBasicInfo binfo;           // basic operator state, released by cleanupBasicInfo()
37
  SAggSupporter  aggSup;          // released by cleanupAggSup()
38
  SArray*        pPseudoColInfo;  // destroyed with the operator
39
  SExprSupp      scalarSup;       // rebuilt from the plan node's pExprs on reset; released by cleanupExprSupp()
40
  uint64_t       groupId;         // cleared to 0 on reset; presumably the group currently being processed -- TODO confirm
41
  SSDataBlock*   pNextGroupRes;   // cleared to NULL on reset and not freed by the destroy function; NOTE(review): looks non-owning -- confirm
42
} SIndefOperatorInfo;
43

44
// Forward declarations of routines that are referenced before their definitions.
static int32_t      doGenerateSourceData(SOperatorInfo* pOperator);
45
static int32_t      doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
46
static int32_t      doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
47
int32_t projectApplyOperator(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, int32_t outputSlotId, int32_t* numOfRows, bool createNewColModel, const void* pExtraParams);
48

49
static bool hasLagLeadFunc(const SExprSupp* pSup) {
9,626,402✔
50
  if (pSup == NULL || pSup->pCtx == NULL) {
9,626,402✔
51
    return false;
×
52
  }
53

54
  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
31,360,493✔
55
    EFunctionType type = fmGetFuncTypeFromId(pSup->pCtx[i].functionId);
21,770,307✔
56
    if (type == FUNCTION_TYPE_LAG || type == FUNCTION_TYPE_LEAD) {
21,767,885✔
57
      return true;
33,794✔
58
    }
59
  }
60

61
  return false;
9,591,449✔
62
}
63

64
static void destroyProjectOperatorInfo(void* param) {
159,484,071✔
65
  if (NULL == param) {
159,484,071✔
66
    return;
×
67
  }
68

69
  SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param;
159,484,071✔
70
  cleanupBasicInfo(&pInfo->binfo);
159,484,071✔
71
  cleanupAggSup(&pInfo->aggSup);
159,481,937✔
72
  taosArrayDestroy(pInfo->pPseudoColInfo);
159,434,772✔
73

74
  blockDataDestroy(pInfo->pFinalRes);
159,449,705✔
75
  taosMemoryFreeClear(param);
159,450,796✔
76
}
77

78
static void destroyIndefinitOperatorInfo(void* param) {
3,502,557✔
79
  SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param;
3,502,557✔
80
  if (pInfo == NULL) {
3,502,557✔
81
    return;
×
82
  }
83

84
  cleanupBasicInfo(&pInfo->binfo);
3,502,557✔
85
  taosArrayDestroy(pInfo->pPseudoColInfo);
3,502,557✔
86
  cleanupAggSup(&pInfo->aggSup);
3,502,723✔
87
  cleanupExprSupp(&pInfo->scalarSup);
3,501,555✔
88

89
  taosMemoryFreeClear(param);
3,502,557✔
90
}
91

92
// Release the buffers cached in pCtx->rowIter (previous-row block, previous data,
// previous primary key) and zero the whole iterator.
static void cleanupProcessByRowIter(SqlFunctionCtx* pCtx) {
  SFuncInputRowIter* pRowIter = &pCtx->rowIter;

  if (NULL != pRowIter->pPrevRowBlock) {
    blockDataDestroy(pRowIter->pPrevRowBlock);
  }

  taosMemoryFreeClear(pRowIter->pPrevData);
  taosMemoryFreeClear(pRowIter->pPrevPk);

  // wipe every remaining field, including the pointers released above
  memset(pRowIter, 0, sizeof(SFuncInputRowIter));
}
×
102

103
// Bring a process-by-row function context back to a freshly initialised state:
// run the function's cleanup hook if the result entry was initialised, drop the
// cached row iterator, clear the result counters and call the init hook again.
// Returns the code produced by the init hook.
static int32_t resetProcessByRowCtx(SqlFunctionCtx* pCtx) {
  SResultRowEntryInfo* pEntry = GET_RES_INFO(pCtx);
  char*                pSavedOutput = pCtx->pOutput;

  if (pEntry->initialized && pCtx->fpSet.cleanup != NULL) {
    pCtx->fpSet.cleanup(pCtx);
  }

  cleanupProcessByRowIter(pCtx);

  pEntry->initialized = false;
  pEntry->numOfRes = 0;
  pCtx->bInputFinished = false;

  // init runs with pOutput detached and the pointer is put back afterwards;
  // NOTE(review): presumably so init cannot touch the output buffer -- confirm
  pCtx->pOutput = NULL;
  int32_t initCode = pCtx->fpSet.init(pCtx, pEntry);
  pCtx->pOutput = pSavedOutput;

  return initCode;
}
121

122
// Check whether every context in the array refers to the same function id.
// A NULL array or an array with at most one element counts as "all the same";
// a missing or NULL element makes the answer false.
static bool allProcessByRowCtxSameFuncId(SArray* pProcessByRowFunctionCtx) {
  if (pProcessByRowFunctionCtx == NULL) {
    return true;
  }
  if (taosArrayGetSize(pProcessByRowFunctionCtx) <= 1) {
    return true;
  }

  SqlFunctionCtx** ppHead = taosArrayGet(pProcessByRowFunctionCtx, 0);
  if (ppHead == NULL || *ppHead == NULL) {
    return false;
  }

  // every later entry must carry the id of the first one
  int32_t expectedId = (*ppHead)->functionId;
  int32_t pos = 1;
  while (pos < taosArrayGetSize(pProcessByRowFunctionCtx)) {
    SqlFunctionCtx** ppEntry = taosArrayGet(pProcessByRowFunctionCtx, pos);
    if (ppEntry == NULL || *ppEntry == NULL) {
      return false;
    }
    if ((*ppEntry)->functionId != expectedId) {
      return false;
    }
    ++pos;
  }

  return true;
}
142

143
static int32_t processByRowInExternalWindows(SArray* pGroupedCtxArray, SSDataBlock* pSrcBlock,
×
144
                                             SStreamRuntimeFuncInfo* pStreamInfo) {
145
  int32_t code = TSDB_CODE_SUCCESS;
×
146
  int32_t lino = 0;
×
147

148
  int32_t  ctxNum = taosArrayGetSize(pGroupedCtxArray);
×
149
  int32_t  idxNum = taosArrayGetSize(pStreamInfo->pStreamBlkWinIdx);
×
150
  int32_t  totalRows = 0;
×
151
  SArray*  pInputWinIdx = NULL;
×
152
  int32_t* pStartRows = NULL;
×
153
  int64_t* pNumRows = NULL;
×
154
  int32_t* pOffsets = NULL;
×
155

156
  pInputWinIdx = taosArrayInit(idxNum, sizeof(int64_t));
×
157
  TSDB_CHECK_NULL(pInputWinIdx, code, lino, _exit, terrno);
×
158
  TSDB_CHECK_NULL(taosArrayAddBatch(pInputWinIdx, TARRAY_DATA(pStreamInfo->pStreamBlkWinIdx), idxNum), code, lino,
×
159
                  _exit, terrno);
160

161
  pStartRows = taosMemoryCalloc(ctxNum, sizeof(int32_t));
×
162
  TSDB_CHECK_NULL(pStartRows, code, lino, _exit, terrno);
×
163
  pNumRows = taosMemoryCalloc(ctxNum, sizeof(int64_t));
×
164
  TSDB_CHECK_NULL(pNumRows, code, lino, _exit, terrno);
×
165
  pOffsets = taosMemoryCalloc(ctxNum, sizeof(int32_t));
×
166
  TSDB_CHECK_NULL(pOffsets, code, lino, _exit, terrno);
×
167

168
  for (int32_t i = 0; i < ctxNum; ++i) {
×
169
    SqlFunctionCtx** ppCtx = taosArrayGet(pGroupedCtxArray, i);
×
170
    TSDB_CHECK_NULL(ppCtx, code, lino, _exit, terrno);
×
171
    TSDB_CHECK_NULL(*ppCtx, code, lino, _exit, terrno);
×
172
    pStartRows[i] = (*ppCtx)->input.startRowIndex;
×
173
    pNumRows[i] = (*ppCtx)->input.numOfRows;
×
174
    pOffsets[i] = (*ppCtx)->offset;
×
175
  }
176

177
  taosArrayClear(pStreamInfo->pStreamBlkWinIdx);
×
178

179
  for (int32_t i = 0; i < idxNum; ++i) {
×
180
    int64_t* pCurr = taosArrayGet(pInputWinIdx, i);
×
181
    int32_t* pCurrPair = (int32_t*)pCurr;
×
182
    int32_t  winIdx = pCurrPair[0];
×
183
    int32_t  rowStart = pCurrPair[1];
×
184
    int32_t  rowEnd = pSrcBlock->info.rows;
×
185

186
    if (i + 1 < idxNum) {
×
187
      int64_t* pNext = taosArrayGet(pInputWinIdx, i + 1);
×
188
      rowEnd = ((int32_t*)pNext)[1];
×
189
    }
190

191
    if (rowEnd <= rowStart) {
×
192
      continue;
×
193
    }
194

195
    for (int32_t j = 0; j < ctxNum; ++j) {
×
196
      SqlFunctionCtx** ppCtx = taosArrayGet(pGroupedCtxArray, j);
×
197
      SqlFunctionCtx*  pCtx = *ppCtx;
×
198

199
      pCtx->input.startRowIndex = rowStart;
×
200
      pCtx->input.numOfRows = rowEnd - rowStart;
×
201
      pCtx->offset = pOffsets[j] + totalRows;
×
202
      TAOS_CHECK_EXIT(resetProcessByRowCtx(pCtx));
×
203
    }
204

205
    SqlFunctionCtx** ppFirstCtx = taosArrayGet(pGroupedCtxArray, 0);
×
206
    TAOS_CHECK_EXIT((*ppFirstCtx)->fpSet.processFuncByRow(pGroupedCtxArray));
×
207

208
    int32_t winRows = (*ppFirstCtx)->resultInfo->numOfRes;
×
209
    if (winRows > 0) {
×
210
      int64_t  val = 0;
×
211
      int32_t* pOutPair = (int32_t*)&val;
×
212
      pOutPair[0] = winIdx;
×
213
      pOutPair[1] = totalRows;
×
214
      TSDB_CHECK_NULL(taosArrayPush(pStreamInfo->pStreamBlkWinIdx, &val), code, lino, _exit, terrno);
×
215
      totalRows += winRows;
×
216
    }
217
  }
218

219
  for (int32_t i = 0; i < ctxNum; ++i) {
×
220
    SqlFunctionCtx** ppCtx = taosArrayGet(pGroupedCtxArray, i);
×
221
    SqlFunctionCtx*  pCtx = *ppCtx;
×
222
    pCtx->input.startRowIndex = pStartRows[i];
×
223
    pCtx->input.numOfRows = pNumRows[i];
×
224
    pCtx->offset = pOffsets[i];
×
225
    pCtx->resultInfo->numOfRes = totalRows;
×
226
  }
227

228
_exit:
×
229
  if (pInputWinIdx != NULL) {
×
230
    taosArrayDestroy(pInputWinIdx);
×
231
  }
232
  taosMemoryFreeClear(pStartRows);
×
233
  taosMemoryFreeClear(pNumRows);
×
234
  taosMemoryFreeClear(pOffsets);
×
235
  return code;
×
236
}
237

238
// Fill a placeholder result column window by window.
//
// pInfo->pStreamBlkWinIdx holds one int64 per window, read as two int32 values:
// [0] the window index and [1] the first row of the window. A window ends where
// the next one starts; the last one ends at `rows`. For every non-empty window
// pInfo->curIdx is switched to that window and scalarAssignPlaceHolderRes() is
// called for the window's row range, shifted by `offset`.
//
// pInfo->curIdx is always restored to its value on entry. Returns
// TSDB_CODE_SUCCESS or the first error encountered.
static int32_t assignPlaceHolderInExternalWindows(SColumnInfoData* pResColData, int64_t offset, int64_t rows,
                                                  int16_t funcId, SStreamRuntimeFuncInfo* pInfo, SNode* pParamNode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  int32_t savedIdx = pInfo->curIdx;
  int32_t numWin = taosArrayGetSize(pInfo->pStreamBlkWinIdx);

  // iterate over a private copy of the window index
  SArray* pSnapshot = taosArrayInit(numWin, sizeof(int64_t));
  TSDB_CHECK_NULL(pSnapshot, code, lino, _exit, terrno);
  TSDB_CHECK_NULL(taosArrayAddBatch(pSnapshot, TARRAY_DATA(pInfo->pStreamBlkWinIdx), numWin), code, lino, _exit,
                  terrno);

  for (int32_t w = 0; w < numWin; ++w) {
    int32_t* pPair = (int32_t*)taosArrayGet(pSnapshot, w);
    int32_t  winIdx = pPair[0];
    int32_t  firstRow = pPair[1];
    int32_t  lastRowExcl;

    if (w + 1 < numWin) {
      int32_t* pNextPair = (int32_t*)taosArrayGet(pSnapshot, w + 1);
      lastRowExcl = pNextPair[1];
    } else {
      lastRowExcl = (int32_t)rows;
    }

    // only windows that contain rows are assigned
    if (lastRowExcl > firstRow) {
      pInfo->curIdx = winIdx;
      TAOS_CHECK_EXIT(scalarAssignPlaceHolderRes(pResColData, offset + firstRow, lastRowExcl - firstRow, funcId, pInfo,
                                                 pParamNode));
    }
  }

_exit:
  pInfo->curIdx = savedIdx;
  taosArrayDestroy(pSnapshot);
  return code;
}
276

277
// Forward a "release stream state" request to the first downstream operator.
// Does nothing when pOperator is NULL, has no downstream operator, or the
// downstream operator does not provide a release callback.
void streamOperatorReleaseState(SOperatorInfo* pOperator) {
  // an operator may be created without a downstream, so pDownstream[0] must not be read blindly
  if (pOperator == NULL || pOperator->pDownstream == NULL || pOperator->numOfDownstream <= 0) {
    return;
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream != NULL && downstream->fpSet.releaseStreamStateFn) {
    downstream->fpSet.releaseStreamStateFn(downstream);
  }
}
×
283

284
// Forward a "reload stream state" request to the first downstream operator.
// Does nothing when pOperator is NULL, has no downstream operator, or the
// downstream operator does not provide a reload callback.
void streamOperatorReloadState(SOperatorInfo* pOperator) {
  // an operator may be created without a downstream, so pDownstream[0] must not be read blindly
  if (pOperator == NULL || pOperator->pDownstream == NULL || pOperator->numOfDownstream <= 0) {
    return;
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream != NULL && downstream->fpSet.reloadStreamStateFn) {
    downstream->fpSet.reloadStreamStateFn(downstream);
  }
}
×
290

291
// Reset callback of the project operator: puts the operator back into the
// OP_NOT_OPENED state, re-initialises the limit bookkeeping from the physical
// plan node, empties the merge block and rebuilds the aggregate support and
// result output.
//
// Returns TSDB_CODE_SUCCESS, or the error reported by resetAggSup() /
// setFunctionResultOutput().
static int32_t resetProjectOperState(SOperatorInfo* pOper) {
  SProjectOperatorInfo* pProject = pOper->info;
  SExecTaskInfo*        pTaskInfo = pOper->pTaskInfo;
  pOper->status = OP_NOT_OPENED;

  resetBasicOperatorState(&pProject->binfo);
  SProjectPhysiNode* pPhynode = (SProjectPhysiNode*)pOper->pPhyNode;

  pProject->limitInfo = (SLimitInfo){0};
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pProject->limitInfo);

  blockDataCleanup(pProject->pFinalRes);

  int32_t code = resetAggSup(&pOper->exprSupp, &pProject->aggSup, pTaskInfo, pPhynode->pProjections, NULL,
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, NULL,
                             &pTaskInfo->storageAPI.functionStore);
  if (code == TSDB_CODE_SUCCESS) {
    code = setFunctionResultOutput(pOper, &pProject->binfo, &pProject->aggSup, MAIN_SCAN, pOper->exprSupp.numOfExprs);
  }

  // propagate the failure instead of unconditionally reporting success
  return code;
}
312

313
// Build a project operator for the given physical plan node.
//
// downstream   : upstream data source of the new operator; may be NULL, in which
//                case no downstream is attached.
// pProjPhyNode : physical plan node describing the projections, limits,
//                conditions and output block layout.
// pTaskInfo    : owning task; receives the error code on failure.
// pOptrInfo    : out parameter, set to the new operator on success.
//
// Returns TSDB_CODE_SUCCESS on success. On failure the partially built operator
// state is destroyed, destroyOperatorAndDownstreams() is invoked for the
// operator and the downstream, the code is stored in pTaskInfo->code and
// returned.
int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo,
                                  SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    goto _error;
  }

  pOperator->pPhyNode = pProjPhyNode;
  pOperator->exprSupp.hasWindowOrGroup = false;
  pOperator->pTaskInfo = pTaskInfo;
  initOperatorCostInfo(pOperator);

  // result block shaped after the node's output descriptor
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pProjPhyNode->node.pOutputDataBlockDesc);
  TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno);

  initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo);

  pInfo->binfo.pRes = pResBlock;
  pInfo->pFinalRes = NULL;

  // the merge target shares the layout of the result block
  code = createOneDataBlock(pResBlock, false, &pInfo->pFinalRes);
  TSDB_CHECK_CODE(code, lino, _error);

  pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder;
  pInfo->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup;
  pInfo->outputIgnoreGroup = pProjPhyNode->ignoreGroupId;

  // blocks are merged only outside queue mode, and only when group ids are ignored
  pInfo->mergeDataBlocks = (pTaskInfo->execModel != OPTR_EXEC_MODEL_QUEUE) && pProjPhyNode->ignoreGroupId &&
                           pProjPhyNode->mergeDataBlock;

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
  const int32_t maxBlockBytes = 2 * 1024 * 1024;
  int32_t       numOfRows = 4096;
  if (numOfRows * pResBlock->info.rowSize > maxBlockBytes) {
    numOfRows = maxBlockBytes / pResBlock->info.rowSize;
  }

  initResultSizeInfo(&pOperator->resultInfo, numOfRows);

  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = NULL;
  code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols);
  TSDB_CHECK_CODE(code, lino, _error);

  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
                    NULL, &pTaskInfo->storageAPI.functionStore);
  TSDB_CHECK_CODE(code, lino, _error);

  initBasicInfo(&pInfo->binfo, pResBlock);
  code = setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
  TSDB_CHECK_CODE(code, lino, _error);

  code = filterInitFromNode((SNode*)pProjPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                            pTaskInfo->pStreamRuntimeInfo);
  TSDB_CHECK_CODE(code, lino, _error);

  code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols, &pInfo->pPseudoColInfo);
  TSDB_CHECK_CODE(code, lino, _error);

  // wire the operator identity and its callbacks
  setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
  setOperatorStreamStateFn(pOperator, streamOperatorReleaseState, streamOperatorReloadState);
  setOperatorResetStateFn(pOperator, resetProjectOperState);

  if (downstream != NULL) {
    code = appendDownstream(pOperator, &downstream, 1);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  if (pInfo != NULL) {
    destroyProjectOperatorInfo(pInfo);
  }
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
411

412
// Apply the group offset (remainGroupOffset) to an incoming block.
//
// Returns PROJECT_RETRIEVE_CONTINUE when the block belongs to a group that is
// still being skipped and must be dropped, PROJECT_RETRIEVE_DONE when the block
// has to be processed.
static int32_t discardGroupDataBlock(SSDataBlock* pBlock, SLimitInfo* pLimitInfo) {
  // no groups left to skip
  if (pLimitInfo->remainGroupOffset <= 0) {
    return PROJECT_RETRIEVE_DONE;
  }

  // first group seen, or more data of the group that is currently being skipped
  if (pLimitInfo->currentGroupId == 0 || pLimitInfo->currentGroupId == pBlock->info.id.groupId) {
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
    return PROJECT_RETRIEVE_CONTINUE;
  }

  // data of a new group: one more group has been skipped completely
  pLimitInfo->remainGroupOffset -= 1;
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;

  // the new group itself still has to be skipped
  if (pLimitInfo->remainGroupOffset > 0) {
    return PROJECT_RETRIEVE_CONTINUE;
  }

  pLimitInfo->currentGroupId = 0;
  return PROJECT_RETRIEVE_DONE;
}
434

435
// Update the group bookkeeping for an incoming block once the group offset has
// been consumed (remainGroupOffset must be 0 or -1).
//
// Marks the operator as completed and returns PROJECT_RETRIEVE_DONE when the
// number of output groups exceeds slimit; returns TSDB_CODE_INVALID_PARA when the
// precondition does not hold; otherwise returns PROJECT_RETRIEVE_CONTINUE.
static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, SOperatorInfo* pOperator) {
  if (pLimitInfo->remainGroupOffset != 0 && pLimitInfo->remainGroupOffset != -1) {
    qError("project failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_INVALID_PARA;
  }

  bool enteredNewGroup = false;

  if (pBlock->info.id.groupId == 0) {
    pLimitInfo->numOfOutputGroups = 1;
  } else if (pLimitInfo->currentGroupId == pBlock->info.id.groupId) {
    // still inside the current group, nothing to update
    return PROJECT_RETRIEVE_CONTINUE;
  } else {
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
    pLimitInfo->numOfOutputGroups += 1;
    enteredNewGroup = true;
  }

  // slimit exceeded: no further group may be emitted
  if (pLimitInfo->slimit.limit >= 0 && pLimitInfo->slimit.limit < pLimitInfo->numOfOutputGroups) {
    setOperatorCompleted(pOperator);
    return PROJECT_RETRIEVE_DONE;
  }

  // the per-group limit state starts over for every new group
  if (enteredNewGroup) {
    resetLimitInfoForNextGroup(pLimitInfo);
  }

  return PROJECT_RETRIEVE_CONTINUE;
}
467

468
// todo refactor
469
static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SSDataBlock* pBlock,
354,017,292✔
470
                                    SOperatorInfo* pOperator) {
471
  // set current group id
472
  pLimitInfo->currentGroupId = groupId;
354,017,292✔
473
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pOperator->pTaskInfo);
354,074,364✔
474
  if (pBlock->info.rows == 0 && 0 != pLimitInfo->limit.limit) {
354,077,786✔
475
    return PROJECT_RETRIEVE_CONTINUE;
5,336,933✔
476
  } else {
477
    if (limitReached && (pLimitInfo->slimit.limit >= 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
348,735,819✔
478
      setOperatorCompleted(pOperator);
89,350✔
479
    } else if (limitReached && groupId == 0) {
348,646,469✔
480
      setOperatorCompleted(pOperator);
8,741,804✔
481
    }
482
  }
483

484
  return PROJECT_RETRIEVE_DONE;
348,583,013✔
485
}
486

487
// Produce the next result block of the project operator.
//
// Without a downstream operator the rows come from doGenerateSourceData().
// Otherwise blocks are pulled from the downstream, group offset / slimit and the
// in-group limit are applied, the projections are evaluated into binfo.pRes and,
// when mergeDataBlocks is set, accumulated in pFinalRes until the result
// threshold is exceeded, an external window is in use, or the input is exhausted.
// The filter is applied before a block is returned.
//
// *pResBlock receives the result block, or NULL when it holds no rows. On error
// the code is stored in pTaskInfo->code and control leaves through T_LONG_JMP.
int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  SProjectOperatorInfo* pProjectInfo = pOperator->info;
  SOptrBasicInfo*       pBasic = &pProjectInfo->binfo;
  SExprSupp*            pExprSup = &pOperator->exprSupp;
  SSDataBlock*          pRes = pBasic->pRes;
  SSDataBlock*          pFinalRes = pProjectInfo->pFinalRes;
  int32_t               code = 0;
  int32_t               lino = 0;
  int32_t               order = pBasic->inputTsOrder;
  int32_t               scanFlag = 0;

  blockDataCleanup(pFinalRes);
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  bool withExternalWindow = false;
  if (pTaskInfo->pStreamRuntimeInfo != NULL) {
    withExternalWindow = pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow;
  }

  if (pOperator->status == OP_EXEC_DONE) {
    return code;
  }

  SOperatorInfo* downstream = NULL;
  if (pOperator->numOfDownstream > 0) {
    downstream = pOperator->pDownstream[0];
  }
  SLimitInfo* pLimitInfo = &pProjectInfo->limitInfo;

  // no input operator: the rows are generated locally
  if (downstream == NULL) {
    code = doGenerateSourceData(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pProjectInfo->outputIgnoreGroup) {
      pRes->info.id.groupId = 0;
    }

    if (pRes->info.rows > 0) {
      *pResBlock = pRes;
    } else {
      *pResBlock = NULL;
    }
    return code;
  }

  for (;;) {
    // inner loop: fetch input until one block survives offset/limit handling
    for (;;) {
      blockDataCleanup(pRes);

      // The downstream exec may change the value of the newgroup, so use a local variable instead.
      SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
      if (pBlock == NULL) {
        qDebug("set op close, exec %d, status %d rows %" PRId64 , pTaskInfo->execModel, pOperator->status, pFinalRes->info.rows);
        setOperatorCompleted(pOperator);
        break;
      }

      if (pProjectInfo->inputIgnoreGroup) {
        pBlock->info.id.groupId = 0;
      }

      // block belongs to a group skipped by the group offset
      if (discardGroupDataBlock(pBlock, pLimitInfo) == PROJECT_RETRIEVE_CONTINUE) {
        continue;
      }

      // the return value is ignored on purpose; completion is observed through the operator status
      (void) setInfoForNewGroup(pBlock, pLimitInfo, pOperator);
      if (pOperator->status == OP_EXEC_DONE) {
        break;
      }

      scanFlag = pBlock->info.scanFlag;
      if (pProjectInfo->mergeDataBlocks) {
        pFinalRes->info.scanFlag = scanFlag;
      } else {
        pRes->info.scanFlag = scanFlag;
      }

      code = setInputDataBlock(pExprSup, pBlock, order, scanFlag, false);
      QUERY_CHECK_CODE(code, lino, _end);

      code = blockDataEnsureCapacity(pBasic->pRes, pBasic->pRes->info.rows + pBlock->info.rows);
      QUERY_CHECK_CODE(code, lino, _end);

      code = projectApplyFunctions(pExprSup->pExprInfo, pBasic->pRes, pBlock, pExprSup->pCtx, pExprSup->numOfExprs,
                                   pProjectInfo->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo),
                                   pOperator->pTaskInfo);
      QUERY_CHECK_CODE(code, lino, _end);

      // an empty result under a non-zero limit means more input is needed
      if (doIngroupLimitOffset(pLimitInfo, pBlock->info.id.groupId, pBasic->pRes, pOperator) ==
          PROJECT_RETRIEVE_CONTINUE) {
        continue;
      }

      break;
    }

    if (!pProjectInfo->mergeDataBlocks) {
      // do apply filter
      if (pRes->info.rows > 0) {
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
        QUERY_CHECK_CODE(code, lino, _end);

        // everything was filtered out, fetch the next block
        if (pRes->info.rows == 0) {
          continue;
        }
      }

      // either rows are available or no results were generated
      break;
    }

    if (pRes->info.rows > 0) {
      pFinalRes->info.id.groupId = 0;  // clear groupId
      pFinalRes->info.version = pRes->info.version;

      // continue merge data, ignore the group id
      code = blockDataMerge(pFinalRes, pRes);
      QUERY_CHECK_CODE(code, lino, _end);

      // keep accumulating while below the threshold and input remains
      if (!withExternalWindow && pFinalRes->info.rows + pRes->info.rows <= pOperator->resultInfo.threshold &&
          (pOperator->status != OP_EXEC_DONE)) {
        continue;
      }
    }

    // do apply filter
    if (pOperator->exprSupp.pFilterInfo != NULL) {
      filterSetExecContext(pOperator->exprSupp.pFilterInfo, pOperator->pTaskInfo, isTaskKilled);
    }
    code = doFilter(pFinalRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    // when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
    if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) {
      qDebug("project return %" PRId64 " rows, status %d", pFinalRes->info.rows, pOperator->status);
      break;
    }
  }

  SSDataBlock* pOut = pRes;
  if (pProjectInfo->mergeDataBlocks) {
    pOut = pFinalRes;
  }
  pOut->info.dataLoad = 1;

  if (pProjectInfo->outputIgnoreGroup) {
    pOut->info.id.groupId = 0;
  }

  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
    printDataBlock(pOut, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo), pTaskInfo->id.queryId);
  }

  if (pOut->info.rows > 0) {
    *pResBlock = pOut;
  } else {
    *pResBlock = NULL;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
644

645
/*
 * Reset an indefinite-rows-function operator so that it can be executed again.
 *
 * The operator status goes back to OP_NOT_OPENED, the basic operator state and
 * the group tracking fields are cleared, and then the aggregate support, the
 * function result output and the scalar expression support are re-initialised
 * in that order. Each step only runs if the previous one succeeded.
 *
 * Returns 0 on success, otherwise the code of the first step that failed.
 */
static int32_t resetIndefinitOutputOperState(SOperatorInfo* pOper) {
  SIndefOperatorInfo*      pInfo = pOper->info;
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
  SIndefRowsFuncPhysiNode* pPhynode = (SIndefRowsFuncPhysiNode*)pOper->pPhyNode;
  pOper->status = OP_NOT_OPENED;

  resetBasicOperatorState(&pInfo->binfo);

  pInfo->groupId = 0;
  pInfo->pNextGroupRes = NULL;
  int32_t code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->pFuncs, NULL,
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, NULL,
                             &pTaskInfo->storageAPI.functionStore);
  if (code == 0) {
    code = setFunctionResultOutput(pOper, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pOper->exprSupp.numOfExprs);
  }

  if (code == 0) {
    code = resetExprSupp(&pInfo->scalarSup, pTaskInfo, pPhynode->pExprs, NULL,
                         &pTaskInfo->storageAPI.functionStore);
  }

  // Report the first failure to the caller instead of unconditionally returning success.
  return code;
}
668

669
int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
3,503,141✔
670
                                                 SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
671
  QRY_PARAM_CHECK(pOptrInfo);
3,503,141✔
672
  int32_t code = 0;
3,501,998✔
673
  int32_t lino = 0;
3,501,998✔
674
  int32_t numOfRows = 4096;
3,501,998✔
675
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
3,501,998✔
676

677
  SIndefOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIndefOperatorInfo));
3,501,998✔
678
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
3,501,303✔
679
  if (pInfo == NULL || pOperator == NULL) {
3,501,887✔
680
    code = terrno;
×
681
    goto _error;
×
682
  }
683

684
  pOperator->pPhyNode = pNode;
3,501,887✔
685
  pOperator->pTaskInfo = pTaskInfo;
3,501,887✔
686
  initOperatorCostInfo(pOperator);
3,502,723✔
687

688
  SExprSupp* pSup = &pOperator->exprSupp;
3,502,723✔
689
  pSup->hasWindowOrGroup = false;
3,502,723✔
690

691
  SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
3,502,305✔
692

693
  if (pPhyNode->pExprs != NULL) {
3,502,305✔
694
    int32_t    num = 0;
25,773✔
695
    SExprInfo* pSExpr = NULL;
25,773✔
696
    code = createExprInfo(pPhyNode->pExprs, NULL, &pSExpr, &num);
25,773✔
697
    QUERY_CHECK_CODE(code, lino, _error);
25,773✔
698

699
    code = initExprSupp(&pInfo->scalarSup, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
25,773✔
700
    if (code != TSDB_CODE_SUCCESS) {
25,773✔
701
      goto _error;
×
702
    }
703
  }
704

705
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->node.pOutputDataBlockDesc);
3,502,723✔
706
  TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno);
3,502,723✔
707

708
  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
709
  int32_t TWOMB = 2 * 1024 * 1024;
3,502,723✔
710
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
3,502,723✔
711
    numOfRows = TWOMB / pResBlock->info.rowSize;
×
712
  }
713

714
  initBasicInfo(&pInfo->binfo, pResBlock);
3,502,305✔
715
  initResultSizeInfo(&pOperator->resultInfo, numOfRows);
3,502,723✔
716
  code = blockDataEnsureCapacity(pResBlock, numOfRows);
3,502,723✔
717
  TSDB_CHECK_CODE(code, lino, _error);
3,503,141✔
718

719
  int32_t    numOfExpr = 0;
3,503,141✔
720
  SExprInfo* pExprInfo = NULL;
3,503,141✔
721
  code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr);
3,502,557✔
722
  TSDB_CHECK_CODE(code, lino, _error);
3,501,721✔
723

724
  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
3,501,721✔
725
                            NULL, &pTaskInfo->storageAPI.functionStore);
726
  TSDB_CHECK_CODE(code, lino, _error);
3,501,887✔
727

728
  code = setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
3,501,887✔
729
  TSDB_CHECK_CODE(code, lino, _error);
3,500,151✔
730

731
  code = filterInitFromNode((SNode*)pPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
3,502,130✔
732
                            pTaskInfo->pStreamRuntimeInfo);
3,500,151✔
733
  TSDB_CHECK_CODE(code, lino, _error);
3,500,492✔
734

735
  pInfo->binfo.pRes = pResBlock;
3,500,492✔
736
  pInfo->binfo.inputTsOrder = pNode->inputTsOrder;
3,501,469✔
737
  pInfo->binfo.outputTsOrder = pNode->outputTsOrder;
3,500,735✔
738
  code = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr, &pInfo->pPseudoColInfo);
3,498,731✔
739
  TSDB_CHECK_CODE(code, lino, _error);
3,500,317✔
740

741
  setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo,
3,500,317✔
742
                  pTaskInfo);
743
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo,
3,501,162✔
744
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
745
                                         
746
  setOperatorResetStateFn(pOperator, resetIndefinitOutputOperState);
3,499,874✔
747
  code = appendDownstream(pOperator, &downstream, 1);
3,499,324✔
748
  if (code != TSDB_CODE_SUCCESS) {
3,500,292✔
749
    goto _error;
×
750
  }
751

752
  *pOptrInfo = pOperator;
3,500,292✔
753
  return TSDB_CODE_SUCCESS;
3,499,874✔
754

755
_error:
×
756
  if (pInfo != NULL) destroyIndefinitOperatorInfo(pInfo);
×
757
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
758
  pTaskInfo->code = code;
×
759
  return code;
×
760
}
761

762
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream,
53,852,936✔
763
                              SExecTaskInfo* pTaskInfo) {
764
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
53,852,936✔
765
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
53,852,936✔
766
  SExprSupp*          pSup = &pOperator->exprSupp;
53,852,936✔
767

768
  int32_t order = pInfo->inputTsOrder;
53,852,936✔
769
  int32_t scanFlag = pBlock->info.scanFlag;
53,852,936✔
770
  int32_t code = TSDB_CODE_SUCCESS;
53,852,936✔
771

772
  // there is an scalar expression that needs to be calculated before apply the group aggregation.
773
  SExprSupp* pScalarSup = &pIndefInfo->scalarSup;
53,852,936✔
774
  if (pScalarSup->pExprInfo != NULL) {
53,852,936✔
775
    code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
156,864✔
776
                                 pIndefInfo->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo),
78,432✔
777
                                 pOperator->pTaskInfo);
778
    if (code != TSDB_CODE_SUCCESS) {
78,432✔
779
      T_LONG_JMP(pTaskInfo->env, code);
×
780
    }
781
  }
782

783
  code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
53,852,936✔
784
  if (code) {
53,852,936✔
785
    T_LONG_JMP(pTaskInfo->env, code);
×
786
  }
787

788
  code = blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
53,852,936✔
789
  if (code != TSDB_CODE_SUCCESS) {
53,852,936✔
790
    T_LONG_JMP(pTaskInfo->env, code);
×
791
  }
792

793
  code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
107,705,872✔
794
                               pIndefInfo->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo),
53,852,936✔
795
                               pOperator->pTaskInfo);
796
  if (code != TSDB_CODE_SUCCESS) {
53,852,936✔
797
    T_LONG_JMP(pTaskInfo->env, code);
20,287✔
798
  }
799
}
53,832,649✔
800

801
int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
11,852,240✔
802
  QRY_PARAM_CHECK(pResBlock);
11,852,240✔
803
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
11,853,242✔
804
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
11,852,824✔
805
  SExprSupp*          pSup = &pOperator->exprSupp;
11,853,669✔
806
  int32_t             code = TSDB_CODE_SUCCESS;
11,853,669✔
807
  int32_t             lino = 0;
11,853,669✔
808
  SSDataBlock*        pRes = pInfo->pRes;
11,853,669✔
809

810
  blockDataCleanup(pRes);
11,851,997✔
811

812
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
11,853,503✔
813
  if (pOperator->status == OP_EXEC_DONE) {
11,852,240✔
814
    return code;
2,226,422✔
815
  }
816

817
  SOperatorInfo* downstream = pOperator->pDownstream[0];
9,626,654✔
818
  bool           noSplitOutput = hasLagLeadFunc(pSup);
9,624,721✔
819

820
  while (1) {
3,204,777✔
821
    // here we need to handle the existsed group results
822
    if (pIndefInfo->pNextGroupRes != NULL) {  // todo extract method
12,830,438✔
823
      for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
23,413,413✔
824
        SqlFunctionCtx* pCtx = &pSup->pCtx[k];
17,362,964✔
825

826
        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
17,362,964✔
827
        if (pResInfo->initialized && pCtx->fpSet.cleanup != NULL) {
17,362,964✔
828
          pCtx->fpSet.cleanup(pCtx);
2,454✔
829
        }
830
        pResInfo->initialized = false;
17,362,964✔
831
        pCtx->pOutput = NULL;
17,362,964✔
832
      }
833

834
      doHandleDataBlock(pOperator, pIndefInfo->pNextGroupRes, downstream, pTaskInfo);
6,050,449✔
835
      pIndefInfo->pNextGroupRes = NULL;
6,050,449✔
836
    }
837

838
    if (noSplitOutput || pInfo->pRes->info.rows < pOperator->resultInfo.threshold) {
12,830,020✔
839
      while (1) {
44,473,786✔
840
        // The downstream exec may change the value of the newgroup, so use a local variable instead.
841
        SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
57,304,808✔
842
        if (pBlock == NULL) {
57,306,228✔
843
          setOperatorCompleted(pOperator);
3,435,282✔
844
          break;
3,435,282✔
845
        }
846
        pInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
53,870,946✔
847

848
        if (pIndefInfo->groupId == 0 && pBlock->info.id.groupId != 0) {
53,870,946✔
849
          pIndefInfo->groupId = pBlock->info.id.groupId;  // this is the initial group result
446,994✔
850
        } else {
851
          if (pIndefInfo->groupId != pBlock->info.id.groupId) {  // reset output buffer and computing status
53,423,952✔
852
            pIndefInfo->groupId = pBlock->info.id.groupId;
6,068,459✔
853
            pIndefInfo->pNextGroupRes = pBlock;
6,068,459✔
854
            break;
6,068,459✔
855
          }
856
        }
857

858
        doHandleDataBlock(pOperator, pBlock, downstream, pTaskInfo);
47,802,487✔
859
        // External-window outputs carry per-window row ranges in stream runtime state.
860
        // Return as soon as this operator has a result block so the downstream state
861
        // still matches the block we are about to hand back to the runner.
862
        if (pTaskInfo->pStreamRuntimeInfo != NULL && pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow &&
47,782,200✔
863
            pInfo->pRes->info.rows > 0) {
×
864
          break;
×
865
        }
866
        if (!noSplitOutput && pInfo->pRes->info.rows >= pOperator->resultInfo.threshold) {
47,782,200✔
867
          break;
3,308,414✔
868
        }
869
      }
870
    }
871

872
    code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
12,813,418✔
873
    QUERY_CHECK_CODE(code, lino, _end);
12,812,155✔
874

875
    size_t rows = pInfo->pRes->info.rows;
12,812,155✔
876
    if (rows > 0 || pOperator->status == OP_EXEC_DONE) {
12,812,155✔
877
      break;
878
    } else {
879
      blockDataCleanup(pInfo->pRes);
3,204,777✔
880
    }
881
  }
882

883
  *pResBlock = (pInfo->pRes->info.rows> 0) ? pInfo->pRes : NULL;
9,607,378✔
884

885
_end:
9,607,378✔
886
  if (code != TSDB_CODE_SUCCESS) {
9,607,378✔
887
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
888
    pTaskInfo->code = code;
×
889
    T_LONG_JMP(pTaskInfo->env, code);
×
890
  }
891
  return code;
9,607,378✔
892
}
893

894
/*
 * Call the init hook of every function context that still needs it.
 *
 * A context is skipped when its result entry is already initialized, when it
 * is a pseudo-column function, when its function id is -1, or when it is a
 * scalar function.
 *
 * Returns 0 on success, or the first non-zero code returned by an init hook.
 */
int32_t initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
  int32_t code = TSDB_CODE_SUCCESS;

  for (int32_t j = 0; j < size; ++j) {
    SqlFunctionCtx*             pCur = &pCtx[j];
    struct SResultRowEntryInfo* pEntry = GET_RES_INFO(pCur);

    bool needInit = !isRowEntryInitialized(pEntry) && !fmIsPseudoColumnFunc(pCur->functionId) &&
                    pCur->functionId != -1 && !fmIsScalarFunc(pCur->functionId);
    if (needInit) {
      code = pCur->fpSet.init(pCur, pCur->resultInfo);
      if (code) {
        return code;
      }
    }
  }

  return 0;
}
911

912
/*
913
 * The start of each column's SResultRowEntryInfo is denoted by RowCellInfoOffset.
914
 * Note that for a top/bottom query, the multiple result rows as a whole are treated as a single row of results.
915
 * +------------+-----------------result column 1------------+------------------result column 2-----------+
916
 * | SResultRow | SResultRowEntryInfo | intermediate buffer1 | SResultRowEntryInfo | intermediate buffer 2|
917
 * +------------+--------------------------------------------+--------------------------------------------+
918
 *           offset[0]                                  offset[1]                                   offset[2]
919
 */
920
// TODO refactor: some function move away
921
/*
 * Bind every function context of the operator to its entry in a result row.
 *
 * The result-row info is re-initialised, a result row is obtained for key 0 /
 * group 0, and for each of the first numOfExprs contexts the matching entry is
 * cleaned, attached as resultInfo, and tagged with the scan stage. Finally the
 * contexts are initialised through initCtxOutputBuffer.
 *
 * Returns pTaskInfo->code when no row was obtained or the task already carries
 * an error, otherwise the result of initCtxOutputBuffer.
 * NOTE(review): if the row is NULL while pTaskInfo->code is still 0 this
 * reports success -- confirm that doSetResultOutBufByKey always sets the code.
 */
int32_t setFunctionResultOutput(struct SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
                             int32_t numOfExprs) {
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SqlFunctionCtx* pFuncCtx = pOperator->exprSupp.pCtx;
  int32_t*        pEntryOffsets = pOperator->exprSupp.rowEntryInfoOffset;

  SResultRowInfo* pRowInfo = &pInfo->resultRowInfo;
  initResultRowInfo(pRowInfo);

  int64_t rowKey = 0;
  int64_t groupId = 0;

  SResultRow* pResultRow = doSetResultOutBufByKey(pSup->pResultBuf, pRowInfo, (char*)&rowKey, sizeof(rowKey), true,
                                                  groupId, pTaskInfo, false, pSup, true);
  if (pResultRow == NULL || pTaskInfo->code != 0) {
    return pTaskInfo->code;
  }

  for (int32_t i = 0; i < numOfExprs; ++i) {
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pResultRow, i, pEntryOffsets);
    cleanupResultRowEntry(pEntry);

    SqlFunctionCtx* pCur = &pFuncCtx[i];
    pCur->resultInfo = pEntry;
    pCur->scanFlag = stage;
  }

  return initCtxOutputBuffer(pFuncCtx, numOfExprs);
}
948

949
/*
 * Collect the indexes of all contexts whose function is a pseudo-column
 * function but not a place-holder function.
 *
 * pCtx       array of function contexts
 * numOfCols  number of entries in pCtx to inspect
 * pResList   output; receives a newly created array of int32_t indexes
 *
 * Returns 0 on success. On allocation failure terrno is returned, the
 * partially filled list is released and *pResList is not assigned here.
 */
int32_t setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList) {
  QRY_PARAM_CHECK(pResList);
  SArray* pList = taosArrayInit(4, sizeof(int32_t));
  if (pList == NULL) {
    return terrno;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    if (fmIsPseudoColumnFunc(pCtx[i].functionId) && !fmIsPlaceHolderFunc(pCtx[i].functionId)) {
      void* px = taosArrayPush(pList, &i);
      if (px == NULL) {
        // Save the error first, then release the list so it does not leak.
        int32_t code = terrno;
        taosArrayDestroy(pList);
        return code;
      }
    }
  }

  *pResList = pList;
  return 0;
}
968

969
/*
 * Generate the single output row of a project operator from its expressions
 * alone: value nodes, scalar functions and operator nodes.
 *
 * Each expression writes into the result column given by its slot id:
 *   - QUERY_NODE_VALUE:    the constant (or NULL) is stored at row 0;
 *   - QUERY_NODE_FUNCTION: only scalar functions are supported; they are
 *                          evaluated with scalarCalculate and the outcome is
 *                          assigned to the result column;
 *   - QUERY_NODE_OPERATOR: delegated to projectApplyOperator.
 * Any other node type yields TSDB_CODE_OPS_NOT_SUPPORT.
 *
 * Afterwards the block is marked as holding one row, filtered, passed through
 * the in-group limit/offset handling, and the operator is set completed.
 *
 * Returns 0 on success or an error code. The temporary block list and scratch
 * column used for scalar functions are released on every path after creation.
 */
int32_t doGenerateSourceData(SOperatorInfo* pOperator) {
  SProjectOperatorInfo* pProjectInfo = pOperator->info;

  SExprSupp*     pSup = &pOperator->exprSupp;
  SSDataBlock*   pRes = pProjectInfo->binfo.pRes;
  SExprInfo*     pExpr = pSup->pExprInfo;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  int32_t code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
  if (code) {
    return code;
  }

  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
    int32_t outputSlotId = pExpr[k].base.resSchema.slotId;

    if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
      SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, outputSlotId);
      if (pColInfoData == NULL) {
        return terrno;
      }

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataSetNNULL(pColInfoData, 0, 1);
      } else {
        code = colDataSetVal(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
        if (code) {
          return code;
        }
      }
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
      SqlFunctionCtx* pfCtx = &pSup->pCtx[k];

      // UDF scalar functions will be calculated here, for example, select foo(n) from (select 1 n).
      // UDF aggregate functions will be handled in agg operator.
      if (!fmIsScalarFunc(pfCtx->functionId)) {
        return TSDB_CODE_OPS_NOT_SUPPORT;
      }

      SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
      if (pBlockList == NULL) {
        return terrno;
      }

      void* px = taosArrayPush(pBlockList, &pRes);
      if (px == NULL) {
        code = terrno;
        taosArrayDestroy(pBlockList);
        return code;
      }

      SColumnInfoData* pResColData = taosArrayGet(pRes->pDataBlock, outputSlotId);
      if (pResColData == NULL) {
        code = terrno;
        taosArrayDestroy(pBlockList);
        return code;
      }

      // Scratch column that receives the scalar result before it is copied out.
      SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};

      SScalarParam dest = {.columnData = &idata};
      gTaskScalarExtra.pStreamInfo  = GET_STM_RTINFO(pOperator->pTaskInfo);
      gTaskScalarExtra.pStreamRange = NULL;
      gTaskScalarExtra.pTaskInfo    = pOperator->pTaskInfo;
      gTaskScalarExtra.isTaskKilled = isTaskKilled;
      code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest, &gTaskScalarExtra);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pBlockList);
        return code;
      }

      if (pRes->info.capacity <= 0) {
        qError("project failed at: %s:%d", __func__, __LINE__);
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      } else {
        code = colDataAssign(pResColData, &idata, dest.numOfRows, &pRes->info);
      }

      // Release the temporaries on success and on failure alike.
      colDataDestroy(&idata);
      taosArrayDestroy(pBlockList);
      if (code) {
        return code;
      }
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
      TAOS_CHECK_RETURN(projectApplyOperator(&pExpr[k], pRes, NULL, outputSlotId, NULL, false, &gTaskScalarExtra));
    } else {
      return TSDB_CODE_OPS_NOT_SUPPORT;
    }
  }

  pRes->info.rows = 1;
  code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
  if (code) {
    pTaskInfo->code = code;
    return code;
  }

  (void) doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator);

  setOperatorCompleted(pOperator);

  return code;
}
1069

1070
/*
 * Point the output of the first N function contexts at the first N columns of
 * the result block, where N is the size of pPseudoList (0 when it is NULL).
 * A missing column is only logged.
 */
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
  size_t numOfPseudo = 0;
  if (pPseudoList != NULL) {
    numOfPseudo = taosArrayGetSize(pPseudoList);
  }

  for (int32_t idx = 0; idx < numOfPseudo; ++idx) {
    SqlFunctionCtx* pCur = &pCtx[idx];

    pCur->pOutput = taosArrayGet(pResult->pDataBlock, idx);
    if (pCur->pOutput == NULL) {
      qError("failed to get the output buf, ptr is null");
    }
  }
}
580,542,650✔
1079

1080
/*
 * Copy a plain column into the result column at outputSlotId.
 *
 * The source is the function input column pfCtx->input.pData[0]; when that is
 * NULL the column is looked up in pSrcBlock by the slot id of the function's
 * first parameter. If the result block already holds rows and
 * createNewColModel is false, the source rows are merged behind the existing
 * ones; otherwise the source is assigned to the result column.
 *
 * *numOfRows receives the number of source rows. Returns 0 on success or an
 * error code.
 */
int32_t projectApplyColumn(SSDataBlock* pResult, SSDataBlock* pSrcBlock, int32_t outputSlotId, SqlFunctionCtx* pfCtx,
                           int32_t* numOfRows, bool createNewColModel) {
  int32_t code = 0;
  int32_t lino = 0;

  SInputColumnInfoData* pInput = &pfCtx->input;
  SColumnInfoData*      pDstCol = taosArrayGet(pResult->pDataBlock, outputSlotId);
  TSDB_CHECK_NULL(pDstCol, code, lino, _exit, terrno);

  bool appendRows = (pResult->info.rows > 0) && !createNewColModel;

  if (pInput->pData[0] == NULL) {
    // no bound input column: read the column straight from the source block
    int32_t srcSlotId = pfCtx->param[0].pCol->slotId;

    SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
    TSDB_CHECK_NULL(pSrcCol, code, lino, _exit, terrno);

    if (appendRows) {
      TAOS_CHECK_EXIT(colDataMergeCol(pDstCol, pResult->info.rows, (int32_t*)&pResult->info.capacity, pSrcCol,
                                      pSrcBlock->info.rows));
    } else {
      TAOS_CHECK_EXIT(colDataAssign(pDstCol, pSrcCol, pSrcBlock->info.rows, &pResult->info));
    }

    *numOfRows = pSrcBlock->info.rows;
    return code;
  }

  if (appendRows) {
    TAOS_CHECK_EXIT(colDataMergeCol(pDstCol, pResult->info.rows, (int32_t*)&pResult->info.capacity,
                                    pInput->pData[0], pInput->numOfRows));
    *numOfRows = pInput->numOfRows;
    return code;
  }

  TAOS_CHECK_EXIT(colDataAssign(pDstCol, pInput->pData[0], pInput->numOfRows, &pResult->info));
  *numOfRows = pInput->numOfRows;

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
1128

1129

1130
/*
 * Fill the result column at outputSlotId with a constant, one entry per row
 * of pSrcBlock.
 *
 * Writing starts at row 0 when createNewColModel is true, otherwise behind the
 * rows already present in pResult. A constant of type TSDB_DATA_TYPE_NULL is
 * written as a run of NULLs; any other constant is stored row by row.
 *
 * *numOfRows receives pSrcBlock's row count on success. Returns 0 or an error
 * code.
 */
int32_t projectApplyValue(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, int32_t outputSlotId,
                          int32_t* numOfRows, bool createNewColModel) {
  int32_t code = 0;
  int32_t lino = 0;

  SColumnInfoData* pDstCol = taosArrayGet(pResult->pDataBlock, outputSlotId);
  TSDB_CHECK_NULL(pDstCol, code, lino, _exit, terrno);

  int32_t startRow = 0;
  if (!createNewColModel) {
    startRow = pResult->info.rows;
  }

  int32_t valueType = pExpr->base.pParam[0].param.nType;
  if (TSDB_DATA_TYPE_NULL != valueType) {
    char* pValue = taosVariantGet(&pExpr->base.pParam[0].param, valueType);
    for (int32_t r = 0; r < pSrcBlock->info.rows; ++r) {
      TAOS_CHECK_EXIT(colDataSetVal(pDstCol, r + startRow, pValue, false));
    }
  } else {
    colDataSetNNULL(pDstCol, startRow, pSrcBlock->info.rows);
  }

  *numOfRows = pSrcBlock->info.rows;

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
1156

1157

1158

1159
/*
 * Evaluate an operator expression with scalarCalculate and store the outcome
 * in the result column at outputSlotId.
 *
 * pSrcBlock may be NULL; in that case scalarCalculate is called with a NULL
 * block list. The value is computed into a scratch column and then either
 * merged behind the existing rows (result block non-empty and
 * createNewColModel false) or assigned to the result column. numOfRows may be
 * NULL; when given it receives the number of computed rows.
 *
 * Returns 0 on success or an error code.
 */
int32_t projectApplyOperator(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, int32_t outputSlotId,
                             int32_t* numOfRows, bool createNewColModel, const void* pExtraParams) {
  int32_t code = 0;
  int32_t lino = 0;
  SArray* pSrcList = NULL;

  if (pSrcBlock != NULL) {
    pSrcList = taosArrayInit(4, POINTER_BYTES);
    TSDB_CHECK_NULL(pSrcList, code, lino, _exit, terrno);

    void* pPushed = taosArrayPush(pSrcList, &pSrcBlock);
    TSDB_CHECK_NULL(pPushed, code, lino, _exit, terrno);
  }

  SColumnInfoData* pDstCol = taosArrayGet(pResult->pDataBlock, outputSlotId);
  TSDB_CHECK_NULL(pDstCol, code, lino, _exit, terrno);

  SColumnInfoData scratchCol = {.info = pDstCol->info, .hasNull = true};
  SScalarParam    output = {.columnData = &scratchCol};

  gTaskScalarExtra.pStreamInfo = (void*)pExtraParams;
  gTaskScalarExtra.pStreamRange = NULL;
  TAOS_CHECK_EXIT(scalarCalculate(pExpr->pExpr->_optrRoot.pRootNode, pSrcList, &output, &gTaskScalarExtra));

  bool appendRows = (pResult->info.rows > 0) && !createNewColModel;
  if (appendRows) {
    code = colDataMergeCol(pDstCol, pResult->info.rows, (int32_t*)&pResult->info.capacity, &scratchCol,
                           output.numOfRows);
  } else {
    code = colDataAssign(pDstCol, &scratchCol, output.numOfRows, &pResult->info);
  }

  // the scratch column is released before the copy result is examined
  colDataDestroy(&scratchCol);
  TAOS_CHECK_EXIT(code);

  if (numOfRows) {
    *numOfRows = output.numOfRows;
  }

_exit:

  if (code < 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  taosArrayDestroy(pSrcList);

  return code;
}
1202

1203

1204
/*
 * Apply one function expression of a projection to pSrcBlock and write the
 * outcome into the result column at outputSlotId. The handling depends on the
 * kind of function identified by pfCtx->functionId:
 *
 *   - place-holder function with extra params and exactly one parameter:
 *     the place-holder value is assigned for the rows of the source block;
 *   - scalar (or remaining place-holder) function: evaluated with
 *     scalarCalculate into a scratch column, then merged/assigned;
 *   - indefinite-rows function: init + process hooks of the function are run,
 *     and process-by-row functions are collected in *processByRowFunctionCtx
 *     (the array is created on first use);
 *   - aggregate function: the input column is copied row by row, unless it is
 *     a select-value function and doSelectFunc is false, in which case nothing
 *     is done here;
 *   - group-id function: the source block's group id is written to every row.
 *
 * pCtx is the whole context array (used to locate the timestamp output),
 * pfCtx the context of the function being applied. *numOfRows receives the
 * number of produced rows in every branch that produces output.
 *
 * Returns 0 on success or an error code.
 */
int32_t projectApplyFunction(SqlFunctionCtx* pCtx, SqlFunctionCtx* pfCtx, SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock,
                                    int32_t outputSlotId, int32_t* numOfRows, bool createNewColModel, const void* pExtraParams,
                                    SArray* pPseudoList, SArray** processByRowFunctionCtx, bool doSelectFunc) {
  int32_t code = 0, lino = 0;
  SArray* pBlockList = NULL;
  SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
  TSDB_CHECK_NULL(pResColData, code, lino, _exit, terrno);

  if (fmIsPlaceHolderFunc(pfCtx->functionId) && pExtraParams && pfCtx->pExpr->base.pParamList && 1 == pfCtx->pExpr->base.pParamList->length) {
    SNode* pParamNode = nodesListGetNode(pfCtx->pExpr->base.pParamList, 0);
    SStreamRuntimeFuncInfo* pStreamInfo = (SStreamRuntimeFuncInfo*)pExtraParams;
    if (pStreamInfo != NULL && pStreamInfo->withExternalWindow && pStreamInfo->pStreamBlkWinIdx != NULL &&
        taosArrayGetSize(pStreamInfo->pStreamBlkWinIdx) > 1) {
      TAOS_CHECK_EXIT(assignPlaceHolderInExternalWindows(pResColData, pResult->info.rows, pSrcBlock->info.rows,
                                                         pfCtx->functionId, pStreamInfo, pParamNode));
    } else {
      TAOS_CHECK_EXIT(scalarAssignPlaceHolderRes(pResColData, pResult->info.rows, pSrcBlock->info.rows,
                                                 pfCtx->functionId, pExtraParams, pParamNode));
    }
    *numOfRows = pSrcBlock->info.rows;

    return code;
  }

  if (fmIsScalarFunc(pfCtx->functionId) || fmIsPlaceHolderFunc(pfCtx->functionId)) {
    pBlockList = taosArrayInit(4, POINTER_BYTES);
    TSDB_CHECK_NULL(pBlockList, code, lino, _exit, terrno);

    void* px = taosArrayPush(pBlockList, &pSrcBlock);
    TSDB_CHECK_NULL(px, code, lino, _exit, terrno);

    SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};
    SScalarParam dest = {.columnData = &idata};
    gTaskScalarExtra.pStreamInfo = (void*)pExtraParams;
    gTaskScalarExtra.pStreamRange = NULL;
    TAOS_CHECK_EXIT(scalarCalculate((SNode*)pExpr->pExpr->_function.pFunctNode, pBlockList, &dest, &gTaskScalarExtra));

    if (pResult->info.rows > 0 && !createNewColModel) {
      code = colDataMergeCol(pResColData, pResult->info.rows, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
    } else {
      SColumnInfo oriInfo = pResColData->info;
      code = colDataAssign(pResColData, &idata, dest.numOfRows, &pResult->info);
      // restore the original column info to satisfy the output column schema
      pResColData->info = oriInfo;
    }

    colDataDestroy(&idata);
    taosArrayDestroy(pBlockList);
    // The list is gone: clear the pointer so that the shared _exit path, which
    // is reached when the copy above failed, does not destroy it a second time.
    pBlockList = NULL;
    TAOS_CHECK_EXIT(code);

    *numOfRows = dest.numOfRows;

    return code;
  }

  if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
    SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx);
    TAOS_CHECK_EXIT(pfCtx->fpSet.init(pfCtx, pResInfo));

    pfCtx->pOutput = (char*)pResColData;
    TSDB_CHECK_NULL(pfCtx->pOutput, code, lino, _exit, terrno);

    pfCtx->offset = createNewColModel ? 0 : pResult->info.rows;  // set the start offset

    // set the timestamp(_rowts) output buffer
    if (taosArrayGetSize(pPseudoList) > 0) {
      int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
      TSDB_CHECK_NULL(outputColIndex, code, lino, _exit, terrno);

      pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
    }

    // link pDstBlock to set selectivity value
    if (pfCtx->subsidiaries.num > 0) {
      pfCtx->pDstBlock = pResult;
    }

    code = pfCtx->fpSet.process(pfCtx);
    if (code != TSDB_CODE_SUCCESS) {
      // give the function a chance to release its state before bailing out
      if (pfCtx->fpSet.cleanup != NULL) {
        pfCtx->fpSet.cleanup(pfCtx);
      }
      TAOS_CHECK_EXIT(code);
    }

    *numOfRows = pResInfo->numOfRes;

    if (fmIsProcessByRowFunc(pfCtx->functionId)) {
      if (NULL == *processByRowFunctionCtx) {
        *processByRowFunctionCtx = taosArrayInit(1, sizeof(SqlFunctionCtx*));
        TSDB_CHECK_NULL(*processByRowFunctionCtx, code, lino, _exit, terrno);
      }

      void* px = taosArrayPush(*processByRowFunctionCtx, &pfCtx);
      TSDB_CHECK_NULL(px, code, lino, _exit, terrno);
    }

    return code;
  }

  if (fmIsAggFunc(pfCtx->functionId)) {
    // selective value output should be set during corresponding function execution
    if (!doSelectFunc && fmIsSelectValueFunc(pfCtx->functionId)) {
      return code;
    }

    // _group_key function for "partition by tbname" + csum(col_name) query
    int32_t slotId = pfCtx->param[0].pCol->slotId;

    // todo handle the json tag
    SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
    TSDB_CHECK_NULL(pInput, code, lino, _exit, terrno);

    for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
      bool isNull = colDataIsNull_s(pInput, f);
      if (isNull) {
        colDataSetNULL(pResColData, pResult->info.rows + f);
      } else {
        char* data = colDataGetData(pInput, f);
        TAOS_CHECK_EXIT(colDataSetVal(pResColData, pResult->info.rows + f, data, isNull));
      }
    }

    *numOfRows = pSrcBlock->info.rows;

    return code;
  }

  if (fmIsGroupIdFunc(pfCtx->functionId)) {
    for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
      TAOS_CHECK_EXIT(colDataSetVal(pResColData, pResult->info.rows + f, (const char*)&pSrcBlock->info.id.groupId, false));
    }

    *numOfRows = pSrcBlock->info.rows;
    return code;
  }

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  taosArrayDestroy(pBlockList);

  return code;
}
1352

1353

1354
int32_t projectApplyFunctionsWithSelect(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock,
591,565,657✔
1355
                                        SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList,
1356
                                        const void* pExtraParams, bool doSelectFunc, bool hasIndefRowsFunc,
1357
                                        SExecTaskInfo* pTaskInfo) {
1358
  int32_t lino = 0;
591,565,657✔
1359
  int32_t code = TSDB_CODE_SUCCESS;
591,565,657✔
1360

1361
  SExecTaskInfo* savedTaskInfo = gTaskScalarExtra.pTaskInfo;
591,565,657✔
1362
  __typeof__(gTaskScalarExtra.isTaskKilled) savedIsTaskKilled = gTaskScalarExtra.isTaskKilled;
591,690,134✔
1363

1364
  if (pTaskInfo != NULL) {
591,733,206✔
1365
    gTaskScalarExtra.pTaskInfo    = pTaskInfo;
591,769,829✔
1366
    gTaskScalarExtra.isTaskKilled = isTaskKilled;
591,728,334✔
1367
  }
1368

1369
  if (hasIndefRowsFunc) {
591,815,499✔
1370
    setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
580,604,529✔
1371
  }
1372
  pResult->info.dataLoad = 1;
591,666,719✔
1373

1374
  SArray* processByRowFunctionCtx = NULL;
591,709,699✔
1375
  SArray* pProcessedFuncIds = NULL;
591,510,239✔
1376
  SArray* pGroupedCtxArray = NULL;
591,510,239✔
1377
  if (pSrcBlock == NULL) {
591,510,239✔
1378
    for (int32_t k = 0; k < numOfOutput; ++k) {
×
1379
      int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
×
1380

1381
      if (pExpr[k].pExpr->nodeType != QUERY_NODE_VALUE) {
×
1382
        qError("project failed at: %s:%d", __func__, __LINE__);
×
1383
        TAOS_CHECK_EXIT(TSDB_CODE_INVALID_PARA);
×
1384
      }
1385
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
×
1386
      TSDB_CHECK_NULL(pColInfoData, code, lino, _exit, terrno);
×
1387

1388
      int32_t type = pExpr[k].base.pParam[0].param.nType;
×
1389
      if (TSDB_DATA_TYPE_NULL == type) {
×
1390
        colDataSetNNULL(pColInfoData, 0, 1);
1391
      } else {
1392
        TAOS_CHECK_EXIT(colDataSetVal(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false));
×
1393
      }
1394
    }
1395

1396
    pResult->info.rows = 1;
×
1397
    goto _exit;
×
1398
  }
1399

1400
  if (pResult != pSrcBlock) {
591,510,239✔
1401
    pResult->info.id.groupId = pSrcBlock->info.id.groupId;
434,014,230✔
1402
    if (pSrcBlock->info.parTbName[0]) {
434,128,793✔
1403
      tstrncpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
×
1404
    }
1405
    qTrace("%s, parName:%s,groupId:%" PRIu64, __FUNCTION__, pSrcBlock->info.parTbName, pResult->info.id.groupId);
434,108,237✔
1406
  }
1407

1408
  // if the source equals to the destination, it is to create a new column as the result of scalar
1409
  // function or some operators.
1410
  bool createNewColModel = (pResult == pSrcBlock);
591,536,534✔
1411
  if (createNewColModel) {
591,536,534✔
1412
    TAOS_CHECK_EXIT(blockDataEnsureCapacity(pResult, pResult->info.rows));
157,582,618✔
1413
  }
1414

1415
  int32_t numOfRows = 0;
591,533,773✔
1416

1417
  for (int32_t k = 0; k < numOfOutput; ++k) {
2,147,483,647✔
1418
    int32_t               outputSlotId = pExpr[k].base.resSchema.slotId;
2,015,094,118✔
1419
    SqlFunctionCtx*       pfCtx = &pCtx[k];
2,015,325,400✔
1420
    switch (pExpr[k].pExpr->nodeType) {
2,015,328,685✔
1421
      case QUERY_NODE_COLUMN: {
1,291,267,811✔
1422
        TAOS_CHECK_EXIT(projectApplyColumn(pResult, pSrcBlock, outputSlotId, pfCtx, &numOfRows, createNewColModel));
1,291,267,811✔
1423
        break;
1,291,273,341✔
1424
      } 
1425
      case QUERY_NODE_VALUE: {
19,272,706✔
1426
        TAOS_CHECK_EXIT(projectApplyValue(&pExpr[k], pResult, pSrcBlock, outputSlotId, &numOfRows, createNewColModel));
19,272,706✔
1427
        break;
19,284,379✔
1428
      } 
1429
      case QUERY_NODE_OPERATOR: {
92,182,188✔
1430
        TAOS_CHECK_EXIT(projectApplyOperator(&pExpr[k], pResult, pSrcBlock, outputSlotId, &numOfRows, createNewColModel, pExtraParams));
92,182,188✔
1431
        break;
90,852,072✔
1432
      } 
1433
      case QUERY_NODE_FUNCTION: {
612,769,953✔
1434
        TAOS_CHECK_EXIT(projectApplyFunction(pCtx, pfCtx, &pExpr[k], pResult, pSrcBlock, outputSlotId, &numOfRows, createNewColModel, pExtraParams, pPseudoList, &processByRowFunctionCtx, doSelectFunc));
612,769,953✔
1435
        break;
612,360,407✔
1436
      }
1437
      default: {
×
1438
        qError("invalid project expr nodeType:%d", pExpr[k].pExpr->nodeType);
×
1439
        TAOS_CHECK_EXIT(TSDB_CODE_OPS_NOT_SUPPORT);
×
1440
      }
1441
    }
1442
  }
1443

1444
  if (processByRowFunctionCtx && taosArrayGetSize(processByRowFunctionCtx) > 0) {
590,084,742✔
1445
    int32_t processByRowSize = taosArrayGetSize(processByRowFunctionCtx);
61,365,319✔
1446
    SStreamRuntimeFuncInfo* pStreamInfo = (SStreamRuntimeFuncInfo*)pExtraParams;
61,573,479✔
1447
    bool splitByExternalWindow = pSrcBlock != NULL && pStreamInfo != NULL && pStreamInfo->withExternalWindow &&
61,573,479✔
1448
                                 pStreamInfo->pStreamBlkWinIdx != NULL &&
×
1449
                                 taosArrayGetSize(pStreamInfo->pStreamBlkWinIdx) > 1 &&
123,146,958✔
1450
                                 allProcessByRowCtxSameFuncId(processByRowFunctionCtx);
×
1451
    pProcessedFuncIds = taosArrayInit(4, sizeof(int32_t));
61,573,479✔
1452
    TSDB_CHECK_NULL(pProcessedFuncIds, code, lino, _exit, terrno);
61,573,479✔
1453

1454
    for (int32_t i = 0; i < processByRowSize; ++i) {
123,229,735✔
1455
      SqlFunctionCtx** ppCurrCtx = taosArrayGet(processByRowFunctionCtx, i);
61,665,626✔
1456
      TSDB_CHECK_NULL(ppCurrCtx, code, lino, _exit, terrno);
61,665,626✔
1457
      TSDB_CHECK_NULL(*ppCurrCtx, code, lino, _exit, terrno);
61,665,626✔
1458

1459
      bool    processed = false;
61,665,626✔
1460
      int32_t processedNum = taosArrayGetSize(pProcessedFuncIds);
61,665,626✔
1461
      for (int32_t j = 0; j < processedNum; ++j) {
61,688,892✔
1462
        int32_t* pFuncId = taosArrayGet(pProcessedFuncIds, j);
92,147✔
1463
        TSDB_CHECK_NULL(pFuncId, code, lino, _exit, terrno);
92,147✔
1464
        if (*pFuncId == (*ppCurrCtx)->functionId) {
92,147✔
1465
          processed = true;
68,881✔
1466
          break;
68,881✔
1467
        }
1468
      }
1469

1470
      if (processed) {
61,665,626✔
1471
        continue;
68,881✔
1472
      }
1473

1474
      pGroupedCtxArray = taosArrayInit(2, sizeof(SqlFunctionCtx*));
61,596,745✔
1475
      TSDB_CHECK_NULL(pGroupedCtxArray, code, lino, _exit, terrno);
61,596,745✔
1476

1477
      for (int32_t j = i; j < processByRowSize; ++j) {
123,286,574✔
1478
        SqlFunctionCtx** ppTmpCtx = taosArrayGet(processByRowFunctionCtx, j);
61,689,829✔
1479
        TSDB_CHECK_NULL(ppTmpCtx, code, lino, _exit, terrno);
61,689,829✔
1480
        TSDB_CHECK_NULL(*ppTmpCtx, code, lino, _exit, terrno);
61,689,829✔
1481

1482
        if ((*ppTmpCtx)->functionId == (*ppCurrCtx)->functionId) {
61,689,829✔
1483
          void* px = taosArrayPush(pGroupedCtxArray, ppTmpCtx);
61,666,117✔
1484
          TSDB_CHECK_NULL(px, code, lino, _exit, terrno);
61,666,117✔
1485
        }
1486
      }
1487

1488
      if (splitByExternalWindow) {
61,596,745✔
1489
        TAOS_CHECK_EXIT(processByRowInExternalWindows(pGroupedCtxArray, pSrcBlock, pStreamInfo));
×
1490
      } else {
1491
        TAOS_CHECK_EXIT((*ppCurrCtx)->fpSet.processFuncByRow(pGroupedCtxArray));
61,596,745✔
1492
      }
1493
      taosArrayDestroy(pGroupedCtxArray);
61,587,375✔
1494
      pGroupedCtxArray = NULL;
61,587,375✔
1495

1496
      void* px = taosArrayPush(pProcessedFuncIds, &(*ppCurrCtx)->functionId);
61,587,375✔
1497
      TSDB_CHECK_NULL(px, code, lino, _exit, terrno);
61,587,375✔
1498

1499
      numOfRows = (*ppCurrCtx)->resultInfo->numOfRes;
61,587,375✔
1500
    }
1501

1502
    taosArrayDestroy(pProcessedFuncIds);
61,564,109✔
1503
    pProcessedFuncIds = NULL;
61,564,109✔
1504
  }
1505

1506
  if (!createNewColModel) {
590,283,532✔
1507
    pResult->info.rows += numOfRows;
432,900,160✔
1508
  }
1509

1510
_exit:
591,698,759✔
1511
  gTaskScalarExtra.pTaskInfo    = savedTaskInfo;
591,705,492✔
1512
  gTaskScalarExtra.isTaskKilled = savedIsTaskKilled;
591,745,917✔
1513

1514
  if (pGroupedCtxArray) {
591,674,720✔
1515
    taosArrayDestroy(pGroupedCtxArray);
9,370✔
1516
  }
1517
  if (pProcessedFuncIds) {
591,674,720✔
1518
    taosArrayDestroy(pProcessedFuncIds);
9,370✔
1519
  }
1520
  if (processByRowFunctionCtx) {
591,674,720✔
1521
    taosArrayDestroy(processByRowFunctionCtx);
61,573,479✔
1522
  }
1523
  if (code) {
591,674,720✔
1524
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
1,390,744✔
1525
  }
1526
  return code;
591,674,720✔
1527
}
1528

1529
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
580,374,589✔
1530
                              int32_t numOfOutput, SArray* pPseudoList, const void* pExtraParams,
1531
                              SExecTaskInfo* pTaskInfo) {
1532
  return projectApplyFunctionsWithSelect(pExpr, pResult, pSrcBlock, pCtx, numOfOutput, pPseudoList, pExtraParams,
580,374,589✔
1533
                                         false, true, pTaskInfo);
1534
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc