• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5014

03 Apr 2026 03:59PM UTC coverage: 72.256% (-0.06%) from 72.317%
#5014

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4054 of 5985 new or added lines in 68 files covered. (67.74%)

13285 existing lines in 168 files now uncovered.

257272 of 356056 relevant lines covered (72.26%)

133154720.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.39
/source/libs/executor/src/projectoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "taoserror.h"
22
#include "tdatablock.h"
23

24
typedef struct SProjectOperatorInfo {
25
  SOptrBasicInfo binfo;
26
  SAggSupporter  aggSup;
27
  SArray*        pPseudoColInfo;
28
  SLimitInfo     limitInfo;
29
  bool           mergeDataBlocks;
30
  SSDataBlock*   pFinalRes;
31
  bool           inputIgnoreGroup;
32
  bool           outputIgnoreGroup;
33
} SProjectOperatorInfo;
34

35
typedef struct SIndefOperatorInfo {
36
  SOptrBasicInfo binfo;
37
  SAggSupporter  aggSup;
38
  SArray*        pPseudoColInfo;
39
  SExprSupp      scalarSup;
40
  uint64_t       groupId;
41
  SSDataBlock*   pNextGroupRes;
42
} SIndefOperatorInfo;
43

44
static int32_t      doGenerateSourceData(SOperatorInfo* pOperator);
45
static int32_t      doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
46
static int32_t      doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
47
int32_t projectApplyOperator(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, int32_t outputSlotId, int32_t* numOfRows, bool createNewColModel, const void* pExtraParams);
48

49
static bool hasLagLeadFunc(const SExprSupp* pSup) {
8,632,522✔
50
  if (pSup == NULL || pSup->pCtx == NULL) {
8,632,522✔
NEW
51
    return false;
×
52
  }
53

54
  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
28,124,127✔
55
    EFunctionType type = fmGetFuncTypeFromId(pSup->pCtx[i].functionId);
19,522,703✔
56
    if (type == FUNCTION_TYPE_LAG || type == FUNCTION_TYPE_LEAD) {
19,521,423✔
57
      return true;
30,191✔
58
    }
59
  }
60

61
  return false;
8,602,484✔
62
}
63

64
static void destroyProjectOperatorInfo(void* param) {
145,915,512✔
65
  if (NULL == param) {
145,915,512✔
66
    return;
×
67
  }
68

69
  SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param;
145,915,512✔
70
  cleanupBasicInfo(&pInfo->binfo);
145,915,512✔
71
  cleanupAggSup(&pInfo->aggSup);
145,905,781✔
72
  taosArrayDestroy(pInfo->pPseudoColInfo);
145,886,735✔
73

74
  blockDataDestroy(pInfo->pFinalRes);
145,893,115✔
75
  taosMemoryFreeClear(param);
145,910,652✔
76
}
77

78
static void destroyIndefinitOperatorInfo(void* param) {
3,144,861✔
79
  SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param;
3,144,861✔
80
  if (pInfo == NULL) {
3,144,861✔
81
    return;
×
82
  }
83

84
  cleanupBasicInfo(&pInfo->binfo);
3,144,861✔
85
  taosArrayDestroy(pInfo->pPseudoColInfo);
3,144,861✔
86
  cleanupAggSup(&pInfo->aggSup);
3,145,234✔
87
  cleanupExprSupp(&pInfo->scalarSup);
3,144,732✔
88

89
  taosMemoryFreeClear(param);
3,145,234✔
90
}
91

UNCOV
92
/**
 * Forward a "release stream state" request to the first downstream operator.
 *
 * The project operator registers this callback even when it is created without a
 * downstream operator, so the absence of a downstream is tolerated here instead of
 * dereferencing pDownstream[0] blindly.
 *
 * @param pOperator operator whose first downstream should release its stream state
 */
void streamOperatorReleaseState(SOperatorInfo* pOperator) {
  if (pOperator == NULL || pOperator->numOfDownstream <= 0 || pOperator->pDownstream == NULL) {
    return;
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream != NULL && downstream->fpSet.releaseStreamStateFn) {
    downstream->fpSet.releaseStreamStateFn(downstream);
  }
}
×
98

UNCOV
99
/**
 * Forward a "reload stream state" request to the first downstream operator.
 *
 * The project operator registers this callback even when it is created without a
 * downstream operator, so the absence of a downstream is tolerated here instead of
 * dereferencing pDownstream[0] blindly.
 *
 * @param pOperator operator whose first downstream should reload its stream state
 */
void streamOperatorReloadState(SOperatorInfo* pOperator) {
  if (pOperator == NULL || pOperator->numOfDownstream <= 0 || pOperator->pDownstream == NULL) {
    return;
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream != NULL && downstream->fpSet.reloadStreamStateFn) {
    downstream->fpSet.reloadStreamStateFn(downstream);
  }
}
×
105

106
/**
 * Bring a project operator back to its not-yet-opened state so it can be executed again.
 *
 * Resets the basic operator state, re-initializes the limit info from the physical plan
 * node, empties the merge block and rebuilds the aggregate support and function result
 * output.
 *
 * @param pOper project operator to reset
 * @return 0 on success; otherwise the error code reported by resetAggSup() or
 *         setFunctionResultOutput() (previously these failures were silently dropped)
 */
static int32_t resetProjectOperState(SOperatorInfo* pOper) {
  SProjectOperatorInfo* pProject = pOper->info;
  SExecTaskInfo*        pTaskInfo = pOper->pTaskInfo;
  SProjectPhysiNode*    pPhynode = (SProjectPhysiNode*)pOper->pPhyNode;

  pOper->status = OP_NOT_OPENED;
  resetBasicOperatorState(&pProject->binfo);

  // start from a zeroed limit info before re-reading LIMIT/SLIMIT from the plan node
  pProject->limitInfo = (SLimitInfo){0};
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pProject->limitInfo);

  blockDataCleanup(pProject->pFinalRes);

  int32_t code = resetAggSup(&pOper->exprSupp, &pProject->aggSup, pTaskInfo, pPhynode->pProjections, NULL,
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, NULL,
                             &pTaskInfo->storageAPI.functionStore);
  if (code == 0) {
    code = setFunctionResultOutput(pOper, &pProject->binfo, &pProject->aggSup, MAIN_SCAN, pOper->exprSupp.numOfExprs);
  }

  // propagate the failure to the caller instead of always reporting success
  return code;
}
127

128
/**
 * Build a project operator from its physical plan node.
 *
 * @param downstream   optional upstream data source; may be NULL, in which case no
 *                     downstream is attached
 * @param pProjPhyNode physical plan node describing projections, limits, conditions and
 *                     group handling
 * @param pTaskInfo    owning task; on failure its code field is set to the error
 * @param pOptrInfo    receives the new operator on success
 * @return TSDB_CODE_SUCCESS on success, otherwise an error code. On failure the partially
 *         built operator info and operator (with its downstream) are destroyed.
 */
int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo,
                                  SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    goto _error;
  }

  pOperator->pPhyNode = pProjPhyNode;
  pOperator->exprSupp.hasWindowOrGroup = false;
  pOperator->pTaskInfo = pTaskInfo;
  initOperatorCostInfo(pOperator);

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pProjPhyNode->node.pOutputDataBlockDesc);
  TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno);

  initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo);

  // pRes is attached to pInfo right away so that the error path releases it with pInfo
  pInfo->binfo.pRes = pResBlock;
  pInfo->pFinalRes = NULL;

  code = createOneDataBlock(pResBlock, false, &pInfo->pFinalRes);
  TSDB_CHECK_CODE(code, lino, _error);

  pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder;
  pInfo->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup;
  pInfo->outputIgnoreGroup = pProjPhyNode->ignoreGroupId;

  // merging is only enabled outside the queue exec model, when group ids are ignored,
  // and when the plan node asks for it
  pInfo->mergeDataBlocks = (pTaskInfo->execModel != OPTR_EXEC_MODEL_QUEUE) && pProjPhyNode->ignoreGroupId &&
                           pProjPhyNode->mergeDataBlock;

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
  int32_t maxBlockBytes = 2 * 1024 * 1024;
  int32_t numOfRows = 4096;
  if (numOfRows * pResBlock->info.rowSize > maxBlockBytes) {
    numOfRows = maxBlockBytes / pResBlock->info.rowSize;
  }

  initResultSizeInfo(&pOperator->resultInfo, numOfRows);

  SExprInfo* pExprInfo = NULL;
  int32_t    numOfCols = 0;
  code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols);
  TSDB_CHECK_CODE(code, lino, _error);

  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
                    NULL, &pTaskInfo->storageAPI.functionStore);
  TSDB_CHECK_CODE(code, lino, _error);

  initBasicInfo(&pInfo->binfo, pResBlock);
  code = setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
  TSDB_CHECK_CODE(code, lino, _error);

  code = filterInitFromNode((SNode*)pProjPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                            pTaskInfo->pStreamRuntimeInfo);
  TSDB_CHECK_CODE(code, lino, _error);

  code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols, &pInfo->pPseudoColInfo);
  TSDB_CHECK_CODE(code, lino, _error);

  setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
  setOperatorStreamStateFn(pOperator, streamOperatorReleaseState, streamOperatorReloadState);
  setOperatorResetStateFn(pOperator, resetProjectOperState);

  // a project operator may be created without any downstream
  if (downstream != NULL) {
    code = appendDownstream(pOperator, &downstream, 1);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  if (pInfo != NULL) destroyProjectOperatorInfo(pInfo);
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
226

227
/**
 * Decide whether an incoming block belongs to a group that is still being skipped
 * because of a pending group offset.
 *
 * @param pBlock     incoming data block
 * @param pLimitInfo limit state; remainGroupOffset and currentGroupId are updated
 * @return PROJECT_RETRIEVE_CONTINUE when the block must be dropped and the next block
 *         fetched, PROJECT_RETRIEVE_DONE when the block should be processed
 */
static int32_t discardGroupDataBlock(SSDataBlock* pBlock, SLimitInfo* pLimitInfo) {
  // no group left to skip: every block is processed
  if (pLimitInfo->remainGroupOffset <= 0) {
    return PROJECT_RETRIEVE_DONE;
  }

  // first group seen, or more data of the group currently being skipped
  if (pLimitInfo->currentGroupId == 0 || pLimitInfo->currentGroupId == pBlock->info.id.groupId) {
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
    return PROJECT_RETRIEVE_CONTINUE;
  }

  // the block starts a new group: one more skipped group is accounted for
  pLimitInfo->remainGroupOffset -= 1;
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;

  // still groups to skip, so this group is ignored as well
  if (pLimitInfo->remainGroupOffset > 0) {
    return PROJECT_RETRIEVE_CONTINUE;
  }

  pLimitInfo->currentGroupId = 0;
  return PROJECT_RETRIEVE_DONE;
}
249

250
/**
 * Update the group bookkeeping for an incoming block once all skipped groups are consumed.
 *
 * @param pBlock     incoming data block
 * @param pLimitInfo limit state; expected to have remainGroupOffset equal to 0 or -1
 * @param pOperator  operator that is marked completed when the group limit is exceeded
 * @return TSDB_CODE_INVALID_PARA if remainGroupOffset is neither 0 nor -1;
 *         PROJECT_RETRIEVE_DONE if the slimit is exceeded (operator set completed);
 *         PROJECT_RETRIEVE_CONTINUE otherwise
 */
static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, SOperatorInfo* pOperator) {
  // by now every group offset must have been consumed
  if (pLimitInfo->remainGroupOffset != 0 && pLimitInfo->remainGroupOffset != -1) {
    qError("project failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_INVALID_PARA;
  }

  bool enteredNewGroup = false;
  if (pBlock->info.id.groupId == 0) {
    pLimitInfo->numOfOutputGroups = 1;
  } else if (pLimitInfo->currentGroupId == pBlock->info.id.groupId) {
    // same group as the previous block: nothing to update
    return PROJECT_RETRIEVE_CONTINUE;
  } else {
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
    pLimitInfo->numOfOutputGroups += 1;
    enteredNewGroup = true;
  }

  // more groups than the slimit allows: the operator is finished
  if ((pLimitInfo->slimit.limit >= 0) && (pLimitInfo->slimit.limit < pLimitInfo->numOfOutputGroups)) {
    setOperatorCompleted(pOperator);
    return PROJECT_RETRIEVE_DONE;
  }

  // the per-group limit/offset state starts over for a new group
  if (enteredNewGroup) {
    resetLimitInfoForNextGroup(pLimitInfo);
  }

  return PROJECT_RETRIEVE_CONTINUE;
}
282

283
// todo refactor
284
static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SSDataBlock* pBlock,
288,815,523✔
285
                                    SOperatorInfo* pOperator) {
286
  // set current group id
287
  pLimitInfo->currentGroupId = groupId;
288,815,523✔
288
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pOperator->pTaskInfo);
288,882,074✔
289
  if (pBlock->info.rows == 0 && 0 != pLimitInfo->limit.limit) {
288,881,864✔
290
    return PROJECT_RETRIEVE_CONTINUE;
5,393,525✔
291
  } else {
292
    if (limitReached && (pLimitInfo->slimit.limit >= 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
283,486,255✔
293
      setOperatorCompleted(pOperator);
92,707✔
294
    } else if (limitReached && groupId == 0) {
283,393,548✔
295
      setOperatorCompleted(pOperator);
7,801,194✔
296
    }
297
  }
298

299
  return PROJECT_RETRIEVE_DONE;
283,367,180✔
300
}
301

302
/**
 * Produce the next output block of the project operator.
 *
 * Without a downstream operator the data comes from doGenerateSourceData(). Otherwise
 * blocks are pulled from the downstream, filtered by group offset / group limit,
 * projected into pRes, trimmed by the in-group limit/offset and, when mergeDataBlocks
 * is set, merged into pFinalRes until the result threshold is exceeded or the input
 * is exhausted.
 *
 * @param pOperator project operator
 * @param pResBlock receives the output block, or NULL when the block has no rows
 * @return TSDB_CODE_SUCCESS; on failure the error is stored in the task and
 *         T_LONG_JMP() is invoked instead of returning normally
 */
int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  SProjectOperatorInfo* pProjectInfo = pOperator->info;
  SOptrBasicInfo*       pInfo = &pProjectInfo->binfo;
  SExprSupp*            pSup = &pOperator->exprSupp;
  SSDataBlock*          pRes = pInfo->pRes;
  SSDataBlock*          pFinalRes = pProjectInfo->pFinalRes;
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  int32_t               code = 0;
  int32_t               lino = 0;
  int32_t               order = pInfo->inputTsOrder;
  int32_t               scanFlag = 0;

  blockDataCleanup(pFinalRes);

  if (pOperator->status == OP_EXEC_DONE) {
    return code;
  }

  SLimitInfo* pLimitInfo = &pProjectInfo->limitInfo;

  // no upstream operator: the rows are generated by this operator itself
  if (pOperator->numOfDownstream <= 0 || pOperator->pDownstream[0] == NULL) {
    code = doGenerateSourceData(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pProjectInfo->outputIgnoreGroup) {
      pRes->info.id.groupId = 0;
    }

    *pResBlock = (pRes->info.rows > 0) ? pRes : NULL;
    return code;
  }

  for (;;) {
    // inner loop: fetch and project blocks until one yields output or the input ends
    for (;;) {
      blockDataCleanup(pRes);

      // The downstream exec may change the value of the newgroup, so use a local variable instead.
      SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
      if (pBlock == NULL) {
        qDebug("set op close, exec %d, status %d rows %" PRId64 , pTaskInfo->execModel, pOperator->status, pFinalRes->info.rows);
        setOperatorCompleted(pOperator);
        break;
      }

      if (pProjectInfo->inputIgnoreGroup) {
        pBlock->info.id.groupId = 0;
      }

      // blocks of groups skipped by the group offset are dropped
      int32_t status = discardGroupDataBlock(pBlock, pLimitInfo);
      if (status == PROJECT_RETRIEVE_CONTINUE) {
        continue;
      }

      // the return value is intentionally ignored; completion is detected through the operator status
      (void) setInfoForNewGroup(pBlock, pLimitInfo, pOperator);
      if (pOperator->status == OP_EXEC_DONE) {
        break;
      }

      scanFlag = pBlock->info.scanFlag;
      if (pProjectInfo->mergeDataBlocks) {
        pFinalRes->info.scanFlag = scanFlag;
      } else {
        pRes->info.scanFlag = scanFlag;
      }

      code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
      QUERY_CHECK_CODE(code, lino, _end);

      code = blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
      QUERY_CHECK_CODE(code, lino, _end);

      code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
                                   pProjectInfo->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo));
      QUERY_CHECK_CODE(code, lino, _end);

      status = doIngroupLimitOffset(pLimitInfo, pBlock->info.id.groupId, pInfo->pRes, pOperator);
      if (status == PROJECT_RETRIEVE_CONTINUE) {
        continue;
      }

      break;
    }

    if (pProjectInfo->mergeDataBlocks) {
      if (pRes->info.rows > 0) {
        pFinalRes->info.id.groupId = 0;  // clear groupId
        pFinalRes->info.version = pRes->info.version;

        // continue merge data, ignore the group id
        code = blockDataMerge(pFinalRes, pRes);
        QUERY_CHECK_CODE(code, lino, _end);

        // keep accumulating while the threshold is not exceeded and input remains
        if (pFinalRes->info.rows + pRes->info.rows <= pOperator->resultInfo.threshold && (pOperator->status != OP_EXEC_DONE)) {
          continue;
        }
      }

      // do apply filter
      code = doFilter(pFinalRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      // when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
      if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) {
        qDebug("project return %" PRId64 " rows, status %d", pFinalRes->info.rows, pOperator->status);
        break;
      }
    } else {
      // do apply filter
      if (pRes->info.rows > 0) {
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
        QUERY_CHECK_CODE(code, lino, _end);

        // everything was filtered out: try the next block
        if (pRes->info.rows == 0) {
          continue;
        }
      }

      // either rows are available or no results were generated
      break;
    }
  }

  SSDataBlock* pOut = pProjectInfo->mergeDataBlocks ? pFinalRes : pRes;
  pOut->info.dataLoad = 1;

  if (pProjectInfo->outputIgnoreGroup) {
    pOut->info.id.groupId = 0;
  }

  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
    printDataBlock(pOut, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo), pTaskInfo->id.queryId);
  }

  *pResBlock = (pOut->info.rows > 0) ? pOut : NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
452

453
/**
 * Bring an indefinite-rows function operator back to its not-yet-opened state.
 *
 * Resets the basic operator state, forgets the current group and the parked next-group
 * block, then rebuilds the aggregate support, the function result output and the scalar
 * expression support.
 *
 * @param pOper operator to reset
 * @return 0 on success; otherwise the error code of the first failing step
 *         (previously these failures were silently dropped)
 */
static int32_t resetIndefinitOutputOperState(SOperatorInfo* pOper) {
  SIndefOperatorInfo*      pInfo = pOper->info;
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
  SIndefRowsFuncPhysiNode* pPhynode = (SIndefRowsFuncPhysiNode*)pOper->pPhyNode;
  pOper->status = OP_NOT_OPENED;

  resetBasicOperatorState(&pInfo->binfo);

  pInfo->groupId = 0;
  pInfo->pNextGroupRes = NULL;

  int32_t code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->pFuncs, NULL,
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, NULL,
                             &pTaskInfo->storageAPI.functionStore);
  if (code == 0) {
    code = setFunctionResultOutput(pOper, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pOper->exprSupp.numOfExprs);
  }

  if (code == 0) {
    code = resetExprSupp(&pInfo->scalarSup, pTaskInfo, pPhynode->pExprs, NULL,
                         &pTaskInfo->storageAPI.functionStore);
  }

  // propagate the failure to the caller instead of always reporting success
  return code;
}
476

477
int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
3,144,861✔
478
                                                 SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
479
  QRY_PARAM_CHECK(pOptrInfo);
3,144,861✔
480
  int32_t code = 0;
3,143,428✔
481
  int32_t lino = 0;
3,143,428✔
482
  int32_t numOfRows = 4096;
3,143,428✔
483
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
3,143,428✔
484

485
  SIndefOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIndefOperatorInfo));
3,143,428✔
486
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
3,142,682✔
487
  if (pInfo == NULL || pOperator == NULL) {
3,143,581✔
UNCOV
488
    code = terrno;
×
UNCOV
489
    goto _error;
×
490
  }
491

492
  pOperator->pPhyNode = pNode;
3,143,581✔
493
  pOperator->pTaskInfo = pTaskInfo;
3,144,480✔
494
  initOperatorCostInfo(pOperator);
3,144,480✔
495

496
  SExprSupp* pSup = &pOperator->exprSupp;
3,144,861✔
497
  pSup->hasWindowOrGroup = false;
3,144,861✔
498

499
  SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
3,144,708✔
500

501
  if (pPhyNode->pExprs != NULL) {
3,144,708✔
502
    int32_t    num = 0;
23,276✔
503
    SExprInfo* pSExpr = NULL;
23,276✔
504
    code = createExprInfo(pPhyNode->pExprs, NULL, &pSExpr, &num);
23,276✔
505
    QUERY_CHECK_CODE(code, lino, _error);
23,276✔
506

507
    code = initExprSupp(&pInfo->scalarSup, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
23,276✔
508
    if (code != TSDB_CODE_SUCCESS) {
23,276✔
UNCOV
509
      goto _error;
×
510
    }
511
  }
512

513
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->node.pOutputDataBlockDesc);
3,143,809✔
514
  TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno);
3,145,234✔
515

516
  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
517
  int32_t TWOMB = 2 * 1024 * 1024;
3,145,234✔
518
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
3,145,234✔
UNCOV
519
    numOfRows = TWOMB / pResBlock->info.rowSize;
×
520
  }
521

522
  initBasicInfo(&pInfo->binfo, pResBlock);
3,144,708✔
523
  initResultSizeInfo(&pOperator->resultInfo, numOfRows);
3,144,861✔
524
  code = blockDataEnsureCapacity(pResBlock, numOfRows);
3,144,335✔
525
  TSDB_CHECK_CODE(code, lino, _error);
3,144,708✔
526

527
  int32_t    numOfExpr = 0;
3,144,708✔
528
  SExprInfo* pExprInfo = NULL;
3,144,335✔
529
  code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr);
3,144,335✔
530
  TSDB_CHECK_CODE(code, lino, _error);
3,142,529✔
531

532
  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
3,142,529✔
533
                            NULL, &pTaskInfo->storageAPI.functionStore);
534
  TSDB_CHECK_CODE(code, lino, _error);
3,144,107✔
535

536
  code = setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
3,144,107✔
537
  TSDB_CHECK_CODE(code, lino, _error);
3,142,835✔
538

539
  code = filterInitFromNode((SNode*)pPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
3,143,962✔
540
                            pTaskInfo->pStreamRuntimeInfo);
3,142,835✔
541
  TSDB_CHECK_CODE(code, lino, _error);
3,143,063✔
542

543
  pInfo->binfo.pRes = pResBlock;
3,143,063✔
544
  pInfo->binfo.inputTsOrder = pNode->inputTsOrder;
3,143,962✔
545
  pInfo->binfo.outputTsOrder = pNode->outputTsOrder;
3,143,063✔
546
  code = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr, &pInfo->pPseudoColInfo);
3,141,257✔
547
  TSDB_CHECK_CODE(code, lino, _error);
3,143,063✔
548

549
  setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo,
3,143,063✔
550
                  pTaskInfo);
551
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo,
3,144,861✔
552
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
553
                                         
554
  setOperatorResetStateFn(pOperator, resetIndefinitOutputOperState);
3,143,742✔
555
  code = appendDownstream(pOperator, &downstream, 1);
3,143,589✔
556
  if (code != TSDB_CODE_SUCCESS) {
3,143,742✔
UNCOV
557
    goto _error;
×
558
  }
559

560
  *pOptrInfo = pOperator;
3,143,742✔
561
  return TSDB_CODE_SUCCESS;
3,143,216✔
562

UNCOV
563
_error:
×
UNCOV
564
  if (pInfo != NULL) destroyIndefinitOperatorInfo(pInfo);
×
UNCOV
565
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
UNCOV
566
  pTaskInfo->code = code;
×
UNCOV
567
  return code;
×
568
}
569

570
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream,
48,062,095✔
571
                              SExecTaskInfo* pTaskInfo) {
572
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
48,062,095✔
573
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
48,062,476✔
574
  SExprSupp*          pSup = &pOperator->exprSupp;
48,062,476✔
575

576
  int32_t order = pInfo->inputTsOrder;
48,062,095✔
577
  int32_t scanFlag = pBlock->info.scanFlag;
48,062,095✔
578
  int32_t code = TSDB_CODE_SUCCESS;
48,062,095✔
579

580
  // there is an scalar expression that needs to be calculated before apply the group aggregation.
581
  SExprSupp* pScalarSup = &pIndefInfo->scalarSup;
48,062,095✔
582
  if (pScalarSup->pExprInfo != NULL) {
48,062,476✔
583
    code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
70,144✔
584
                                 pIndefInfo->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo));
70,144✔
585
    if (code != TSDB_CODE_SUCCESS) {
70,144✔
UNCOV
586
      T_LONG_JMP(pTaskInfo->env, code);
×
587
    }
588
  }
589

590
  code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
48,062,095✔
591
  if (code) {
48,062,476✔
UNCOV
592
    T_LONG_JMP(pTaskInfo->env, code);
×
593
  }
594

595
  code = blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
48,062,476✔
596
  if (code != TSDB_CODE_SUCCESS) {
48,062,476✔
UNCOV
597
    T_LONG_JMP(pTaskInfo->env, code);
×
598
  }
599

600
  code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
48,062,095✔
601
                               pIndefInfo->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo));
48,062,476✔
602
  if (code != TSDB_CODE_SUCCESS) {
48,062,476✔
603
    T_LONG_JMP(pTaskInfo->env, code);
18,028✔
604
  }
605
}
48,044,448✔
606

607
int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
10,506,432✔
608
  QRY_PARAM_CHECK(pResBlock);
10,506,432✔
609
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
10,506,432✔
610
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
10,504,779✔
611
  SExprSupp*          pSup = &pOperator->exprSupp;
10,506,424✔
612
  int32_t             code = TSDB_CODE_SUCCESS;
10,506,958✔
613
  int32_t             lino = 0;
10,506,958✔
614
  SSDataBlock*        pRes = pInfo->pRes;
10,506,958✔
615

616
  blockDataCleanup(pRes);
10,505,686✔
617

618
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
10,506,805✔
619
  if (pOperator->status == OP_EXEC_DONE) {
10,506,805✔
620
    return code;
1,874,283✔
621
  }
622

623
  SOperatorInfo* downstream = pOperator->pDownstream[0];
8,631,623✔
624
  bool           noSplitOutput = hasLagLeadFunc(pSup);
8,631,623✔
625

626
  while (1) {
2,911,098✔
627
    // here we need to handle the existsed group results
628
    if (pIndefInfo->pNextGroupRes != NULL) {  // todo extract method
11,543,773✔
629
      for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
20,974,825✔
630
        SqlFunctionCtx* pCtx = &pSup->pCtx[k];
15,556,237✔
631

632
        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
15,556,237✔
633
        if (pResInfo->initialized && pCtx->fpSet.cleanup != NULL) {
15,556,237✔
634
          pCtx->fpSet.cleanup(pCtx);
2,191✔
635
        }
636
        pResInfo->initialized = false;
15,556,237✔
637
        pCtx->pOutput = NULL;
15,556,237✔
638
      }
639

640
      doHandleDataBlock(pOperator, pIndefInfo->pNextGroupRes, downstream, pTaskInfo);
5,418,588✔
641
      pIndefInfo->pNextGroupRes = NULL;
5,418,588✔
642
    }
643

644
    if (noSplitOutput || pInfo->pRes->info.rows < pOperator->resultInfo.threshold) {
11,544,146✔
645
      while (1) {
39,614,581✔
646
        // The downstream exec may change the value of the newgroup, so use a local variable instead.
647
        SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
51,156,921✔
648
        if (pBlock == NULL) {
51,159,100✔
649
          setOperatorCompleted(pOperator);
3,077,752✔
650
          break;
3,077,752✔
651
        }
652
        pInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
48,081,348✔
653

654
        if (pIndefInfo->groupId == 0 && pBlock->info.id.groupId != 0) {
48,081,348✔
655
          pIndefInfo->groupId = pBlock->info.id.groupId;  // this is the initial group result
400,616✔
656
        } else {
657
          if (pIndefInfo->groupId != pBlock->info.id.groupId) {  // reset output buffer and computing status
47,680,732✔
658
            pIndefInfo->groupId = pBlock->info.id.groupId;
5,437,460✔
659
            pIndefInfo->pNextGroupRes = pBlock;
5,437,460✔
660
            break;
5,437,460✔
661
          }
662
        }
663

664
        doHandleDataBlock(pOperator, pBlock, downstream, pTaskInfo);
42,643,507✔
665
        if (!noSplitOutput && pInfo->pRes->info.rows >= pOperator->resultInfo.threshold) {
42,625,860✔
666
          break;
3,011,279✔
667
        }
668
      }
669
    }
670

671
    code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
11,527,017✔
672
    QUERY_CHECK_CODE(code, lino, _end);
11,526,491✔
673

674
    size_t rows = pInfo->pRes->info.rows;
11,526,491✔
675
    if (rows > 0 || pOperator->status == OP_EXEC_DONE) {
11,526,491✔
676
      break;
677
    } else {
678
      blockDataCleanup(pInfo->pRes);
2,911,098✔
679
    }
680
  }
681

682
  *pResBlock = (pInfo->pRes->info.rows> 0) ? pInfo->pRes : NULL;
8,615,393✔
683

684
_end:
8,615,393✔
685
  if (code != TSDB_CODE_SUCCESS) {
8,615,393✔
UNCOV
686
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
687
    pTaskInfo->code = code;
×
UNCOV
688
    T_LONG_JMP(pTaskInfo->env, code);
×
689
  }
690
  return code;
8,615,393✔
691
}
692

693
/**
 * Run the init callback of every function context that still needs initialization.
 *
 * A context is skipped when its result entry is already initialized, when it is a
 * pseudo-column function, when its function id is -1, or when it is a scalar function.
 *
 * @param pCtx array of function contexts
 * @param size number of entries in pCtx
 * @return 0 on success, otherwise the first non-zero code returned by an init callback
 */
int32_t initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
  for (int32_t idx = 0; idx < size; ++idx) {
    SqlFunctionCtx*             pCur = &pCtx[idx];
    struct SResultRowEntryInfo* pEntry = GET_RES_INFO(pCur);

    // the checks run in this order and stop at the first one that holds
    bool skip = isRowEntryInitialized(pEntry) || fmIsPseudoColumnFunc(pCur->functionId) || pCur->functionId == -1 ||
                fmIsScalarFunc(pCur->functionId);
    if (skip) {
      continue;
    }

    int32_t code = pCur->fpSet.init(pCur, pCur->resultInfo);
    if (code) {
      return code;
    }
  }

  return 0;
}
710

711
/*
712
 * The start of each column SResultRowEntryInfo is denote by RowCellInfoOffset.
713
 * Note that in case of top/bottom query, the whole multiple rows of result is treated as only one row of results.
714
 * +------------+-----------------result column 1------------+------------------result column 2-----------+
715
 * | SResultRow | SResultRowEntryInfo | intermediate buffer1 | SResultRowEntryInfo | intermediate buffer 2|
716
 * +------------+--------------------------------------------+--------------------------------------------+
717
 *           offset[0]                                  offset[1]                                   offset[2]
718
 */
719
// TODO refactor: some function move away
720
int32_t setFunctionResultOutput(struct SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
157,937,331✔
721
                             int32_t numOfExprs) {
722
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
157,937,331✔
723
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
157,942,297✔
724
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
157,923,534✔
725

726
  SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
157,837,358✔
727
  initResultRowInfo(pResultRowInfo);
157,887,973✔
728

729
  int64_t     tid = 0;
157,845,567✔
730
  int64_t     groupId = 0;
157,831,016✔
731
  SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, (char*)&tid, sizeof(tid), true, groupId,
157,831,016✔
732
                                            pTaskInfo, false, pSup, true);
733
  if (pRow == NULL || pTaskInfo->code != 0) {
157,875,898✔
734
    return pTaskInfo->code;
10,736✔
735
  }
736

737
  for (int32_t i = 0; i < numOfExprs; ++i) {
752,744,277✔
738
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowEntryInfoOffset);
594,871,162✔
739
    cleanupResultRowEntry(pEntry);
594,757,792✔
740

741
    pCtx[i].resultInfo = pEntry;
594,766,357✔
742
    pCtx[i].scanFlag = stage;
594,796,263✔
743
  }
744

745
  return initCtxOutputBuffer(pCtx, numOfExprs);
157,873,115✔
746
}
747

748
/*
 * Collect the indexes of all contexts whose function is a pseudo-column function but not a
 * placeholder function.
 *
 * @param pCtx       array of function contexts
 * @param numOfCols  number of entries in pCtx
 * @param pResList   [out] receives a newly created array of int32_t indexes; it is set to a
 *                   non-NULL array only on success and the caller then owns it
 * @return 0 on success; terrno when the array cannot be created or extended (the partially
 *         built array is released before returning)
 */
int32_t setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList) {
  QRY_PARAM_CHECK(pResList);

  SArray* pList = taosArrayInit(4, sizeof(int32_t));
  if (pList == NULL) {
    return terrno;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    if (fmIsPseudoColumnFunc(pCtx[i].functionId) && !fmIsPlaceHolderFunc(pCtx[i].functionId)) {
      void* px = taosArrayPush(pList, &i);
      if (px == NULL) {
        // Capture terrno first: the list is not handed to the caller on failure, so it must be
        // released here, and the release must not be allowed to disturb the reported code.
        int32_t code = terrno;
        taosArrayDestroy(pList);
        return code;
      }
    }
  }

  *pResList = pList;
  return 0;
}
767

768
/*
 * Produce the single output row of a projection that has no input block: every expression is
 * evaluated once into row 0 of the operator's result block, the filter and the in-group
 * limit/offset are applied, and the operator is marked as completed.
 *
 * Supported expression nodes are constant values, scalar functions and operators; anything else
 * yields TSDB_CODE_OPS_NOT_SUPPORT.
 *
 * All scratch resources of the scalar-function branch (block list and temporary column) are
 * released on every exit path.
 *
 * @param pOperator  project operator
 * @return 0 on success, otherwise an error code
 */
int32_t doGenerateSourceData(SOperatorInfo* pOperator) {
  SProjectOperatorInfo* pProjectInfo = pOperator->info;

  SExprSupp*     pSup = &pOperator->exprSupp;
  SSDataBlock*   pRes = pProjectInfo->binfo.pRes;
  SExprInfo*     pExpr = pSup->pExprInfo;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  // Scratch state of the scalar-function branch. It lives at function scope and starts out
  // empty so that the common exit label can release it unconditionally.
  SArray*         pBlockList = NULL;
  SColumnInfoData idata = {0};

  int32_t code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
  if (code) {
    return code;
  }

  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
    int32_t outputSlotId = pExpr[k].base.resSchema.slotId;

    if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
      SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, outputSlotId);
      if (pColInfoData == NULL) {
        code = terrno;
        goto _end;
      }

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataSetNNULL(pColInfoData, 0, 1);
      } else {
        code = colDataSetVal(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
        if (code) {
          goto _end;
        }
      }
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
      SqlFunctionCtx* pfCtx = &pSup->pCtx[k];

      // UDF scalar functions will be calculated here, for example, select foo(n) from (select 1 n).
      // UDF aggregate functions will be handled in agg operator.
      if (!fmIsScalarFunc(pfCtx->functionId)) {
        code = TSDB_CODE_OPS_NOT_SUPPORT;
        goto _end;
      }

      pBlockList = taosArrayInit(4, POINTER_BYTES);
      if (pBlockList == NULL) {
        code = terrno;
        goto _end;
      }

      void* px = taosArrayPush(pBlockList, &pRes);
      if (px == NULL) {
        code = terrno;
        goto _end;
      }

      SColumnInfoData* pResColData = taosArrayGet(pRes->pDataBlock, outputSlotId);
      if (pResColData == NULL) {
        code = terrno;
        goto _end;
      }

      idata = (SColumnInfoData){.info = pResColData->info, .hasNull = true};

      SScalarParam dest = {.columnData = &idata};
      gTaskScalarExtra.pStreamInfo = GET_STM_RTINFO(pTaskInfo);
      gTaskScalarExtra.pStreamRange = NULL;
      code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest, &gTaskScalarExtra);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      if (pRes->info.capacity <= 0) {
        qError("project failed at: %s:%d", __func__, __LINE__);
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        goto _end;
      }

      code = colDataAssign(pResColData, &idata, dest.numOfRows, &pRes->info);
      if (code) {
        goto _end;
      }

      // Release this iteration's scratch state and reset it, so that neither a later iteration
      // nor the exit label can release it a second time.
      colDataDestroy(&idata);
      idata = (SColumnInfoData){0};
      taosArrayDestroy(pBlockList);
      pBlockList = NULL;
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
      // No scratch state is outstanding here, so returning directly is safe.
      TAOS_CHECK_RETURN(projectApplyOperator(&pExpr[k], pRes, NULL, outputSlotId, NULL, false, &gTaskScalarExtra));
    } else {
      code = TSDB_CODE_OPS_NOT_SUPPORT;
      goto _end;
    }
  }

  pRes->info.rows = 1;
  code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
  if (code) {
    pTaskInfo->code = code;
    goto _end;
  }

  (void) doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator);

  setOperatorCompleted(pOperator);

_end:
  colDataDestroy(&idata);
  taosArrayDestroy(pBlockList);
  return code;
}
866

867
/*
 * Bind the output buffer of the first N contexts to the first N columns of pResult, where N is
 * the number of entries in pPseudoList (0 when the list is NULL). A missing column is logged
 * and leaves the corresponding pOutput set to NULL.
 *
 * NOTE(review): only the size of pPseudoList is used; the loop index selects both the context
 * and the column. Confirm this is intended rather than the indexes stored in the list.
 */
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
  size_t numOfPseudo = 0;
  if (pPseudoList != NULL) {
    numOfPseudo = taosArrayGetSize(pPseudoList);
  }

  for (int32_t col = 0; col < numOfPseudo; ++col) {
    SqlFunctionCtx* pOne = &pCtx[col];

    pOne->pOutput = taosArrayGet(pResult->pDataBlock, col);
    if (pOne->pOutput == NULL) {
      qError("failed to get the output buf, ptr is null");
    }
  }
}
444,576,855✔
876

877
/*
 * Copy one plain column into output slot outputSlotId of pResult.
 *
 * The source is the context's first input column when it is set; otherwise the column of
 * pSrcBlock addressed by the slot id of the context's first parameter. The data is appended to
 * the rows already present in pResult, unless pResult is empty or createNewColModel is set, in
 * which case it is assigned from row 0.
 *
 * @param numOfRows  [out] number of source rows copied
 * @return 0 on success, otherwise an error code (also logged)
 */
int32_t projectApplyColumn(SSDataBlock* pResult, SSDataBlock* pSrcBlock, int32_t outputSlotId, SqlFunctionCtx* pfCtx, int32_t* numOfRows, bool createNewColModel) {
  int32_t code = 0, lino = 0;
  SInputColumnInfoData* pInput = &pfCtx->input;

  SColumnInfoData* pDstCol = taosArrayGet(pResult->pDataBlock, outputSlotId);
  TSDB_CHECK_NULL(pDstCol, code, lino, _exit, terrno);

  // Decide where the data comes from: the bound input column, or the source block itself.
  const bool       readFromBlock = (pInput->pData[0] == NULL);
  SColumnInfoData* pSrcCol = pInput->pData[0];
  if (readFromBlock) {
    int32_t srcSlotId = pfCtx->param[0].pCol->slotId;

    pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
    TSDB_CHECK_NULL(pSrcCol, code, lino, _exit, terrno);
  }

  const bool appendToExisting = (pResult->info.rows > 0) && !createNewColModel;
  if (appendToExisting) {
    TAOS_CHECK_EXIT(colDataMergeCol(pDstCol, pResult->info.rows, (int32_t*)&pResult->info.capacity, pSrcCol,
                                    readFromBlock ? pSrcBlock->info.rows : pInput->numOfRows));
  } else {
    TAOS_CHECK_EXIT(colDataAssign(pDstCol, pSrcCol, readFromBlock ? pSrcBlock->info.rows : pInput->numOfRows,
                                  &pResult->info));
  }

  *numOfRows = readFromBlock ? pSrcBlock->info.rows : pInput->numOfRows;

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
925

926

927
/*
 * Fill output slot outputSlotId of pResult with a constant, one entry per row of pSrcBlock.
 *
 * Writing starts at row 0 when createNewColModel is set, otherwise after the rows already in
 * pResult. A constant of type TSDB_DATA_TYPE_NULL produces NULL entries.
 *
 * @param numOfRows  [out] number of rows written (the row count of pSrcBlock)
 * @return 0 on success, otherwise an error code (also logged)
 */
int32_t projectApplyValue(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, int32_t outputSlotId, int32_t* numOfRows, bool createNewColModel) {
  int32_t code = 0, lino = 0;

  SColumnInfoData* pDstCol = taosArrayGet(pResult->pDataBlock, outputSlotId);
  TSDB_CHECK_NULL(pDstCol, code, lino, _exit, terrno);

  int32_t startRow = 0;
  if (!createNewColModel) {
    startRow = pResult->info.rows;
  }

  int32_t valueType = pExpr->base.pParam[0].param.nType;
  if (valueType != TSDB_DATA_TYPE_NULL) {
    char*   pValue = taosVariantGet(&pExpr->base.pParam[0].param, valueType);
    int32_t row = 0;
    while (row < pSrcBlock->info.rows) {
      TAOS_CHECK_EXIT(colDataSetVal(pDstCol, row + startRow, pValue, false));
      ++row;
    }
  } else {
    colDataSetNNULL(pDstCol, startRow, pSrcBlock->info.rows);
  }

  *numOfRows = pSrcBlock->info.rows;

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
953

954

955

956
/*
 * Evaluate an operator expression with scalarCalculate() and store the result in output slot
 * outputSlotId of pResult.
 *
 * The result is appended to the rows already in pResult, unless pResult is empty or
 * createNewColModel is set, in which case it is assigned from row 0.
 *
 * @param pSrcBlock     input block; may be NULL, in which case no block list is passed to the
 *                      scalar calculation
 * @param numOfRows     [out, optional] number of rows produced by the calculation
 * @param pExtraParams  stored into gTaskScalarExtra.pStreamInfo before the calculation
 * @return 0 on success, otherwise an error code (negative codes are also logged)
 */
int32_t projectApplyOperator(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, int32_t outputSlotId, int32_t* numOfRows, bool createNewColModel, const void* pExtraParams) {
  int32_t code = 0, lino = 0;
  SArray* pBlockList = NULL;
  // Scratch column receiving the calculation result. It is zero-initialized before the first
  // jump to _exit so that the exit path can release it on every outcome, including a failed
  // calculation that may already have attached buffers to it.
  SColumnInfoData idata = {0};

  if (NULL != pSrcBlock) {
    pBlockList = taosArrayInit(4, POINTER_BYTES);
    TSDB_CHECK_NULL(pBlockList, code, lino, _exit, terrno);

    void* px = taosArrayPush(pBlockList, &pSrcBlock);
    TSDB_CHECK_NULL(px, code, lino, _exit, terrno);
  }

  SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
  TSDB_CHECK_NULL(pResColData, code, lino, _exit, terrno);

  idata.info = pResColData->info;
  idata.hasNull = true;

  SScalarParam dest = {.columnData = &idata};
  gTaskScalarExtra.pStreamInfo = (void*)pExtraParams;
  gTaskScalarExtra.pStreamRange = NULL;
  TAOS_CHECK_EXIT(scalarCalculate(pExpr->pExpr->_optrRoot.pRootNode, pBlockList, &dest, &gTaskScalarExtra));

  if (pResult->info.rows > 0 && !createNewColModel) {
    code = colDataMergeCol(pResColData, pResult->info.rows, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
  } else {
    code = colDataAssign(pResColData, &idata, dest.numOfRows, &pResult->info);
  }
  TAOS_CHECK_EXIT(code);

  if (numOfRows) {
    *numOfRows = dest.numOfRows;
  }

_exit:

  if (code < 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  colDataDestroy(&idata);
  taosArrayDestroy(pBlockList);

  return code;
}
999

1000

1001
/*
 * Evaluate one function expression of a projection and write its result into output slot
 * outputSlotId of pResult. The function kind selects the strategy:
 *   - placeholder function with exactly one parameter and non-NULL pExtraParams: the value is
 *     filled in by scalarAssignPlaceHolderRes();
 *   - scalar (or remaining placeholder) function: evaluated with scalarCalculate() into a
 *     scratch column which is then merged into / assigned to the output column;
 *   - indefinite-rows function: its init/process callbacks write directly to the output column;
 *     process-by-row functions are additionally queued in *processByRowFunctionCtx;
 *   - aggregate function: the input column is copied row by row (select-value functions are
 *     skipped unless doSelectFunc is set);
 *   - group-id function: the group id of pSrcBlock is written for every source row.
 * Any other function leaves the output untouched and returns the current code.
 *
 * @param pCtx                     all function contexts of the projection
 * @param pfCtx                    context of the function being evaluated
 * @param numOfRows                [out] number of rows produced; not written on the skipped
 *                                 select-value path or for unhandled functions
 * @param processByRowFunctionCtx  [in/out] lazily created array of SqlFunctionCtx* owned by the caller
 * @return 0 on success, otherwise an error code (also logged)
 */
int32_t projectApplyFunction(SqlFunctionCtx* pCtx, SqlFunctionCtx* pfCtx, SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock,
                                    int32_t outputSlotId, int32_t* numOfRows, bool createNewColModel, const void* pExtraParams,
                                    SArray* pPseudoList, SArray** processByRowFunctionCtx, bool doSelectFunc) {
  int32_t code = 0, lino = 0;
  SArray* pBlockList = NULL;
  SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
  TSDB_CHECK_NULL(pResColData, code, lino, _exit, terrno);

  if (fmIsPlaceHolderFunc(pfCtx->functionId) && pExtraParams && pfCtx->pExpr->base.pParamList && 1 == pfCtx->pExpr->base.pParamList->length) {
    SNode* pParamNode = nodesListGetNode(pfCtx->pExpr->base.pParamList, 0);
    TAOS_CHECK_EXIT(scalarAssignPlaceHolderRes(pResColData, pResult->info.rows, pSrcBlock->info.rows, pfCtx->functionId, pExtraParams, pParamNode));
    *numOfRows = pSrcBlock->info.rows;

    return code;
  }

  if (fmIsScalarFunc(pfCtx->functionId) || fmIsPlaceHolderFunc(pfCtx->functionId)) {
    pBlockList = taosArrayInit(4, POINTER_BYTES);
    TSDB_CHECK_NULL(pBlockList, code, lino, _exit, terrno);

    void* px = taosArrayPush(pBlockList, &pSrcBlock);
    TSDB_CHECK_NULL(px, code, lino, _exit, terrno);

    SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};
    SScalarParam dest = {.columnData = &idata};
    gTaskScalarExtra.pStreamInfo = (void*)pExtraParams;
    gTaskScalarExtra.pStreamRange = NULL;

    code = scalarCalculate((SNode*)pExpr->pExpr->_function.pFunctNode, pBlockList, &dest, &gTaskScalarExtra);
    if (code < TSDB_CODE_SUCCESS) {
      // idata is scoped to this block, so it has to be released here: a failed calculation may
      // already have attached buffers to it. pBlockList is released at _exit.
      lino = __LINE__;
      colDataDestroy(&idata);
      goto _exit;
    }

    if (pResult->info.rows > 0 && !createNewColModel) {
      code = colDataMergeCol(pResColData, pResult->info.rows, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
    } else {
      SColumnInfo oriInfo = pResColData->info;
      code = colDataAssign(pResColData, &idata, dest.numOfRows, &pResult->info);
      // restore the original column info to satisfy the output column schema
      pResColData->info = oriInfo;
    }

    colDataDestroy(&idata);
    taosArrayDestroy(pBlockList);
    // Clear the pointer: the error jump below reaches _exit, which destroys pBlockList again.
    pBlockList = NULL;
    TAOS_CHECK_EXIT(code);

    *numOfRows = dest.numOfRows;

    return code;
  }

  if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
    SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx);
    TAOS_CHECK_EXIT(pfCtx->fpSet.init(pfCtx, pResInfo));

    // pResColData was validated at the top of the function, so no further NULL check is needed
    pfCtx->pOutput = (char*)pResColData;

    pfCtx->offset = createNewColModel ? 0 : pResult->info.rows;  // set the start offset

    // set the timestamp(_rowts) output buffer
    if (taosArrayGetSize(pPseudoList) > 0) {
      int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
      TSDB_CHECK_NULL(outputColIndex, code, lino, _exit, terrno);

      pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
    }

    // link pDstBlock to set selectivity value
    if (pfCtx->subsidiaries.num > 0) {
      pfCtx->pDstBlock = pResult;
    }

    code = pfCtx->fpSet.process(pfCtx);
    if (code != TSDB_CODE_SUCCESS) {
      if (pfCtx->fpSet.cleanup != NULL) {
        pfCtx->fpSet.cleanup(pfCtx);
      }
      TAOS_CHECK_EXIT(code);
    }

    *numOfRows = pResInfo->numOfRes;

    // process-by-row functions are queued so that the caller can run them together afterwards
    if (fmIsProcessByRowFunc(pfCtx->functionId)) {
      if (NULL == *processByRowFunctionCtx) {
        *processByRowFunctionCtx = taosArrayInit(1, sizeof(SqlFunctionCtx*));
        TSDB_CHECK_NULL(*processByRowFunctionCtx, code, lino, _exit, terrno);
      }

      void* px = taosArrayPush(*processByRowFunctionCtx, &pfCtx);
      TSDB_CHECK_NULL(px, code, lino, _exit, terrno);
    }

    return code;
  }

  if (fmIsAggFunc(pfCtx->functionId)) {
    // selective value output should be set during corresponding function execution
    if (!doSelectFunc && fmIsSelectValueFunc(pfCtx->functionId)) {
      return code;
    }

    // _group_key function for "partition by tbname" + csum(col_name) query
    int32_t slotId = pfCtx->param[0].pCol->slotId;

    // todo handle the json tag
    SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
    TSDB_CHECK_NULL(pInput, code, lino, _exit, terrno);

    for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
      bool isNull = colDataIsNull_s(pInput, f);
      if (isNull) {
        colDataSetNULL(pResColData, pResult->info.rows + f);
      } else {
        char* data = colDataGetData(pInput, f);
        TAOS_CHECK_EXIT(colDataSetVal(pResColData, pResult->info.rows + f, data, isNull));
      }
    }

    *numOfRows = pSrcBlock->info.rows;

    return code;
  }

  if (fmIsGroupIdFunc(pfCtx->functionId)) {
    for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
      TAOS_CHECK_EXIT(colDataSetVal(pResColData, pResult->info.rows + f, (const char*)&pSrcBlock->info.id.groupId, false));
    }

    *numOfRows = pSrcBlock->info.rows;
    return code;
  }

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  taosArrayDestroy(pBlockList);

  return code;
}
1141

1142

1143
int32_t projectApplyFunctionsWithSelect(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock,
454,579,379✔
1144
                                        SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList,
1145
                                        const void* pExtraParams, bool doSelectFunc, bool hasIndefRowsFunc) {
1146
  int32_t lino = 0;
454,579,379✔
1147
  int32_t code = TSDB_CODE_SUCCESS;
454,579,379✔
1148
  if (hasIndefRowsFunc) {
454,579,379✔
1149
    setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
444,507,717✔
1150
  }
1151
  pResult->info.dataLoad = 1;
454,612,046✔
1152

1153
  SArray* processByRowFunctionCtx = NULL;
454,665,678✔
1154
  SArray* pProcessedFuncIds = NULL;
454,535,533✔
1155
  SArray* pGroupedCtxArray = NULL;
454,535,533✔
1156
  if (pSrcBlock == NULL) {
454,535,533✔
UNCOV
1157
    for (int32_t k = 0; k < numOfOutput; ++k) {
×
UNCOV
1158
      int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
×
1159

UNCOV
1160
      if (pExpr[k].pExpr->nodeType != QUERY_NODE_VALUE) {
×
UNCOV
1161
        qError("project failed at: %s:%d", __func__, __LINE__);
×
UNCOV
1162
        TAOS_CHECK_EXIT(TSDB_CODE_INVALID_PARA);
×
1163
      }
UNCOV
1164
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
×
UNCOV
1165
      TSDB_CHECK_NULL(pColInfoData, code, lino, _exit, terrno);
×
1166

UNCOV
1167
      int32_t type = pExpr[k].base.pParam[0].param.nType;
×
UNCOV
1168
      if (TSDB_DATA_TYPE_NULL == type) {
×
1169
        colDataSetNNULL(pColInfoData, 0, 1);
1170
      } else {
UNCOV
1171
        TAOS_CHECK_EXIT(colDataSetVal(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false));
×
1172
      }
1173
    }
1174

UNCOV
1175
    pResult->info.rows = 1;
×
UNCOV
1176
    goto _exit;
×
1177
  }
1178

1179
  if (pResult != pSrcBlock) {
454,535,533✔
1180
    pResult->info.id.groupId = pSrcBlock->info.id.groupId;
350,112,436✔
1181
    if (pSrcBlock->info.parTbName[0]) {
350,184,235✔
UNCOV
1182
      tstrncpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
×
1183
    }
1184
    qTrace("%s, parName:%s,groupId:%" PRIu64, __FUNCTION__, pSrcBlock->info.parTbName, pResult->info.id.groupId);
350,157,203✔
1185
  }
1186

1187
  // if the source equals to the destination, it is to create a new column as the result of scalar
1188
  // function or some operators.
1189
  bool createNewColModel = (pResult == pSrcBlock);
454,504,562✔
1190
  if (createNewColModel) {
454,504,562✔
1191
    TAOS_CHECK_EXIT(blockDataEnsureCapacity(pResult, pResult->info.rows));
104,464,291✔
1192
  }
1193

1194
  int32_t numOfRows = 0;
454,503,181✔
1195

1196
  for (int32_t k = 0; k < numOfOutput; ++k) {
1,930,631,792✔
1197
    int32_t               outputSlotId = pExpr[k].base.resSchema.slotId;
1,477,213,586✔
1198
    SqlFunctionCtx*       pfCtx = &pCtx[k];
1,477,298,605✔
1199
    switch (pExpr[k].pExpr->nodeType) {
1,477,402,421✔
1200
      case QUERY_NODE_COLUMN: {
1,000,298,683✔
1201
        TAOS_CHECK_EXIT(projectApplyColumn(pResult, pSrcBlock, outputSlotId, pfCtx, &numOfRows, createNewColModel));
1,000,298,683✔
1202
        break;
1,000,273,366✔
1203
      } 
1204
      case QUERY_NODE_VALUE: {
17,967,764✔
1205
        TAOS_CHECK_EXIT(projectApplyValue(&pExpr[k], pResult, pSrcBlock, outputSlotId, &numOfRows, createNewColModel));
17,967,764✔
1206
        break;
17,972,345✔
1207
      } 
1208
      case QUERY_NODE_OPERATOR: {
84,412,121✔
1209
        TAOS_CHECK_EXIT(projectApplyOperator(&pExpr[k], pResult, pSrcBlock, outputSlotId, &numOfRows, createNewColModel, pExtraParams));
84,412,121✔
1210
        break;
83,210,949✔
1211
      } 
1212
      case QUERY_NODE_FUNCTION: {
374,738,729✔
1213
        TAOS_CHECK_EXIT(projectApplyFunction(pCtx, pfCtx, &pExpr[k], pResult, pSrcBlock, outputSlotId, &numOfRows, createNewColModel, pExtraParams, pPseudoList, &processByRowFunctionCtx, doSelectFunc));
374,738,729✔
1214
        break;
374,666,059✔
1215
      }
UNCOV
1216
      default: {
×
UNCOV
1217
        qError("invalid project expr nodeType:%d", pExpr[k].pExpr->nodeType);
×
1218
        TAOS_CHECK_EXIT(TSDB_CODE_OPS_NOT_SUPPORT);
1,579✔
1219
      }
1220
    }
1221
  }
1222

1223
  if (processByRowFunctionCtx && taosArrayGetSize(processByRowFunctionCtx) > 0) {
453,418,206✔
1224
    int32_t processByRowSize = taosArrayGetSize(processByRowFunctionCtx);
45,583,883✔
1225
    pProcessedFuncIds = taosArrayInit(4, sizeof(int32_t));
45,506,129✔
1226
    TSDB_CHECK_NULL(pProcessedFuncIds, code, lino, _exit, terrno);
45,506,129✔
1227

1228
    for (int32_t i = 0; i < processByRowSize; ++i) {
91,073,238✔
1229
      SqlFunctionCtx** ppCurrCtx = taosArrayGet(processByRowFunctionCtx, i);
45,575,453✔
1230
      TSDB_CHECK_NULL(ppCurrCtx, code, lino, _exit, terrno);
45,575,453✔
1231
      TSDB_CHECK_NULL(*ppCurrCtx, code, lino, _exit, terrno);
45,575,453✔
1232

1233
      bool    processed = false;
45,575,453✔
1234
      int32_t processedNum = taosArrayGetSize(pProcessedFuncIds);
45,575,453✔
1235
      for (int32_t j = 0; j < processedNum; ++j) {
45,591,035✔
1236
        int32_t* pFuncId = taosArrayGet(pProcessedFuncIds, j);
69,324✔
1237
        TSDB_CHECK_NULL(pFuncId, code, lino, _exit, terrno);
69,324✔
1238
        if (*pFuncId == (*ppCurrCtx)->functionId) {
69,324✔
1239
          processed = true;
53,742✔
1240
          break;
53,742✔
1241
        }
1242
      }
1243

1244
      if (processed) {
45,575,453✔
1245
        continue;
53,742✔
1246
      }
1247

1248
      pGroupedCtxArray = taosArrayInit(2, sizeof(SqlFunctionCtx*));
45,521,711✔
1249
      TSDB_CHECK_NULL(pGroupedCtxArray, code, lino, _exit, terrno);
45,521,711✔
1250

1251
      for (int32_t j = i; j < processByRowSize; ++j) {
91,113,584✔
1252
        SqlFunctionCtx** ppTmpCtx = taosArrayGet(processByRowFunctionCtx, j);
45,591,873✔
1253
        TSDB_CHECK_NULL(ppTmpCtx, code, lino, _exit, terrno);
45,591,873✔
1254
        TSDB_CHECK_NULL(*ppTmpCtx, code, lino, _exit, terrno);
45,591,873✔
1255

1256
        if ((*ppTmpCtx)->functionId == (*ppCurrCtx)->functionId) {
45,591,873✔
1257
          void* px = taosArrayPush(pGroupedCtxArray, ppTmpCtx);
45,575,892✔
1258
          TSDB_CHECK_NULL(px, code, lino, _exit, terrno);
45,575,892✔
1259
        }
1260
      }
1261

1262
      TAOS_CHECK_EXIT((*ppCurrCtx)->fpSet.processFuncByRow(pGroupedCtxArray));
45,521,711✔
1263
      taosArrayDestroy(pGroupedCtxArray);
45,513,367✔
1264
      pGroupedCtxArray = NULL;
45,513,367✔
1265

1266
      void* px = taosArrayPush(pProcessedFuncIds, &(*ppCurrCtx)->functionId);
45,513,367✔
1267
      TSDB_CHECK_NULL(px, code, lino, _exit, terrno);
45,513,367✔
1268

1269
      numOfRows = (*ppCurrCtx)->resultInfo->numOfRes;
45,513,367✔
1270
    }
1271

1272
    taosArrayDestroy(pProcessedFuncIds);
45,497,785✔
1273
    pProcessedFuncIds = NULL;
45,497,785✔
1274
  }
1275

1276
  if (!createNewColModel) {
453,332,108✔
1277
    pResult->info.rows += numOfRows;
349,013,672✔
1278
  }
1279

1280
_exit:
454,268,421✔
1281
  if (pGroupedCtxArray) {
454,675,547✔
1282
    taosArrayDestroy(pGroupedCtxArray);
8,344✔
1283
  }
1284
  if (pProcessedFuncIds) {
454,675,547✔
1285
    taosArrayDestroy(pProcessedFuncIds);
8,344✔
1286
  }
1287
  if (processByRowFunctionCtx) {
454,675,547✔
1288
    taosArrayDestroy(processByRowFunctionCtx);
45,506,129✔
1289
  }
1290
  if (code) {
454,675,547✔
1291
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
1,255,249✔
1292
  }
1293
  return code;
454,675,547✔
1294
}
1295

1296
/*
 * Convenience entry point: same as projectApplyFunctionsWithSelect() with doSelectFunc = false
 * and hasIndefRowsFunc = true.
 */
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
                              int32_t numOfOutput, SArray* pPseudoList, const void* pExtraParams) {
  const bool doSelectFunc = false;
  const bool hasIndefRowsFunc = true;

  return projectApplyFunctionsWithSelect(pExpr, pResult, pSrcBlock, pCtx, numOfOutput, pPseudoList, pExtraParams,
                                         doSelectFunc, hasIndefRowsFunc);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc