• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4982

12 Mar 2026 05:32AM UTC coverage: 68.587% (+0.1%) from 68.488%
#4982

push

travis-ci

web-flow
merge: from main to 3.0 branch #34705

merge: from main to 3.0 branch

279 of 443 new or added lines in 34 files covered. (62.98%)

2352 existing lines in 132 files now uncovered.

212163 of 309332 relevant lines covered (68.59%)

135254549.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.64
/source/libs/executor/src/projectoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "taoserror.h"
22
#include "tdatablock.h"
23

24
typedef struct SProjectOperatorInfo {
25
  SOptrBasicInfo binfo;
26
  SAggSupporter  aggSup;
27
  SArray*        pPseudoColInfo;
28
  SLimitInfo     limitInfo;
29
  bool           mergeDataBlocks;
30
  SSDataBlock*   pFinalRes;
31
  bool           inputIgnoreGroup;
32
  bool           outputIgnoreGroup;
33
} SProjectOperatorInfo;
34

35
typedef struct SIndefOperatorInfo {
36
  SOptrBasicInfo binfo;
37
  SAggSupporter  aggSup;
38
  SArray*        pPseudoColInfo;
39
  SExprSupp      scalarSup;
40
  uint64_t       groupId;
41
  SSDataBlock*   pNextGroupRes;
42
} SIndefOperatorInfo;
43

44
static int32_t      doGenerateSourceData(SOperatorInfo* pOperator);
45
static int32_t      doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
46
static int32_t      doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
47
int32_t projectApplyOperator(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, int32_t outputSlotId, int32_t* numOfRows, bool createNewColModel, const void* pExtraParams);
48

49
static void destroyProjectOperatorInfo(void* param) {
140,404,060✔
50
  if (NULL == param) {
140,404,060✔
51
    return;
×
52
  }
53

54
  SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param;
140,404,060✔
55
  cleanupBasicInfo(&pInfo->binfo);
140,404,060✔
56
  cleanupAggSup(&pInfo->aggSup);
140,414,975✔
57
  taosArrayDestroy(pInfo->pPseudoColInfo);
140,367,888✔
58

59
  blockDataDestroy(pInfo->pFinalRes);
140,363,931✔
60
  taosMemoryFreeClear(param);
140,377,286✔
61
}
62

63
static void destroyIndefinitOperatorInfo(void* param) {
3,013,507✔
64
  SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param;
3,013,507✔
65
  if (pInfo == NULL) {
3,013,507✔
66
    return;
×
67
  }
68

69
  cleanupBasicInfo(&pInfo->binfo);
3,013,507✔
70
  taosArrayDestroy(pInfo->pPseudoColInfo);
3,012,634✔
71
  cleanupAggSup(&pInfo->aggSup);
3,012,265✔
72
  cleanupExprSupp(&pInfo->scalarSup);
3,012,630✔
73

74
  taosMemoryFreeClear(param);
3,012,999✔
75
}
76

77
/*
 * Forward a "release stream state" request to the first downstream operator,
 * if that operator registered a releaseStreamStateFn callback.
 */
void streamOperatorReleaseState(SOperatorInfo* pOperator) {
  SOperatorInfo* pChild = pOperator->pDownstream[0];
  if (pChild->fpSet.releaseStreamStateFn == NULL) {
    return;
  }
  pChild->fpSet.releaseStreamStateFn(pChild);
}
×
83

84
/*
 * Forward a "reload stream state" request to the first downstream operator,
 * if that operator registered a reloadStreamStateFn callback.
 */
void streamOperatorReloadState(SOperatorInfo* pOperator) {
  SOperatorInfo* pChild = pOperator->pDownstream[0];
  if (pChild->fpSet.reloadStreamStateFn == NULL) {
    return;
  }
  pChild->fpSet.reloadStreamStateFn(pChild);
}
×
90

91
/*
 * Reset callback of the project operator: puts the operator back into the
 * OP_NOT_OPENED state so that it can be executed again.
 *
 * The limit info is rebuilt from the physical plan node, the merged result
 * block is emptied, and the aggregate supporter plus the function result
 * output are re-initialized.
 *
 * Returns 0 on success, otherwise the error code reported by resetAggSup()
 * or setFunctionResultOutput(). (Previously the error was computed but the
 * function always returned 0, hiding failures from the caller.)
 */
static int32_t resetProjectOperState(SOperatorInfo* pOper) {
  SProjectOperatorInfo* pProject = pOper->info;
  SExecTaskInfo*        pTaskInfo = pOper->pTaskInfo;
  SProjectPhysiNode*    pPhynode = (SProjectPhysiNode*)pOper->pPhyNode;

  pOper->status = OP_NOT_OPENED;
  resetBasicOperatorState(&pProject->binfo);

  // start from a zeroed limit state before re-reading LIMIT/SLIMIT from the plan node
  pProject->limitInfo = (SLimitInfo){0};
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pProject->limitInfo);

  blockDataCleanup(pProject->pFinalRes);

  int32_t code = resetAggSup(&pOper->exprSupp, &pProject->aggSup, pTaskInfo, pPhynode->pProjections, NULL,
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, NULL,
                             &pTaskInfo->storageAPI.functionStore);
  if (code == 0) {
    code = setFunctionResultOutput(pOper, &pProject->binfo, &pProject->aggSup, MAIN_SCAN, pOper->exprSupp.numOfExprs);
  }

  return code;
}
112

113
/*
 * Create a project operator from a project physical plan node.
 *
 * @param downstream    optional child operator; when non-NULL it is attached via appendDownstream()
 * @param pProjPhyNode  physical plan node that supplies projections, conditions, limits and flags
 * @param pTaskInfo     owning task; on failure pTaskInfo->code is set to the error code
 * @param pOptrInfo     out: receives the new operator on success
 *
 * @return TSDB_CODE_SUCCESS, or the error code of the step that failed. On failure the partially
 *         built info object is destroyed and destroyOperatorAndDownstreams() is called for the
 *         operator and the downstream.
 */
int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo,
                                  SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    goto _error;
  }

  pOperator->pPhyNode = pProjPhyNode;
  pOperator->exprSupp.hasWindowOrGroup = false;
  pOperator->pTaskInfo = pTaskInfo;

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pProjPhyNode->node.pOutputDataBlockDesc);
  TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno);

  initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo);

  // hand the result block to pInfo right away so that the error path releases it
  pInfo->binfo.pRes = pResBlock;
  pInfo->pFinalRes = NULL;

  code = createOneDataBlock(pResBlock, false, &pInfo->pFinalRes);
  TSDB_CHECK_CODE(code, lino, _error);

  pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder;
  pInfo->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup;
  pInfo->outputIgnoreGroup = pProjPhyNode->ignoreGroupId;

  // blocks are merged only outside the queue exec model, and only when the group id is ignored
  pInfo->mergeDataBlocks = (pTaskInfo->execModel != OPTR_EXEC_MODEL_QUEUE) && pProjPhyNode->ignoreGroupId &&
                           pProjPhyNode->mergeDataBlock;

  int32_t numOfRows = 4096;
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
  // The comparison is written as a division so that a very wide row cannot overflow the
  // 32-bit product numOfRows * rowSize and slip past the cap.
  // NOTE(review): a rowSize larger than 2MB yields numOfRows == 0, as before - confirm this is acceptable.
  const int32_t TWOMB = 2 * 1024 * 1024;
  if (pResBlock->info.rowSize > TWOMB / numOfRows) {
    numOfRows = TWOMB / pResBlock->info.rowSize;
  }

  initResultSizeInfo(&pOperator->resultInfo, numOfRows);

  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = NULL;
  code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols);
  TSDB_CHECK_CODE(code, lino, _error);

  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
                    NULL, &pTaskInfo->storageAPI.functionStore);
  TSDB_CHECK_CODE(code, lino, _error);

  initBasicInfo(&pInfo->binfo, pResBlock);
  code = setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
  TSDB_CHECK_CODE(code, lino, _error);

  code = filterInitFromNode((SNode*)pProjPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                            pTaskInfo->pStreamRuntimeInfo);
  TSDB_CHECK_CODE(code, lino, _error);

  code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols, &pInfo->pPseudoColInfo);
  TSDB_CHECK_CODE(code, lino, _error);

  setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
  setOperatorStreamStateFn(pOperator, streamOperatorReleaseState, streamOperatorReloadState);
  setOperatorResetStateFn(pOperator, resetProjectOperState);

  if (NULL != downstream) {
    code = appendDownstream(pOperator, &downstream, 1);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  if (pInfo != NULL) destroyProjectOperatorInfo(pInfo);
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
210

211
/*
 * Decide whether the incoming block belongs to a group that is still being
 * skipped by the group offset (remainGroupOffset).
 *
 * Returns PROJECT_RETRIEVE_CONTINUE when the block has to be dropped and
 * PROJECT_RETRIEVE_DONE when the block should be processed.
 */
static int32_t discardGroupDataBlock(SSDataBlock* pBlock, SLimitInfo* pLimitInfo) {
  // no group offset left: nothing to discard
  if (pLimitInfo->remainGroupOffset <= 0) {
    return PROJECT_RETRIEVE_DONE;
  }

  // still inside the first skipped group, or more data of the group currently being skipped
  if (pLimitInfo->currentGroupId == 0 || pLimitInfo->currentGroupId == pBlock->info.id.groupId) {
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
    return PROJECT_RETRIEVE_CONTINUE;
  }

  // data of a new group arrived: one more group has been skipped completely
  pLimitInfo->remainGroupOffset -= 1;
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;

  // the new group itself still has to be skipped
  if (pLimitInfo->remainGroupOffset > 0) {
    return PROJECT_RETRIEVE_CONTINUE;
  }

  pLimitInfo->currentGroupId = 0;
  return PROJECT_RETRIEVE_DONE;
}
233

234
/*
 * Update the group bookkeeping in pLimitInfo for the incoming block and apply
 * the group limit (slimit).
 *
 * Expects remainGroupOffset to be 0 or -1; any other value is logged and
 * TSDB_CODE_INVALID_PARA is returned.
 *
 * Returns PROJECT_RETRIEVE_DONE (after marking the operator completed) when the
 * number of output groups exceeds slimit, otherwise PROJECT_RETRIEVE_CONTINUE.
 */
static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, SOperatorInfo* pOperator) {
  if (pLimitInfo->remainGroupOffset != 0 && pLimitInfo->remainGroupOffset != -1) {
    qError("project failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_INVALID_PARA;
  }

  const bool noGroup = (0 == pBlock->info.id.groupId);

  // more data of the group that is already being output: nothing to update
  if (!noGroup && pLimitInfo->currentGroupId == pBlock->info.id.groupId) {
    return PROJECT_RETRIEVE_CONTINUE;
  }

  if (noGroup) {
    pLimitInfo->numOfOutputGroups = 1;
  } else {
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
    pLimitInfo->numOfOutputGroups += 1;
  }

  if ((pLimitInfo->slimit.limit >= 0) && (pLimitInfo->slimit.limit < pLimitInfo->numOfOutputGroups)) {
    setOperatorCompleted(pOperator);
    return PROJECT_RETRIEVE_DONE;
  }

  // a block with a non-zero, different group id starts a new group: reset the per-group limit state
  if (!noGroup) {
    resetLimitInfoForNextGroup(pLimitInfo);
  }

  return PROJECT_RETRIEVE_CONTINUE;
}
266

267
// todo refactor
268
static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SSDataBlock* pBlock,
289,183,530✔
269
                                    SOperatorInfo* pOperator) {
270
  // set current group id
271
  pLimitInfo->currentGroupId = groupId;
289,183,530✔
272
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pOperator->pTaskInfo);
289,382,239✔
273
  if (pBlock->info.rows == 0 && 0 != pLimitInfo->limit.limit) {
289,292,502✔
274
    return PROJECT_RETRIEVE_CONTINUE;
6,176,864✔
275
  } else {
276
    if (limitReached && (pLimitInfo->slimit.limit >= 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
283,121,894✔
277
      setOperatorCompleted(pOperator);
78,625✔
278
    } else if (limitReached && groupId == 0) {
283,043,772✔
279
      setOperatorCompleted(pOperator);
7,063,446✔
280
    }
281
  }
282

283
  return PROJECT_RETRIEVE_DONE;
282,791,675✔
284
}
285

286
/*
 * "Get next" routine of the project operator.
 *
 * Without a downstream operator the data comes from doGenerateSourceData().
 * Otherwise blocks are pulled from the downstream, filtered by group offset and
 * group limit, projected into binfo.pRes, and trimmed by the in-group limit/offset.
 * With mergeDataBlocks set, projected blocks are accumulated in pFinalRes until
 * the result threshold is passed or the operator is done; the filter is then
 * applied to the merged block. Otherwise each projected block is filtered and
 * returned on its own.
 *
 * @param pResBlock  out: set to the produced block when it has rows, NULL otherwise
 * @return 0 on success. On failure the code is stored in pTaskInfo->code and
 *         control leaves through T_LONG_JMP.
 */
int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  SProjectOperatorInfo* pProjectInfo = pOperator->info;
  SOptrBasicInfo*       pInfo = &pProjectInfo->binfo;
  SExprSupp*            pSup = &pOperator->exprSupp;
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SLimitInfo*           pLimitInfo = &pProjectInfo->limitInfo;
  SSDataBlock*          pRes = pInfo->pRes;
  SSDataBlock*          pFinalRes = pProjectInfo->pFinalRes;
  int32_t               code = 0;
  int32_t               lino = 0;
  int64_t               st = 0;
  int32_t               order = pInfo->inputTsOrder;
  int32_t               scanFlag = 0;

  blockDataCleanup(pFinalRes);

  if (pOperator->status == OP_EXEC_DONE) {
    return code;
  }

  // the open cost is measured on the first call only
  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  SOperatorInfo* downstream = NULL;
  if (pOperator->numOfDownstream > 0) {
    downstream = pOperator->pDownstream[0];
  }

  // no child operator: the rows are generated by this operator itself
  if (downstream == NULL) {
    code = doGenerateSourceData(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pProjectInfo->outputIgnoreGroup) {
      pRes->info.id.groupId = 0;
    }

    *pResBlock = (pRes->info.rows > 0) ? pRes : NULL;
    return code;
  }

  while (1) {
    // inner loop: fetch blocks until one yields output rows or the input is exhausted
    while (1) {
      blockDataCleanup(pRes);

      // The downstream exec may change the value of the newgroup, so use a local variable instead.
      SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
      if (pBlock == NULL) {
        qDebug("set op close, exec %d, status %d rows %" PRId64 , pTaskInfo->execModel, pOperator->status, pFinalRes->info.rows);
        setOperatorCompleted(pOperator);
        break;
      }

      if (pProjectInfo->inputIgnoreGroup) {
        pBlock->info.id.groupId = 0;
      }

      // drop blocks of groups skipped by the group offset
      int32_t status = discardGroupDataBlock(pBlock, pLimitInfo);
      if (status == PROJECT_RETRIEVE_CONTINUE) {
        continue;
      }

      // NOTE(review): the return value (including TSDB_CODE_INVALID_PARA) is ignored on purpose here;
      // only the operator status set by the callee is inspected.
      (void) setInfoForNewGroup(pBlock, pLimitInfo, pOperator);
      if (pOperator->status == OP_EXEC_DONE) {
        break;
      }

      scanFlag = pBlock->info.scanFlag;
      if (pProjectInfo->mergeDataBlocks) {
        pFinalRes->info.scanFlag = scanFlag;
      } else {
        pRes->info.scanFlag = scanFlag;
      }

      code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
      QUERY_CHECK_CODE(code, lino, _end);

      code = blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
      QUERY_CHECK_CODE(code, lino, _end);

      code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
                                   pProjectInfo->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo));
      QUERY_CHECK_CODE(code, lino, _end);

      status = doIngroupLimitOffset(pLimitInfo, pBlock->info.id.groupId, pInfo->pRes, pOperator);
      if (status == PROJECT_RETRIEVE_CONTINUE) {
        continue;
      }

      break;
    }

    if (pProjectInfo->mergeDataBlocks) {
      if (pRes->info.rows > 0) {
        pFinalRes->info.id.groupId = 0;  // clear groupId
        pFinalRes->info.version = pRes->info.version;

        // continue merge data, ignore the group id
        code = blockDataMerge(pFinalRes, pRes);
        QUERY_CHECK_CODE(code, lino, _end);

        // keep accumulating while the merged block is below the threshold and input remains
        if (pFinalRes->info.rows + pRes->info.rows <= pOperator->resultInfo.threshold && (pOperator->status != OP_EXEC_DONE)) {
          continue;
        }
      }

      // do apply filter
      code = doFilter(pFinalRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      // when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
      if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) {
        qDebug("project return %" PRId64 " rows, status %d", pFinalRes->info.rows, pOperator->status);
        break;
      }
    } else {
      // do apply filter
      if (pRes->info.rows > 0) {
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
        QUERY_CHECK_CODE(code, lino, _end);

        // everything was filtered out: fetch the next block
        if (pRes->info.rows == 0) {
          continue;
        }
      }

      // either rows are available or no results were generated
      break;
    }
  }

  SSDataBlock* p = pProjectInfo->mergeDataBlocks ? pFinalRes : pRes;
  pOperator->resultInfo.totalRows += p->info.rows;
  p->info.dataLoad = 1;

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  }

  if (pProjectInfo->outputIgnoreGroup) {
    p->info.id.groupId = 0;
  }

  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
    printDataBlock(p, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo), pTaskInfo->id.queryId);
  }

  *pResBlock = (p->info.rows > 0) ? p : NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
445

UNCOV
446
/*
 * Reset callback of the indefinite-rows-function operator: puts the operator
 * back into the OP_NOT_OPENED state so that it can be executed again.
 *
 * The group tracking is cleared and the aggregate supporter, the function
 * result output and the scalar expression support are re-initialized.
 *
 * Returns 0 on success, otherwise the error code of the first step that failed.
 * (Previously the error was computed but the function always returned 0,
 * hiding failures from the caller.)
 */
static int32_t resetIndefinitOutputOperState(SOperatorInfo* pOper) {
  SIndefOperatorInfo*      pInfo = pOper->info;
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
  SIndefRowsFuncPhysiNode* pPhynode = (SIndefRowsFuncPhysiNode*)pOper->pPhyNode;

  pOper->status = OP_NOT_OPENED;
  resetBasicOperatorState(&pInfo->binfo);

  // forget the current group and any block held back for the next group
  pInfo->groupId = 0;
  pInfo->pNextGroupRes = NULL;

  int32_t code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->pFuncs, NULL,
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, NULL,
                             &pTaskInfo->storageAPI.functionStore);
  if (code == 0) {
    code = setFunctionResultOutput(pOper, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pOper->exprSupp.numOfExprs);
  }

  if (code == 0) {
    code = resetExprSupp(&pInfo->scalarSup, pTaskInfo, pPhynode->pExprs, NULL,
                         &pTaskInfo->storageAPI.functionStore);
  }

  return code;
}
469

470
int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
3,012,634✔
471
                                                 SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
472
  QRY_PARAM_CHECK(pOptrInfo);
3,012,634✔
473
  int32_t code = 0;
3,011,253✔
474
  int32_t lino = 0;
3,011,253✔
475
  int32_t numOfRows = 4096;
3,011,253✔
476
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
3,011,253✔
477

478
  SIndefOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIndefOperatorInfo));
3,011,253✔
479
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
3,008,555✔
480
  if (pInfo == NULL || pOperator == NULL) {
3,010,745✔
UNCOV
481
    code = terrno;
×
UNCOV
482
    goto _error;
×
483
  }
484

485
  pOperator->pPhyNode = pNode;
3,010,745✔
486
  pOperator->pTaskInfo = pTaskInfo;
3,010,745✔
487

488
  SExprSupp* pSup = &pOperator->exprSupp;
3,010,745✔
489
  pSup->hasWindowOrGroup = false;
3,012,126✔
490

491
  SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
3,010,015✔
492

493
  if (pPhyNode->pExprs != NULL) {
3,010,015✔
494
    int32_t    num = 0;
22,156✔
495
    SExprInfo* pSExpr = NULL;
22,156✔
496
    code = createExprInfo(pPhyNode->pExprs, NULL, &pSExpr, &num);
22,156✔
497
    QUERY_CHECK_CODE(code, lino, _error);
22,156✔
498

499
    code = initExprSupp(&pInfo->scalarSup, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
22,156✔
500
    if (code != TSDB_CODE_SUCCESS) {
22,156✔
UNCOV
501
      goto _error;
×
502
    }
503
  }
504

505
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->node.pOutputDataBlockDesc);
3,011,761✔
506
  TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno);
3,012,634✔
507

508
  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
509
  int32_t TWOMB = 2 * 1024 * 1024;
3,012,634✔
510
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
3,012,634✔
UNCOV
511
    numOfRows = TWOMB / pResBlock->info.rowSize;
×
512
  }
513

514
  initBasicInfo(&pInfo->binfo, pResBlock);
3,012,269✔
515
  initResultSizeInfo(&pOperator->resultInfo, numOfRows);
3,010,517✔
516
  code = blockDataEnsureCapacity(pResBlock, numOfRows);
3,012,634✔
517
  TSDB_CHECK_CODE(code, lino, _error);
3,012,047✔
518

519
  int32_t    numOfExpr = 0;
3,012,047✔
520
  SExprInfo* pExprInfo = NULL;
3,012,047✔
521
  code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr);
3,011,682✔
522
  TSDB_CHECK_CODE(code, lino, _error);
3,011,031✔
523

524
  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
3,011,031✔
525
                            NULL, &pTaskInfo->storageAPI.functionStore);
526
  TSDB_CHECK_CODE(code, lino, _error);
3,010,523✔
527

528
  code = setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
3,010,523✔
529
  TSDB_CHECK_CODE(code, lino, _error);
3,008,259✔
530

531
  code = filterInitFromNode((SNode*)pPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
3,008,052✔
532
                            pTaskInfo->pStreamRuntimeInfo);
3,008,259✔
533
  TSDB_CHECK_CODE(code, lino, _error);
3,008,190✔
534

535
  pInfo->binfo.pRes = pResBlock;
3,008,190✔
536
  pInfo->binfo.inputTsOrder = pNode->inputTsOrder;
3,007,460✔
537
  pInfo->binfo.outputTsOrder = pNode->outputTsOrder;
3,004,703✔
538
  code = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr, &pInfo->pPseudoColInfo);
3,009,793✔
539
  TSDB_CHECK_CODE(code, lino, _error);
3,009,285✔
540

541
  setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo,
3,009,285✔
542
                  pTaskInfo);
543
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo,
3,010,666✔
544
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
545
                                         
546
  setOperatorResetStateFn(pOperator, resetIndefinitOutputOperState);
3,010,020✔
547
  code = appendDownstream(pOperator, &downstream, 1);
3,010,528✔
548
  if (code != TSDB_CODE_SUCCESS) {
3,009,433✔
UNCOV
549
    goto _error;
×
550
  }
551

552
  *pOptrInfo = pOperator;
3,009,433✔
553
  return TSDB_CODE_SUCCESS;
3,008,565✔
554

UNCOV
555
_error:
×
UNCOV
556
  if (pInfo != NULL) destroyIndefinitOperatorInfo(pInfo);
×
UNCOV
557
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
558
  pTaskInfo->code = code;
×
UNCOV
559
  return code;
×
560
}
561

562
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream,
47,322,281✔
563
                              SExecTaskInfo* pTaskInfo) {
564
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
47,322,281✔
565
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
47,322,281✔
566
  SExprSupp*          pSup = &pOperator->exprSupp;
47,322,281✔
567

568
  int32_t order = pInfo->inputTsOrder;
47,322,281✔
569
  int32_t scanFlag = pBlock->info.scanFlag;
47,322,281✔
570
  int32_t code = TSDB_CODE_SUCCESS;
47,322,281✔
571

572
  // there is an scalar expression that needs to be calculated before apply the group aggregation.
573
  SExprSupp* pScalarSup = &pIndefInfo->scalarSup;
47,322,281✔
574
  if (pScalarSup->pExprInfo != NULL) {
47,322,281✔
575
    code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
67,032✔
576
                                 pIndefInfo->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo));
67,032✔
577
    if (code != TSDB_CODE_SUCCESS) {
67,032✔
UNCOV
578
      T_LONG_JMP(pTaskInfo->env, code);
×
579
    }
580
  }
581

582
  code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
47,322,281✔
583
  if (code) {
47,322,281✔
UNCOV
584
    T_LONG_JMP(pTaskInfo->env, code);
×
585
  }
586

587
  code = blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
47,322,281✔
588
  if (code != TSDB_CODE_SUCCESS) {
47,321,910✔
UNCOV
589
    T_LONG_JMP(pTaskInfo->env, code);
×
590
  }
591

592
  code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
47,321,408✔
593
                               pIndefInfo->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo));
47,321,910✔
594
  if (code != TSDB_CODE_SUCCESS) {
47,322,281✔
595
    T_LONG_JMP(pTaskInfo->env, code);
17,480✔
596
  }
597
}
47,304,801✔
598

599
/*
 * "Get next" routine of the indefinite-rows-function operator.
 *
 * Input blocks are pulled from the downstream and processed by
 * doHandleDataBlock() until the result threshold is reached, the input is
 * exhausted, or a block with a different group id arrives. That block is kept
 * in pNextGroupRes and processed at the start of the next round, after the
 * function contexts have been reset. The filter is applied to the output block;
 * an empty result leads to another round unless the operator is done.
 *
 * @param pResBlock  out: set to the output block when it has rows, NULL otherwise
 * @return 0 on success. On a filter failure the code is stored in pTaskInfo->code
 *         and control leaves through T_LONG_JMP.
 */
int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  SIndefOperatorInfo* pIndefInfo = pOperator->info;
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
  SExprSupp*          pSup = &pOperator->exprSupp;
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*        pRes = pInfo->pRes;
  int64_t             st = 0;
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;

  blockDataCleanup(pRes);

  if (pOperator->status == OP_EXEC_DONE) {
    return code;
  }

  // the open cost is measured on the first call only
  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];

  while (1) {
    // here we need to handle the existing group results
    if (pIndefInfo->pNextGroupRes != NULL) {  // todo extract method
      // reset every function context before starting the new group
      for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
        SqlFunctionCtx* pCtx = &pSup->pCtx[k];

        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
        pResInfo->initialized = false;
        pCtx->pOutput = NULL;
      }

      doHandleDataBlock(pOperator, pIndefInfo->pNextGroupRes, downstream, pTaskInfo);
      pIndefInfo->pNextGroupRes = NULL;
    }

    if (pInfo->pRes->info.rows < pOperator->resultInfo.threshold) {
      while (1) {
        // The downstream exec may change the value of the newgroup, so use a local variable instead.
        SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
        if (pBlock == NULL) {
          setOperatorCompleted(pOperator);
          break;
        }
        pInfo->pRes->info.scanFlag = pBlock->info.scanFlag;

        if (pIndefInfo->groupId == 0 && pBlock->info.id.groupId != 0) {
          pIndefInfo->groupId = pBlock->info.id.groupId;  // this is the initial group result
        } else if (pIndefInfo->groupId != pBlock->info.id.groupId) {
          // a different group starts: keep the block for the next round and emit what we have
          pIndefInfo->groupId = pBlock->info.id.groupId;
          pIndefInfo->pNextGroupRes = pBlock;
          break;
        }

        doHandleDataBlock(pOperator, pBlock, downstream, pTaskInfo);
        if (pInfo->pRes->info.rows >= pOperator->resultInfo.threshold) {
          break;
        }
      }
    }

    code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    size_t numOfFiltered = pInfo->pRes->info.rows;
    if (numOfFiltered > 0 || pOperator->status == OP_EXEC_DONE) {
      break;
    }

    // nothing survived the filter: clear the block and try the next input
    blockDataCleanup(pInfo->pRes);
  }

  size_t rows = pInfo->pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  }

  *pResBlock = (rows > 0) ? pInfo->pRes : NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
692

693
/*
 * Call the init callback of every function context in pCtx[0..size) that still
 * needs it.
 *
 * Contexts are skipped when their result entry is already initialized, when the
 * function is a pseudo-column or scalar function, or when functionId is -1.
 *
 * Returns 0 on success, or the first non-zero code returned by an init callback.
 */
int32_t initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
  for (int32_t idx = 0; idx < size; ++idx) {
    SqlFunctionCtx*             pCur = &pCtx[idx];
    struct SResultRowEntryInfo* pEntry = GET_RES_INFO(pCur);

    bool skip = isRowEntryInitialized(pEntry) || fmIsPseudoColumnFunc(pCur->functionId) ||
                pCur->functionId == -1 || fmIsScalarFunc(pCur->functionId);
    if (skip) {
      continue;
    }

    int32_t code = pCur->fpSet.init(pCur, pCur->resultInfo);
    if (code != 0) {
      return code;
    }
  }

  return 0;
}
710

711
/*
712
 * The start of each column SResultRowEntryInfo is denoted by RowCellInfoOffset.
713
 * Note that in case of top/bottom query, the whole multiple rows of result is treated as only one row of results.
714
 * +------------+-----------------result column 1------------+------------------result column 2-----------+
715
 * | SResultRow | SResultRowEntryInfo | intermediate buffer1 | SResultRowEntryInfo | intermediate buffer 2|
716
 * +------------+--------------------------------------------+--------------------------------------------+
717
 *           offset[0]                                  offset[1]                                   offset[2]
718
 */
719
// TODO refactor: some function move away
720
/*
 * Bind every function context of the operator to its entry inside a single result row.
 *
 * The result row info of pInfo is (re)initialized, one result row is requested from
 * doSetResultOutBufByKey() using a zero key and group id 0, and for each of the
 * numOfExprs expressions the matching SResultRowEntryInfo is cleaned and attached to
 * pCtx[i].resultInfo; pCtx[i].scanFlag is set to `stage`. Finally the output buffers
 * are initialized through initCtxOutputBuffer().
 *
 * Returns pTaskInfo->code when no row was obtained or the task already carries an
 * error, otherwise the result of initCtxOutputBuffer().
 *
 * NOTE(review): when pRow is NULL but pTaskInfo->code is still 0 this returns 0 without
 * binding any context - confirm that doSetResultOutBufByKey() always records an error
 * in that case.
 */
int32_t setFunctionResultOutput(struct SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
                             int32_t numOfExprs) {
  SExecTaskInfo*  pTask = pOperator->pTaskInfo;
  SqlFunctionCtx* pCtxList = pOperator->exprSupp.pCtx;
  int32_t*        pEntryOffsets = pOperator->exprSupp.rowEntryInfoOffset;

  SResultRowInfo* pRowInfo = &pInfo->resultRowInfo;
  initResultRowInfo(pRowInfo);

  int64_t key = 0;
  int64_t gid = 0;

  SResultRow* pResultRow = doSetResultOutBufByKey(pSup->pResultBuf, pRowInfo, (char*)&key, sizeof(key), true, gid,
                                                  pTask, false, pSup, true);
  if (pResultRow == NULL || pTask->code != 0) {
    return pTask->code;
  }

  for (int32_t idx = 0; idx < numOfExprs; ++idx) {
    struct SResultRowEntryInfo* pCell = getResultEntryInfo(pResultRow, idx, pEntryOffsets);
    cleanupResultRowEntry(pCell);

    SqlFunctionCtx* pOne = &pCtxList[idx];
    pOne->resultInfo = pCell;
    pOne->scanFlag = stage;
  }

  return initCtxOutputBuffer(pCtxList, numOfExprs);
}
747

748
/*
 * Build the list of context indexes that refer to pseudo-column functions.
 *
 * Every index i in [0, numOfCols) whose pCtx[i].functionId is a pseudo-column function
 * but not a place-holder function is appended (as int32_t) to a newly created array.
 *
 * pResList: output; receives the new array on success. Ownership passes to the caller.
 *           It is validated by QRY_PARAM_CHECK first (presumably rejects NULL and resets
 *           *pResList - confirm against the macro definition).
 *
 * Returns 0 on success, or terrno when the array cannot be created or extended. On
 * failure the partially built array is released and *pResList is not assigned here.
 */
int32_t setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList) {
  QRY_PARAM_CHECK(pResList);

  SArray* pList = taosArrayInit(4, sizeof(int32_t));
  if (pList == NULL) {
    return terrno;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    if (fmIsPseudoColumnFunc(pCtx[i].functionId) && !fmIsPlaceHolderFunc(pCtx[i].functionId)) {
      void* px = taosArrayPush(pList, &i);
      if (px == NULL) {
        // capture the error before cleanup, then free the list instead of leaking it
        int32_t code = terrno;
        taosArrayDestroy(pList);
        return code;
      }
    }
  }

  *pResList = pList;
  return 0;
}
767

768
/*
 * Evaluate the projection expressions of pOperator into its own result block and
 * emit exactly one row.
 *
 * Each expression is handled according to its node type:
 *   - QUERY_NODE_VALUE:    the constant (or NULL) is written to row 0 of the output column;
 *   - QUERY_NODE_FUNCTION: only scalar functions are supported; they are computed with
 *                          scalarCalculate() using the result block itself as input;
 *   - QUERY_NODE_OPERATOR: delegated to projectApplyOperator() without a source block;
 *   - anything else:       TSDB_CODE_OPS_NOT_SUPPORT.
 * Afterwards the block is filtered, the limit/offset is applied, the operator is marked
 * completed and the open cost is recorded if it has not been set yet.
 *
 * NOTE(review): presumably used when the project operator has no upstream operator to
 * read from - confirm against the caller.
 *
 * Returns 0 on success or an error code. All temporaries created in the scalar-function
 * branch (block list and intermediate column) are released on every exit path after
 * scalarCalculate() succeeded, and the block list on every exit path.
 */
int32_t doGenerateSourceData(SOperatorInfo* pOperator) {
  SProjectOperatorInfo* pProjectInfo = pOperator->info;

  SExprSupp*     pSup = &pOperator->exprSupp;
  SSDataBlock*   pRes = pProjectInfo->binfo.pRes;
  SExprInfo*     pExpr = pSup->pExprInfo;
  int64_t        st = taosGetTimestampUs();
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  int32_t code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
  if (code) {
    return code;
  }

  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
    int32_t outputSlotId = pExpr[k].base.resSchema.slotId;

    if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
      SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, outputSlotId);
      if (pColInfoData == NULL) {
        return terrno;
      }

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataSetNNULL(pColInfoData, 0, 1);
      } else {
        code = colDataSetVal(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
        if (code) {
          return code;
        }
      }
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
      SqlFunctionCtx* pfCtx = &pSup->pCtx[k];

      // UDF scalar functions will be calculated here, for example, select foo(n) from (select 1 n).
      // UDF aggregate functions will be handled in agg operator.
      if (fmIsScalarFunc(pfCtx->functionId)) {
        SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
        if (pBlockList == NULL) {
          return terrno;
        }

        void* px = taosArrayPush(pBlockList, &pRes);
        if (px == NULL) {
          code = terrno;  // capture before cleanup
          taosArrayDestroy(pBlockList);
          return code;
        }

        SColumnInfoData* pResColData = taosArrayGet(pRes->pDataBlock, outputSlotId);
        if (pResColData == NULL) {
          code = terrno;  // capture before cleanup
          taosArrayDestroy(pBlockList);
          return code;
        }

        // intermediate column that receives the scalar result before it is copied out
        SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};

        SScalarParam dest = {.columnData = &idata};
        gTaskScalarExtra.pStreamInfo = GET_STM_RTINFO(pOperator->pTaskInfo);
        gTaskScalarExtra.pStreamRange = NULL;
        code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest, &gTaskScalarExtra);
        if (code != TSDB_CODE_SUCCESS) {
          // NOTE(review): idata is not released here (unchanged behavior); confirm whether
          // scalarCalculate() can leave buffers attached to the destination column on failure.
          taosArrayDestroy(pBlockList);
          return code;
        }

        if (pRes->info.capacity <= 0) {
          qError("project failed at: %s:%d", __func__, __LINE__);
          code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        } else {
          code = colDataAssign(pResColData, &idata, dest.numOfRows, &pRes->info);
        }

        // temporaries are released on both the success and the failure path
        colDataDestroy(&idata);
        taosArrayDestroy(pBlockList);
        if (code) {
          return code;
        }
      } else {
        return TSDB_CODE_OPS_NOT_SUPPORT;
      }
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
      TAOS_CHECK_RETURN(projectApplyOperator(&pExpr[k], pRes, NULL, outputSlotId, NULL, false, &gTaskScalarExtra));
    } else {
      return TSDB_CODE_OPS_NOT_SUPPORT;
    }
  }

  pRes->info.rows = 1;
  code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
  if (code) {
    pTaskInfo->code = code;
    return code;
  }

  (void) doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator);

  pOperator->resultInfo.totalRows += pRes->info.rows;

  setOperatorCompleted(pOperator);
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  }

  return code;
}
872

873
/*
 * Point the output of the first N function contexts at the first N columns of pResult,
 * where N is the number of elements in pPseudoList (0 when the list is NULL).
 * A missing column is only logged; the context output is then left as NULL.
 *
 * NOTE(review): the loop counter is used as the index for both pCtx and the data block;
 * the values stored inside pPseudoList are not read here - confirm this is intended.
 */
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
  size_t total = 0;
  if (pPseudoList != NULL) {
    total = taosArrayGetSize(pPseudoList);
  }

  for (int32_t idx = 0; idx < total; ++idx) {
    SqlFunctionCtx* pOne = &pCtx[idx];

    pOne->pOutput = taosArrayGet(pResult->pDataBlock, idx);
    if (pOne->pOutput == NULL) {
      qError("failed to get the output buf, ptr is null");
    }
  }
}
441,758,656✔
882

883
/*
 * Copy a plain column into the output column at outputSlotId of pResult.
 *
 * The source is pfCtx->input.pData[0] (with pfCtx->input.numOfRows rows) when that
 * pointer is set; otherwise the column of pSrcBlock addressed by the slot id of the
 * first parameter of pfCtx (with pSrcBlock->info.rows rows).
 *
 * When pResult already holds rows and createNewColModel is false the data is appended
 * with colDataMergeCol(); otherwise the output column is overwritten via colDataAssign().
 *
 * numOfRows: output; set to the number of source rows on success.
 * Returns 0 on success, otherwise the error code (which is also logged).
 */
int32_t projectApplyColumn(SSDataBlock* pResult, SSDataBlock* pSrcBlock, int32_t outputSlotId, SqlFunctionCtx* pfCtx, int32_t* numOfRows, bool createNewColModel) {
  int32_t code = 0, lino = 0;

  SInputColumnInfoData* pInputData = &pfCtx->input;
  SColumnInfoData*      pDstCol = taosArrayGet(pResult->pDataBlock, outputSlotId);
  TSDB_CHECK_NULL(pDstCol, code, lino, _exit, terrno);

  const bool appendRows = (pResult->info.rows > 0) && !createNewColModel;

  if (pInputData->pData[0] != NULL) {
    // the function context already carries the input column
    if (appendRows) {
      TAOS_CHECK_EXIT(colDataMergeCol(pDstCol, pResult->info.rows, (int32_t*)&pResult->info.capacity,
                            pInputData->pData[0], pInputData->numOfRows));
    } else {
      TAOS_CHECK_EXIT(colDataAssign(pDstCol, pInputData->pData[0], pInputData->numOfRows, &pResult->info));
    }

    *numOfRows = pInputData->numOfRows;
  } else {
    // fall back to the column of the source block referenced by the first parameter
    int32_t srcSlot = pfCtx->param[0].pCol->slotId;

    SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
    TSDB_CHECK_NULL(pSrcCol, code, lino, _exit, terrno);

    if (appendRows) {
      TAOS_CHECK_EXIT(colDataMergeCol(pDstCol, pResult->info.rows, (int32_t*)&pResult->info.capacity, pSrcCol,
                            pSrcBlock->info.rows));
    } else {
      TAOS_CHECK_EXIT(colDataAssign(pDstCol, pSrcCol, pSrcBlock->info.rows, &pResult->info));
    }

    *numOfRows = pSrcBlock->info.rows;
  }

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
931

932

933
/*
 * Fill the output column at outputSlotId of pResult with the constant held in the first
 * parameter of pExpr, once for every row of pSrcBlock.
 *
 * Writing starts at row 0 when createNewColModel is true, otherwise right after the rows
 * already present in pResult. A constant of type TSDB_DATA_TYPE_NULL produces NULL cells.
 *
 * numOfRows: output; set to pSrcBlock->info.rows on success.
 * Returns 0 on success, otherwise the error code (which is also logged).
 */
int32_t projectApplyValue(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, int32_t outputSlotId, int32_t* numOfRows, bool createNewColModel) {
  int32_t code = 0, lino = 0;

  SColumnInfoData* pDstCol = taosArrayGet(pResult->pDataBlock, outputSlotId);
  TSDB_CHECK_NULL(pDstCol, code, lino, _exit, terrno);

  int32_t startRow = 0;
  if (!createNewColModel) {
    startRow = pResult->info.rows;
  }

  int32_t valType = pExpr->base.pParam[0].param.nType;
  if (TSDB_DATA_TYPE_NULL != valType) {
    char* pVal = taosVariantGet(&pExpr->base.pParam[0].param, valType);
    for (int32_t row = 0; row < pSrcBlock->info.rows; ++row) {
      TAOS_CHECK_EXIT(colDataSetVal(pDstCol, row + startRow, pVal, false));
    }
  } else {
    colDataSetNNULL(pDstCol, startRow, pSrcBlock->info.rows);
  }

  *numOfRows = pSrcBlock->info.rows;

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
959

960

961

962
/*
 * Evaluate an operator expression with scalarCalculate() and store the result in the
 * output column at outputSlotId of pResult.
 *
 * pSrcBlock:    optional input block; when NULL, scalarCalculate() is called with a NULL
 *               block list.
 * numOfRows:    optional output; receives the number of rows produced.
 * pExtraParams: stored into gTaskScalarExtra.pStreamInfo before the calculation.
 *
 * The computed rows are appended when pResult already holds rows and createNewColModel is
 * false, otherwise they replace the content of the output column.
 *
 * Returns 0 on success, otherwise the error code. Only negative codes are logged here.
 *
 * NOTE(review): when scalarCalculate() fails the intermediate column is not destroyed;
 * confirm that no buffer can be attached to it in that case.
 */
int32_t projectApplyOperator(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, int32_t outputSlotId, int32_t* numOfRows, bool createNewColModel, const void* pExtraParams) {
  int32_t code = 0, lino = 0;
  SArray* pInputBlocks = NULL;

  if (pSrcBlock != NULL) {
    pInputBlocks = taosArrayInit(4, POINTER_BYTES);
    TSDB_CHECK_NULL(pInputBlocks, code, lino, _exit, terrno);

    void* pPushed = taosArrayPush(pInputBlocks, &pSrcBlock);
    TSDB_CHECK_NULL(pPushed, code, lino, _exit, terrno);
  }

  SColumnInfoData* pDstCol = taosArrayGet(pResult->pDataBlock, outputSlotId);
  TSDB_CHECK_NULL(pDstCol, code, lino, _exit, terrno);

  // intermediate column that receives the scalar result before it is copied out
  SColumnInfoData tmpCol = {.info = pDstCol->info, .hasNull = true};
  SScalarParam    out = {.columnData = &tmpCol};

  gTaskScalarExtra.pStreamInfo = (void*)pExtraParams;
  gTaskScalarExtra.pStreamRange = NULL;
  TAOS_CHECK_EXIT(scalarCalculate(pExpr->pExpr->_optrRoot.pRootNode, pInputBlocks, &out, &gTaskScalarExtra));

  const bool appendRows = (pResult->info.rows > 0) && !createNewColModel;
  if (!appendRows) {
    code = colDataAssign(pDstCol, &tmpCol, out.numOfRows, &pResult->info);
  } else {
    code = colDataMergeCol(pDstCol, pResult->info.rows, (int32_t*)&pResult->info.capacity, &tmpCol, out.numOfRows);
  }

  // the intermediate column is released before the copy result is checked
  colDataDestroy(&tmpCol);
  TAOS_CHECK_EXIT(code);

  if (numOfRows) {
    *numOfRows = out.numOfRows;
  }

_exit:

  if (code < 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  taosArrayDestroy(pInputBlocks);

  return code;
}
1005

1006

1007
/*
 * Evaluate one function expression of a projection and write its result into the output
 * column at outputSlotId of pResult.
 *
 * The function id of pfCtx selects one of the following paths, checked in this order:
 *   1. place-holder function with pExtraParams and exactly one parameter: the value is
 *      filled in by scalarAssignPlaceHolderRes();
 *   2. scalar (or remaining place-holder) function: computed by scalarCalculate() on
 *      pSrcBlock and merged into / assigned to the output column;
 *   3. indefinite-rows function: initialized and executed through pfCtx->fpSet; contexts
 *      of process-by-row functions are additionally collected in *processByRowFunctionCtx
 *      (the array is created on demand and owned by the caller);
 *   4. aggregate function: select-value functions are skipped unless doSelectFunc is set;
 *      otherwise the input column referenced by the first parameter is copied row by row;
 *   5. group-id function: the group id of pSrcBlock is written for every source row.
 * If none of the paths applies, nothing is written and 0 is returned.
 *
 * pCtx:      all function contexts of the projection; used to look up the timestamp
 *            output registered in pPseudoList.
 * numOfRows: output; set to the number of rows produced by the path taken (not touched
 *            by the skipped select-value case or when no path applies).
 *
 * Returns 0 on success, otherwise the error code (which is also logged).
 */
int32_t projectApplyFunction(SqlFunctionCtx* pCtx, SqlFunctionCtx* pfCtx, SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock,
                                    int32_t outputSlotId, int32_t* numOfRows, bool createNewColModel, const void* pExtraParams,
                                    SArray* pPseudoList, SArray** processByRowFunctionCtx, bool doSelectFunc) {
  int32_t code = 0, lino = 0;
  SArray* pBlockList = NULL;
  SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
  TSDB_CHECK_NULL(pResColData, code, lino, _exit, terrno);

  if (fmIsPlaceHolderFunc(pfCtx->functionId) && pExtraParams && pfCtx->pExpr->base.pParamList && 1 == pfCtx->pExpr->base.pParamList->length) {
    TAOS_CHECK_EXIT(scalarAssignPlaceHolderRes(pResColData, pResult->info.rows, pSrcBlock->info.rows, pfCtx->functionId, pExtraParams));
    *numOfRows = pSrcBlock->info.rows;

    return code;
  }

  if (fmIsScalarFunc(pfCtx->functionId) || fmIsPlaceHolderFunc(pfCtx->functionId)) {
    pBlockList = taosArrayInit(4, POINTER_BYTES);
    TSDB_CHECK_NULL(pBlockList, code, lino, _exit, terrno);

    void* px = taosArrayPush(pBlockList, &pSrcBlock);
    TSDB_CHECK_NULL(px, code, lino, _exit, terrno);

    // intermediate column that receives the scalar result before it is copied out
    SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};
    SScalarParam dest = {.columnData = &idata};
    gTaskScalarExtra.pStreamInfo = (void*)pExtraParams;
    gTaskScalarExtra.pStreamRange = NULL;
    // NOTE(review): on failure idata is not destroyed (unchanged behavior); confirm that
    // scalarCalculate() cannot leave buffers attached to the destination column.
    TAOS_CHECK_EXIT(scalarCalculate((SNode*)pExpr->pExpr->_function.pFunctNode, pBlockList, &dest, &gTaskScalarExtra));

    if (pResult->info.rows > 0 && !createNewColModel) {
      code = colDataMergeCol(pResColData, pResult->info.rows, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
    } else {
      SColumnInfo oriInfo = pResColData->info;
      code = colDataAssign(pResColData, &idata, dest.numOfRows, &pResult->info);
      // restore the original column info to satisfy the output column schema
      pResColData->info = oriInfo;
    }

    colDataDestroy(&idata);
    taosArrayDestroy(pBlockList);
    // the list is gone; clear the pointer so the shared cleanup at _exit does not free it again
    pBlockList = NULL;
    TAOS_CHECK_EXIT(code);

    *numOfRows = dest.numOfRows;

    return code;
  }

  if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
    SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx);
    TAOS_CHECK_EXIT(pfCtx->fpSet.init(pfCtx, pResInfo));

    pfCtx->pOutput = (char*)pResColData;
    TSDB_CHECK_NULL(pfCtx->pOutput, code, lino, _exit, terrno);

    pfCtx->offset = createNewColModel ? 0 : pResult->info.rows;  // set the start offset

    // set the timestamp(_rowts) output buffer
    if (taosArrayGetSize(pPseudoList) > 0) {
      int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
      TSDB_CHECK_NULL(outputColIndex, code, lino, _exit, terrno);

      pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
    }

    // link pDstBlock to set selectivity value
    if (pfCtx->subsidiaries.num > 0) {
      pfCtx->pDstBlock = pResult;
    }

    code = pfCtx->fpSet.process(pfCtx);
    if (code != TSDB_CODE_SUCCESS) {
      // give the function a chance to release its state before bailing out
      if (pfCtx->fpSet.cleanup != NULL) {
        pfCtx->fpSet.cleanup(pfCtx);
      }
      TAOS_CHECK_EXIT(code);
    }

    *numOfRows = pResInfo->numOfRes;

    if (fmIsProcessByRowFunc(pfCtx->functionId)) {
      if (NULL == *processByRowFunctionCtx) {
        *processByRowFunctionCtx = taosArrayInit(1, sizeof(SqlFunctionCtx*));
        TSDB_CHECK_NULL(*processByRowFunctionCtx, code, lino, _exit, terrno);
      }

      void* px = taosArrayPush(*processByRowFunctionCtx, &pfCtx);
      TSDB_CHECK_NULL(px, code, lino, _exit, terrno);
    }

    return code;
  }

  if (fmIsAggFunc(pfCtx->functionId)) {
    // selective value output should be set during corresponding function execution
    if (!doSelectFunc && fmIsSelectValueFunc(pfCtx->functionId)) {
      return code;
    }

    // _group_key function for "partition by tbname" + csum(col_name) query
    int32_t slotId = pfCtx->param[0].pCol->slotId;

    // todo handle the json tag
    SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
    TSDB_CHECK_NULL(pInput, code, lino, _exit, terrno);

    for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
      bool isNull = colDataIsNull_s(pInput, f);
      if (isNull) {
        colDataSetNULL(pResColData, pResult->info.rows + f);
      } else {
        char* data = colDataGetData(pInput, f);
        TAOS_CHECK_EXIT(colDataSetVal(pResColData, pResult->info.rows + f, data, isNull));
      }
    }

    *numOfRows = pSrcBlock->info.rows;

    return code;
  }

  if (fmIsGroupIdFunc(pfCtx->functionId)) {
    for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
      TAOS_CHECK_EXIT(colDataSetVal(pResColData, pResult->info.rows + f, (const char*)&pSrcBlock->info.id.groupId, false));
    }

    *numOfRows = pSrcBlock->info.rows;
    return code;
  }

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  taosArrayDestroy(pBlockList);

  return code;
}
1146

1147

1148
int32_t projectApplyFunctionsWithSelect(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock,
441,301,491✔
1149
                                        SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList,
1150
                                        const void* pExtraParams, bool doSelectFunc, bool hasIndefRowsFunc) {
1151
  int32_t lino = 0;
441,301,491✔
1152
  int32_t code = TSDB_CODE_SUCCESS;
441,301,491✔
1153
  if (hasIndefRowsFunc) {
441,301,491✔
1154
    setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
441,438,264✔
1155
  }
1156
  pResult->info.dataLoad = 1;
441,440,030✔
1157

1158
  SArray* processByRowFunctionCtx = NULL;
441,641,053✔
1159
  if (pSrcBlock == NULL) {
441,051,659✔
UNCOV
1160
    for (int32_t k = 0; k < numOfOutput; ++k) {
×
UNCOV
1161
      int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
×
1162

UNCOV
1163
      if (pExpr[k].pExpr->nodeType != QUERY_NODE_VALUE) {
×
UNCOV
1164
        qError("project failed at: %s:%d", __func__, __LINE__);
×
UNCOV
1165
        TAOS_CHECK_EXIT(TSDB_CODE_INVALID_PARA);
×
1166
      }
UNCOV
1167
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
×
UNCOV
1168
      TSDB_CHECK_NULL(pColInfoData, code, lino, _exit, terrno);
×
1169

1170
      int32_t type = pExpr[k].base.pParam[0].param.nType;
×
UNCOV
1171
      if (TSDB_DATA_TYPE_NULL == type) {
×
1172
        colDataSetNNULL(pColInfoData, 0, 1);
1173
      } else {
1174
        TAOS_CHECK_EXIT(colDataSetVal(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false));
×
1175
      }
1176
    }
1177

UNCOV
1178
    pResult->info.rows = 1;
×
1179
    goto _exit;
×
1180
  }
1181

1182
  if (pResult != pSrcBlock) {
441,051,659✔
1183
    pResult->info.id.groupId = pSrcBlock->info.id.groupId;
339,508,454✔
1184
    if (pSrcBlock->info.parTbName[0]) {
339,794,612✔
UNCOV
1185
      tstrncpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
×
1186
    }
1187
    qTrace("%s, parName:%s,groupId:%" PRIu64, __FUNCTION__, pSrcBlock->info.parTbName, pResult->info.id.groupId);
339,678,689✔
1188
  }
1189

1190
  // if the source equals to the destination, it is to create a new column as the result of scalar
1191
  // function or some operators.
1192
  bool createNewColModel = (pResult == pSrcBlock);
440,984,520✔
1193
  if (createNewColModel) {
440,984,520✔
1194
    TAOS_CHECK_EXIT(blockDataEnsureCapacity(pResult, pResult->info.rows));
101,846,631✔
1195
  }
1196

1197
  int32_t numOfRows = 0;
440,967,981✔
1198

1199
  for (int32_t k = 0; k < numOfOutput; ++k) {
1,859,133,962✔
1200
    int32_t               outputSlotId = pExpr[k].base.resSchema.slotId;
1,418,895,527✔
1201
    SqlFunctionCtx*       pfCtx = &pCtx[k];
1,419,025,393✔
1202
    switch (pExpr[k].pExpr->nodeType) {
1,419,400,834✔
1203
      case QUERY_NODE_COLUMN: {
977,349,722✔
1204
        TAOS_CHECK_EXIT(projectApplyColumn(pResult, pSrcBlock, outputSlotId, pfCtx, &numOfRows, createNewColModel));
977,349,722✔
1205
        break;
977,453,929✔
1206
      } 
1207
      case QUERY_NODE_VALUE: {
19,516,068✔
1208
        TAOS_CHECK_EXIT(projectApplyValue(&pExpr[k], pResult, pSrcBlock, outputSlotId, &numOfRows, createNewColModel));
19,516,068✔
1209
        break;
19,521,261✔
1210
      } 
1211
      case QUERY_NODE_OPERATOR: {
80,752,042✔
1212
        TAOS_CHECK_EXIT(projectApplyOperator(&pExpr[k], pResult, pSrcBlock, outputSlotId, &numOfRows, createNewColModel, pExtraParams));
80,752,042✔
1213
        break;
79,577,610✔
1214
      } 
1215
      case QUERY_NODE_FUNCTION: {
341,710,909✔
1216
        TAOS_CHECK_EXIT(projectApplyFunction(pCtx, pfCtx, &pExpr[k], pResult, pSrcBlock, outputSlotId, &numOfRows, createNewColModel, pExtraParams, pPseudoList, &processByRowFunctionCtx, doSelectFunc));
341,710,909✔
1217
        break;
341,431,901✔
1218
      }
UNCOV
1219
      default: {
×
UNCOV
1220
        qError("invalid project expr nodeType:%d", pExpr[k].pExpr->nodeType);
×
1221
        TAOS_CHECK_EXIT(TSDB_CODE_OPS_NOT_SUPPORT);
64✔
1222
      }
1223
    }
1224
  }
1225

1226
  if (processByRowFunctionCtx && taosArrayGetSize(processByRowFunctionCtx) > 0) {
440,238,435✔
1227
    SqlFunctionCtx** pfCtx = taosArrayGet(processByRowFunctionCtx, 0);
44,845,230✔
1228
    TSDB_CHECK_NULL(pfCtx, code, lino, _exit, terrno);
44,845,230✔
1229

1230
    TAOS_CHECK_EXIT((*pfCtx)->fpSet.processFuncByRow(processByRowFunctionCtx));
44,845,230✔
1231
    numOfRows = (*pfCtx)->resultInfo->numOfRes;
44,837,158✔
1232
  }
1233

1234
  if (!createNewColModel) {
440,263,691✔
1235
    pResult->info.rows += numOfRows;
338,608,873✔
1236
  }
1237

1238
_exit:
441,562,668✔
1239
  if (processByRowFunctionCtx) {
441,644,343✔
1240
    taosArrayDestroy(processByRowFunctionCtx);
44,845,230✔
1241
  }
1242
  if (code) {
441,644,343✔
1243
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
1,223,294✔
1244
  }
1245
  return code;
441,644,343✔
1246
}
1247

1248
/*
 * Convenience wrapper around projectApplyFunctionsWithSelect() that always runs with
 * doSelectFunc disabled and hasIndefRowsFunc enabled. Returns the callee's result.
 */
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
                              int32_t numOfOutput, SArray* pPseudoList, const void* pExtraParams) {
  const bool doSelectFunc = false;
  const bool hasIndefRowsFunc = true;

  return projectApplyFunctionsWithSelect(pExpr, pResult, pSrcBlock, pCtx, numOfOutput, pPseudoList, pExtraParams,
                                         doSelectFunc, hasIndefRowsFunc);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc