• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4548

22 Jul 2025 02:37AM UTC coverage: 54.273% (-3.0%) from 57.287%
#4548

push

travis-ci

GitHub
Merge pull request #32061 from taosdata/new_testcases

132738 of 315239 branches covered (42.11%)

Branch coverage included in aggregate %.

201371 of 300373 relevant lines covered (67.04%)

3475977.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.55
/source/libs/executor/src/projectoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "taoserror.h"
22
#include "tdatablock.h"
23

24
typedef struct SProjectOperatorInfo {
25
  SOptrBasicInfo binfo;
26
  SAggSupporter  aggSup;
27
  SArray*        pPseudoColInfo;
28
  SLimitInfo     limitInfo;
29
  bool           mergeDataBlocks;
30
  SSDataBlock*   pFinalRes;
31
  bool           inputIgnoreGroup;
32
  bool           outputIgnoreGroup;
33
} SProjectOperatorInfo;
34

35
typedef struct SIndefOperatorInfo {
36
  SOptrBasicInfo binfo;
37
  SAggSupporter  aggSup;
38
  SArray*        pPseudoColInfo;
39
  SExprSupp      scalarSup;
40
  uint64_t       groupId;
41
  SSDataBlock*   pNextGroupRes;
42
} SIndefOperatorInfo;
43

44
static int32_t      doGenerateSourceData(SOperatorInfo* pOperator);
45
static int32_t      doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
46
static int32_t      doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
47
static int32_t      setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList);
48
static int32_t      setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup,
49
                                            int32_t stage, int32_t numOfExprs);
50

51
static void destroyProjectOperatorInfo(void* param) {
429,914✔
52
  if (NULL == param) {
429,914!
53
    return;
×
54
  }
55

56
  SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param;
429,914✔
57
  cleanupBasicInfo(&pInfo->binfo);
429,914✔
58
  cleanupAggSup(&pInfo->aggSup);
429,966✔
59
  taosArrayDestroy(pInfo->pPseudoColInfo);
429,963✔
60

61
  blockDataDestroy(pInfo->pFinalRes);
429,969✔
62
  taosMemoryFreeClear(param);
429,970!
63
}
64

65
static void destroyIndefinitOperatorInfo(void* param) {
20,482✔
66
  SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param;
20,482✔
67
  if (pInfo == NULL) {
20,482!
68
    return;
×
69
  }
70

71
  cleanupBasicInfo(&pInfo->binfo);
20,482✔
72
  taosArrayDestroy(pInfo->pPseudoColInfo);
20,482✔
73
  cleanupAggSup(&pInfo->aggSup);
20,482✔
74
  cleanupExprSupp(&pInfo->scalarSup);
20,482✔
75

76
  taosMemoryFreeClear(param);
20,482!
77
}
78

79
/*
 * Forward a "release stream state" request to the first downstream operator,
 * if that operator registered a releaseStreamStateFn.
 *
 * The projection operator that installs this callback may be created without a
 * downstream, so the downstream slot is only read when numOfDownstream > 0.
 */
void streamOperatorReleaseState(SOperatorInfo* pOperator) {
  SOperatorInfo* downstream = pOperator->numOfDownstream > 0 ? pOperator->pDownstream[0] : NULL;
  if (downstream != NULL && downstream->fpSet.releaseStreamStateFn) {
    downstream->fpSet.releaseStreamStateFn(downstream);
  }
}
85

86
/*
 * Forward a "reload stream state" request to the first downstream operator,
 * if that operator registered a reloadStreamStateFn.
 *
 * The projection operator that installs this callback may be created without a
 * downstream, so the downstream slot is only read when numOfDownstream > 0.
 */
void streamOperatorReloadState(SOperatorInfo* pOperator) {
  SOperatorInfo* downstream = pOperator->numOfDownstream > 0 ? pOperator->pDownstream[0] : NULL;
  if (downstream != NULL && downstream->fpSet.reloadStreamStateFn) {
    downstream->fpSet.reloadStreamStateFn(downstream);
  }
}
92

93
/*
 * Reset-state callback of the projection operator: puts the operator back to
 * OP_NOT_OPENED, re-initialises the limit info from the physical plan node,
 * empties the merge block and rebuilds the aggregate supporter / function output.
 *
 * Returns 0 on success, otherwise the error reported by resetAggSup() or
 * setFunctionResultOutput(). (Previously the error was computed and then dropped.)
 */
static int32_t resetProjectOperState(SOperatorInfo* pOper) {
  SProjectOperatorInfo* pProject = pOper->info;
  SExecTaskInfo*        pTaskInfo = pOper->pTaskInfo;
  SProjectPhysiNode*    pPhynode = (SProjectPhysiNode*)pOper->pPhyNode;

  pOper->status = OP_NOT_OPENED;
  resetBasicOperatorState(&pProject->binfo);

  pProject->limitInfo = (SLimitInfo){0};
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pProject->limitInfo);

  blockDataCleanup(pProject->pFinalRes);

  // same key buffer size as used in createProjectOperatorInfo()
  int32_t code = resetAggSup(&pOper->exprSupp, &pProject->aggSup, pTaskInfo, pPhynode->pProjections, NULL,
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
                             &pTaskInfo->storageAPI.functionStore);
  if (code == 0) {
    code = setFunctionResultOutput(pOper, &pProject->binfo, &pProject->aggSup, MAIN_SCAN, pOper->exprSupp.numOfExprs);
  }
  return code;
}
114

115
/*
 * Create a projection operator.
 *
 * downstream   : upstream data source of the new operator; may be NULL, in which case
 *                doProjectOperation() generates the data itself via doGenerateSourceData().
 * pProjPhyNode : physical plan node describing projections, conditions, limits and output layout.
 * pTaskInfo    : owning task; on failure its `code` field is set to the returned error.
 * pOptrInfo    : out parameter receiving the new operator on success.
 *
 * Returns TSDB_CODE_SUCCESS, or an error code after releasing everything built so far
 * (including `downstream`, through destroyOperatorAndDownstreams()).
 */
int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo,
                                  SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    goto _error;
  }

  pOperator->pPhyNode = pProjPhyNode;
  pOperator->exprSupp.hasWindowOrGroup = false;
  pOperator->pTaskInfo = pTaskInfo;

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pProjPhyNode->node.pOutputDataBlockDesc);
  TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno);

  initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo);

  // From here on pResBlock is reachable through pInfo and is released with it.
  pInfo->binfo.pRes = pResBlock;
  pInfo->pFinalRes = NULL;

  code = createOneDataBlock(pResBlock, false, &pInfo->pFinalRes);
  TSDB_CHECK_CODE(code, lino, _error);

  pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder;
  pInfo->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup;
  pInfo->outputIgnoreGroup = pProjPhyNode->ignoreGroupId;

  // Blocks are merged only outside the queue exec model and only when the group id is ignored.
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE || !pProjPhyNode->ignoreGroupId) {
    pInfo->mergeDataBlocks = false;
  } else {
    pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock;
  }

  int32_t numOfRows = 4096;
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
  int32_t TWOMB = 2 * 1024 * 1024;
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
    numOfRows = TWOMB / pResBlock->info.rowSize;
  }

  initResultSizeInfo(&pOperator->resultInfo, numOfRows);

  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = NULL;
  code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols);
  TSDB_CHECK_CODE(code, lino, _error);

  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
  TSDB_CHECK_CODE(code, lino, _error);

  initBasicInfo(&pInfo->binfo, pResBlock);
  code = setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
  TSDB_CHECK_CODE(code, lino, _error);

  code = filterInitFromNode((SNode*)pProjPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                            pTaskInfo->pStreamRuntimeInfo);
  TSDB_CHECK_CODE(code, lino, _error);

  code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols, &pInfo->pPseudoColInfo);
  TSDB_CHECK_CODE(code, lino, _error);

  setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
  setOperatorStreamStateFn(pOperator, streamOperatorReleaseState, streamOperatorReloadState);
  setOperatorResetStateFn(pOperator, resetProjectOperState);

  if (NULL != downstream) {
    code = appendDownstream(pOperator, &downstream, 1);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  // NOTE(review): pInfo is released here explicitly; destroyOperatorAndDownstreams() is presumed
  // not to release pOperator->info a second time - confirm against its implementation.
  if (pInfo != NULL) destroyProjectOperatorInfo(pInfo);
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
212

213
/*
 * Apply the group offset (remainGroupOffset) to an incoming block.
 *
 * Returns PROJECT_RETRIEVE_CONTINUE when the block belongs to a group that must
 * still be skipped, PROJECT_RETRIEVE_DONE when the block has to be processed.
 * Updates pLimitInfo->currentGroupId / remainGroupOffset as groups are skipped.
 */
static int32_t discardGroupDataBlock(SSDataBlock* pBlock, SLimitInfo* pLimitInfo) {
  if (pLimitInfo->remainGroupOffset <= 0) {
    return PROJECT_RETRIEVE_DONE;
  }

  // it is the first group, or more data of the group being skipped
  if (pLimitInfo->currentGroupId == 0 || pLimitInfo->currentGroupId == pBlock->info.id.groupId) {
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
    return PROJECT_RETRIEVE_CONTINUE;
  }

  // now it is the data from a new group
  pLimitInfo->remainGroupOffset -= 1;
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;

  // ignore data block in current group
  if (pLimitInfo->remainGroupOffset > 0) {
    return PROJECT_RETRIEVE_CONTINUE;
  }

  // the offset is used up: this block is the first one to be processed
  pLimitInfo->currentGroupId = 0;
  return PROJECT_RETRIEVE_DONE;
}
235

236
/*
 * Track group changes once the group offset has been consumed.
 *
 * Counts output groups, marks the operator completed when the number of output
 * groups exceeds slimit, and resets the per-group limit state when a new group starts.
 *
 * Returns TSDB_CODE_INVALID_PARA if remainGroupOffset is neither 0 nor -1,
 * PROJECT_RETRIEVE_DONE when the slimit has been exceeded, and
 * PROJECT_RETRIEVE_CONTINUE otherwise.
 */
static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, SOperatorInfo* pOperator) {
  // remainGroupOffset == 0
  // here check for a new group data, we need to handle the data of the previous group.
  if (!(pLimitInfo->remainGroupOffset == 0 || pLimitInfo->remainGroupOffset == -1)) {
    qError("project failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_INVALID_PARA;
  }

  bool newGroup = false;
  if (0 == pBlock->info.id.groupId) {
    pLimitInfo->numOfOutputGroups = 1;
  } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
    pLimitInfo->numOfOutputGroups += 1;
    newGroup = true;
  } else {
    // still inside the current group: nothing to update
    return PROJECT_RETRIEVE_CONTINUE;
  }

  if ((pLimitInfo->slimit.limit >= 0) && (pLimitInfo->slimit.limit < pLimitInfo->numOfOutputGroups)) {
    setOperatorCompleted(pOperator);
    return PROJECT_RETRIEVE_DONE;
  }

  // reset the value for a new group data
  // existing rows that belongs to previous group.
  if (newGroup) {
    resetLimitInfoForNextGroup(pLimitInfo);
  }

  return PROJECT_RETRIEVE_CONTINUE;
}
268

269
// todo refactor
270
static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SSDataBlock* pBlock,
1,197,799✔
271
                                    SOperatorInfo* pOperator) {
272
  // set current group id
273
  pLimitInfo->currentGroupId = groupId;
1,197,799✔
274
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pOperator->pTaskInfo);
1,197,799✔
275
  if (pBlock->info.rows == 0 && 0 != pLimitInfo->limit.limit) {
1,197,937✔
276
    return PROJECT_RETRIEVE_CONTINUE;
17,277✔
277
  } else {
278
    if (limitReached && (pLimitInfo->slimit.limit >= 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
1,180,660✔
279
      setOperatorCompleted(pOperator);
148✔
280
    } else if (limitReached && groupId == 0) {
1,180,512✔
281
      setOperatorCompleted(pOperator);
34,482✔
282
    }
283
  }
284

285
  return PROJECT_RETRIEVE_DONE;
1,180,468✔
286
}
287

288
/*
 * "Get next" callback of the projection operator.
 *
 * Pulls blocks from the downstream operator (or generates the data itself when there is
 * no downstream), applies group offset, the projection expressions, the in-group
 * limit/offset and the filter, and hands back one result block.
 *
 * pResBlock receives the result block, or NULL when no rows were produced.
 * Certain stream control blocks (see the type check below) are passed through untouched.
 *
 * Returns 0 on success. On failure the error is stored in pTaskInfo->code and control
 * leaves through T_LONG_JMP.
 */
int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  SProjectOperatorInfo* pProjectInfo = pOperator->info;
  SOptrBasicInfo*       pInfo = &pProjectInfo->binfo;
  SExprSupp*            pSup = &pOperator->exprSupp;
  SSDataBlock*          pRes = pInfo->pRes;
  SSDataBlock*          pFinalRes = pProjectInfo->pFinalRes;
  int32_t               code = 0;
  int32_t               lino = 0;
  int64_t               st = 0;
  int32_t               order = pInfo->inputTsOrder;
  int32_t               scanFlag = 0;

  blockDataCleanup(pFinalRes);
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    return code;
  }

  // openCost is measured once, on the first call that does real work
  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  SOperatorInfo* downstream = pOperator->numOfDownstream > 0 ? pOperator->pDownstream[0] : NULL;
  SLimitInfo*    pLimitInfo = &pProjectInfo->limitInfo;

  if (downstream == NULL) {
    code = doGenerateSourceData(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pProjectInfo->outputIgnoreGroup) {
      pRes->info.id.groupId = 0;
    }

    *pResBlock = (pRes->info.rows > 0)? pRes:NULL;
    return code;
  }

  while (1) {
    // inner loop: fetch input until one block survives group offset and in-group limit/offset
    while (1) {
      blockDataCleanup(pRes);

      // The downstream exec may change the value of the newgroup, so use a local variable instead.
      SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
      if (pBlock == NULL) {
        qDebug("set op close, exec %d, status %d rows %" PRId64 , pTaskInfo->execModel, pOperator->status, pFinalRes->info.rows);
        setOperatorCompleted(pOperator);
        break;
      }

      // for stream interval: these block types are returned to the caller as they are
      if (pBlock->info.type == STREAM_RETRIEVE || pBlock->info.type == STREAM_DELETE_RESULT ||
          pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_CREATE_CHILD_TABLE ||
          pBlock->info.type == STREAM_CHECKPOINT || pBlock->info.type == STREAM_NOTIFY_EVENT) {

        *pResBlock = pBlock;
        return code;
      }

      if (pProjectInfo->inputIgnoreGroup) {
        pBlock->info.id.groupId = 0;
      }

      int32_t status = discardGroupDataBlock(pBlock, pLimitInfo);
      if (status == PROJECT_RETRIEVE_CONTINUE) {
        continue;
      }

      // NOTE(review): the return value (which may be TSDB_CODE_INVALID_PARA) is deliberately
      // discarded; only the operator status set by the callee is inspected.
      (void) setInfoForNewGroup(pBlock, pLimitInfo, pOperator);
      if (pOperator->status == OP_EXEC_DONE) {
        break;
      }

      if (pProjectInfo->mergeDataBlocks) {
        pFinalRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;
      } else {
        pRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;
      }

      code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
      QUERY_CHECK_CODE(code, lino, _end);

      code = blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
      QUERY_CHECK_CODE(code, lino, _end);

      code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
                                   pProjectInfo->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo));
      QUERY_CHECK_CODE(code, lino, _end);

      status = doIngroupLimitOffset(pLimitInfo, pBlock->info.id.groupId, pInfo->pRes, pOperator);
      if (status == PROJECT_RETRIEVE_CONTINUE) {
        continue;
      }

      break;
    }

    if (pProjectInfo->mergeDataBlocks) {
      if (pRes->info.rows > 0) {
        pFinalRes->info.id.groupId = 0;  // clear groupId
        pFinalRes->info.version = pRes->info.version;

        // continue merge data, ignore the group id
        code = blockDataMerge(pFinalRes, pRes);
        QUERY_CHECK_CODE(code, lino, _end);

        // keep accumulating while below the threshold and input is not exhausted
        // NOTE(review): pRes rows are added to the total again here, after the merge above - confirm intended.
        if (pFinalRes->info.rows + pRes->info.rows <= pOperator->resultInfo.threshold && (pOperator->status != OP_EXEC_DONE)) {
          continue;
        }
      }

      // do apply filter
      code = doFilter(pFinalRes, pOperator->exprSupp.pFilterInfo, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      // when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
      if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) {
        qDebug("project return %" PRId64 " rows, status %d", pFinalRes->info.rows, pOperator->status);
        break;
      }
    } else {
      // do apply filter
      if (pRes->info.rows > 0) {
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
        QUERY_CHECK_CODE(code, lino, _end);

        // everything was filtered out: fetch more input
        if (pRes->info.rows == 0) {
          continue;
        }
      }

      // no results generated
      break;
    }
  }

  SSDataBlock* p = pProjectInfo->mergeDataBlocks ? pFinalRes : pRes;
  pOperator->resultInfo.totalRows += p->info.rows;
  p->info.dataLoad = 1;

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  }

  if (pProjectInfo->outputIgnoreGroup) {
    p->info.id.groupId = 0;
  }

  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
    printDataBlock(p, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
  }

  *pResBlock = (p->info.rows > 0)? p:NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
456

457
/*
 * Reset-state callback of the indefinite-rows-function operator: puts the operator
 * back to OP_NOT_OPENED, forgets the current group and the held-back block, and
 * rebuilds the aggregate supporter, the function output and the scalar expression support.
 *
 * Returns 0 on success, otherwise the first error reported by resetAggSup(),
 * setFunctionResultOutput() or resetExprSupp(). (Previously the error was computed
 * and then dropped.)
 */
static int32_t resetIndefinitOutputOperState(SOperatorInfo* pOper) {
  SIndefOperatorInfo*      pInfo = pOper->info;
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
  SIndefRowsFuncPhysiNode* pPhynode = (SIndefRowsFuncPhysiNode*)pOper->pPhyNode;
  pOper->status = OP_NOT_OPENED;

  resetBasicOperatorState(&pInfo->binfo);

  pInfo->groupId = 0;
  pInfo->pNextGroupRes = NULL;

  // same key buffer size as used in createIndefinitOutputOperatorInfo()
  int32_t code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->pFuncs, NULL,
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
                             &pTaskInfo->storageAPI.functionStore);
  if (code == 0) {
    code = setFunctionResultOutput(pOper, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pOper->exprSupp.numOfExprs);
  }

  if (code == 0) {
    code = resetExprSupp(&pInfo->scalarSup, pTaskInfo, pPhynode->pExprs, NULL,
                         &pTaskInfo->storageAPI.functionStore);
  }
  return code;
}
480

481
int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
20,482✔
482
                                                 SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
483
  QRY_PARAM_CHECK(pOptrInfo);
20,482!
484
  int32_t code = 0;
20,482✔
485
  int32_t lino = 0;
20,482✔
486
  int32_t numOfRows = 4096;
20,482✔
487
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
20,482✔
488

489
  SIndefOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIndefOperatorInfo));
20,482!
490
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
20,481!
491
  if (pInfo == NULL || pOperator == NULL) {
20,481!
492
    code = terrno;
×
493
    goto _error;
×
494
  }
495

496
  pOperator->pPhyNode = pNode;
20,481✔
497
  pOperator->pTaskInfo = pTaskInfo;
20,481✔
498

499
  SExprSupp* pSup = &pOperator->exprSupp;
20,481✔
500
  pSup->hasWindowOrGroup = false;
20,481✔
501

502
  SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
20,481✔
503

504
  if (pPhyNode->pExprs != NULL) {
20,481✔
505
    int32_t    num = 0;
119✔
506
    SExprInfo* pSExpr = NULL;
119✔
507
    code = createExprInfo(pPhyNode->pExprs, NULL, &pSExpr, &num);
119✔
508
    QUERY_CHECK_CODE(code, lino, _error);
119!
509

510
    code = initExprSupp(&pInfo->scalarSup, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
119✔
511
    if (code != TSDB_CODE_SUCCESS) {
119!
512
      goto _error;
×
513
    }
514
  }
515

516
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->node.pOutputDataBlockDesc);
20,481✔
517
  TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno);
20,482!
518

519
  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
520
  int32_t TWOMB = 2 * 1024 * 1024;
20,482✔
521
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
20,482!
522
    numOfRows = TWOMB / pResBlock->info.rowSize;
×
523
  }
524

525
  initBasicInfo(&pInfo->binfo, pResBlock);
20,482✔
526
  initResultSizeInfo(&pOperator->resultInfo, numOfRows);
20,482✔
527
  code = blockDataEnsureCapacity(pResBlock, numOfRows);
20,481✔
528
  TSDB_CHECK_CODE(code, lino, _error);
20,481!
529

530
  int32_t    numOfExpr = 0;
20,481✔
531
  SExprInfo* pExprInfo = NULL;
20,481✔
532
  code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr);
20,481✔
533
  TSDB_CHECK_CODE(code, lino, _error);
20,480!
534

535
  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
20,480✔
536
                            pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
20,480✔
537
  TSDB_CHECK_CODE(code, lino, _error);
20,480!
538

539
  code = setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
20,480✔
540
  TSDB_CHECK_CODE(code, lino, _error);
20,479!
541

542
  code = filterInitFromNode((SNode*)pPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
20,479✔
543
                            pTaskInfo->pStreamRuntimeInfo);
20,479✔
544
  TSDB_CHECK_CODE(code, lino, _error);
20,477!
545

546
  pInfo->binfo.pRes = pResBlock;
20,477✔
547
  pInfo->binfo.inputTsOrder = pNode->inputTsOrder;
20,477✔
548
  pInfo->binfo.outputTsOrder = pNode->outputTsOrder;
20,477✔
549
  code = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr, &pInfo->pPseudoColInfo);
20,477✔
550
  TSDB_CHECK_CODE(code, lino, _error);
20,482!
551

552
  setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo,
20,482✔
553
                  pTaskInfo);
554
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo,
20,480✔
555
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
556
                                         
557
  setOperatorResetStateFn(pOperator, resetIndefinitOutputOperState);
20,480✔
558
  code = appendDownstream(pOperator, &downstream, 1);
20,481✔
559
  if (code != TSDB_CODE_SUCCESS) {
20,481!
560
    goto _error;
×
561
  }
562

563
  *pOptrInfo = pOperator;
20,481✔
564
  return TSDB_CODE_SUCCESS;
20,481✔
565

566
_error:
×
567
  if (pInfo != NULL) destroyIndefinitOperatorInfo(pInfo);
×
568
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
569
  pTaskInfo->code = code;
×
570
  return code;
×
571
}
572

573
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream,
250,945✔
574
                              SExecTaskInfo* pTaskInfo) {
575
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
250,945✔
576
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
250,945✔
577
  SExprSupp*          pSup = &pOperator->exprSupp;
250,945✔
578

579
  int32_t order = pInfo->inputTsOrder;
250,945✔
580
  int32_t scanFlag = pBlock->info.scanFlag;
250,945✔
581
  int32_t code = TSDB_CODE_SUCCESS;
250,945✔
582

583
  // there is an scalar expression that needs to be calculated before apply the group aggregation.
584
  SExprSupp* pScalarSup = &pIndefInfo->scalarSup;
250,945✔
585
  if (pScalarSup->pExprInfo != NULL) {
250,945✔
586
    code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
461✔
587
                                 pIndefInfo->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo));
461!
588
    if (code != TSDB_CODE_SUCCESS) {
461!
589
      T_LONG_JMP(pTaskInfo->env, code);
×
590
    }
591
  }
592

593
  code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
250,945✔
594
  if (code) {
250,946!
595
    T_LONG_JMP(pTaskInfo->env, code);
×
596
  }
597

598
  code = blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
250,946✔
599
  if (code != TSDB_CODE_SUCCESS) {
250,944!
600
    T_LONG_JMP(pTaskInfo->env, code);
×
601
  }
602

603
  code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
250,944✔
604
                               pIndefInfo->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo));
250,944!
605
  if (code != TSDB_CODE_SUCCESS) {
250,948✔
606
    T_LONG_JMP(pTaskInfo->env, code);
76!
607
  }
608
}
250,872✔
609

610
/*
 * "Get next" callback of the indefinite-rows-function operator.
 *
 * Accumulates the output of consecutive input blocks of one group into binfo.pRes
 * until the result threshold is reached, the group changes, or the input ends;
 * then filters the result. A block that starts a new group is held in
 * pNextGroupRes and processed first on the next iteration or call.
 *
 * pResBlock receives binfo.pRes, or NULL when no rows were produced.
 * Returns 0 on success. On failure the error is stored in pTaskInfo->code and
 * control leaves through T_LONG_JMP.
 */
int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
  SExprSupp*          pSup = &pOperator->exprSupp;
  int64_t             st = 0;
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  SSDataBlock*        pRes = pInfo->pRes;

  blockDataCleanup(pRes);

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  if (pOperator->status == OP_EXEC_DONE) {
    return code;
  }

  // openCost is measured once, on the first call that does real work
  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];

  while (1) {
    // here we need to handle the existsed group results
    if (pIndefInfo->pNextGroupRes != NULL) {  // todo extract method
      // a new group starts: force every function context to be re-initialised
      for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
        SqlFunctionCtx* pCtx = &pSup->pCtx[k];

        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
        pResInfo->initialized = false;
        pCtx->pOutput = NULL;
      }

      doHandleDataBlock(pOperator, pIndefInfo->pNextGroupRes, downstream, pTaskInfo);
      pIndefInfo->pNextGroupRes = NULL;
    }

    if (pInfo->pRes->info.rows < pOperator->resultInfo.threshold) {
      while (1) {
        // The downstream exec may change the value of the newgroup, so use a local variable instead.
        SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
        if (pBlock == NULL) {
          setOperatorCompleted(pOperator);
          break;
        }
        pInfo->pRes->info.scanFlag = pBlock->info.scanFlag;

        if (pIndefInfo->groupId == 0 && pBlock->info.id.groupId != 0) {
          pIndefInfo->groupId = pBlock->info.id.groupId;  // this is the initial group result
        } else {
          if (pIndefInfo->groupId != pBlock->info.id.groupId) {  // reset output buffer and computing status
            pIndefInfo->groupId = pBlock->info.id.groupId;
            pIndefInfo->pNextGroupRes = pBlock;
            break;
          }
        }

        doHandleDataBlock(pOperator, pBlock, downstream, pTaskInfo);
        if (pInfo->pRes->info.rows >= pOperator->resultInfo.threshold) {
          break;
        }
      }
    }

    code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    size_t filteredRows = pInfo->pRes->info.rows;
    if (filteredRows > 0 || pOperator->status == OP_EXEC_DONE) {
      break;
    } else {
      // everything was filtered out and input remains: start over with an empty block
      blockDataCleanup(pInfo->pRes);
    }
  }

  size_t rows = pInfo->pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  }

  *pResBlock = (rows > 0) ? pInfo->pRes : NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
703

704
/*
 * Call the init callback of every function context in pCtx[0..size) that still needs it.
 *
 * A context is skipped when its result entry is already initialised, when it is a
 * pseudo-column or scalar function, or when its functionId is -1.
 *
 * Returns 0 on success, otherwise the first non-zero code returned by an init callback.
 */
int32_t initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
  int32_t code = TSDB_CODE_SUCCESS;
  for (int32_t j = 0; j < size; ++j) {
    struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
    if (isRowEntryInitialized(pResInfo) || fmIsPseudoColumnFunc(pCtx[j].functionId) || pCtx[j].functionId == -1 ||
        fmIsScalarFunc(pCtx[j].functionId)) {
      continue;
    }

    code = pCtx[j].fpSet.init(&pCtx[j], pCtx[j].resultInfo);
    if (code) {
      return code;
    }
  }

  return 0;
}
721

722
/*
723
 * The start of each column SResultRowEntryInfo is denote by RowCellInfoOffset.
724
 * Note that in case of top/bottom query, the whole multiple rows of result is treated as only one row of results.
725
 * +------------+-----------------result column 1------------+------------------result column 2-----------+
726
 * | SResultRow | SResultRowEntryInfo | intermediate buffer1 | SResultRowEntryInfo | intermediate buffer 2|
727
 * +------------+--------------------------------------------+--------------------------------------------+
728
 *           offset[0]                                  offset[1]                                   offset[2]
729
 */
730
// TODO refactor: move some of these functions out of this file
731
/*
 * Bind every function context of pOperator to its entry inside a single result row.
 *
 * The result-row bookkeeping in pInfo is re-initialized, one result row is obtained from
 * pSup->pResultBuf (keyed by a zero table id, group id 0), and for each of the first numOfExprs
 * contexts the matching SResultRowEntryInfo is cleaned, attached as resultInfo, and tagged with
 * `stage` as its scanFlag. Finally initCtxOutputBuffer() runs the init callbacks.
 *
 * Returns TSDB_CODE_SUCCESS on success, or a non-zero error code. If no result row could be
 * obtained but the task carries no error code, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR is returned
 * so that callers never see "success" with unbound contexts.
 */
int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
                                int32_t numOfExprs) {
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;

  SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
  initResultRowInfo(pResultRowInfo);

  int64_t     tid = 0;
  int64_t     groupId = 0;
  SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, (char*)&tid, sizeof(tid), true, groupId,
                                            pTaskInfo, false, pSup, true);
  if (pTaskInfo->code != 0) {
    return pTaskInfo->code;
  }

  if (pRow == NULL) {
    // a missing row without a recorded task error must still be reported as a failure
    qError("%s failed at line %d since no result row is available", __func__, __LINE__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  for (int32_t i = 0; i < numOfExprs; ++i) {
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowEntryInfoOffset);
    cleanupResultRowEntry(pEntry);

    pCtx[i].resultInfo = pEntry;
    pCtx[i].scanFlag = stage;
  }

  return initCtxOutputBuffer(pCtx, numOfExprs);
}
758

759
/*
 * Collect the indexes of all pseudo-column function contexts in pCtx[0..numOfCols).
 *
 * On success *pResList receives a newly created SArray of int32_t indexes (possibly empty) that
 * the caller owns, and 0 is returned. On failure a non-zero error code is returned, no array is
 * handed out, and the partially built list is released here.
 */
int32_t setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList) {
  QRY_PARAM_CHECK(pResList);

  SArray* pList = taosArrayInit(4, sizeof(int32_t));
  if (pList == NULL) {
    return terrno;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    if (!fmIsPseudoColumnFunc(pCtx[i].functionId)) {
      continue;
    }

    void* px = taosArrayPush(pList, &i);
    if (px == NULL) {
      // capture the error before releasing the list, then free it so it does not leak
      int32_t code = terrno;
      taosArrayDestroy(pList);
      return code;
    }
  }

  *pResList = pList;
  return 0;
}
778

779
/*
 * Produce the single output row of a projection whose expressions are only constant values or
 * scalar functions.
 *
 * For every expression of the operator:
 *   - QUERY_NODE_VALUE:    the constant (or NULL) is written to row 0 of its output column;
 *   - QUERY_NODE_FUNCTION: only scalar functions are accepted; the function is evaluated with
 *                          the result block itself as input and the outcome is assigned to the
 *                          output column;
 *   - anything else:       TSDB_CODE_OPS_NOT_SUPPORT.
 *
 * Afterwards the block is set to one row, filtered, passed through the in-group limit/offset
 * handling, the operator is marked completed and the open cost is recorded if still unset.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered. Temporary objects created while
 * evaluating a scalar function are released on every exit path.
 */
int32_t doGenerateSourceData(SOperatorInfo* pOperator) {
  SProjectOperatorInfo* pProjectInfo = pOperator->info;

  SExprSupp*     pSup = &pOperator->exprSupp;
  SSDataBlock*   pRes = pProjectInfo->binfo.pRes;
  SExprInfo*     pExpr = pSup->pExprInfo;
  int64_t        st = taosGetTimestampUs();
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  int32_t code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
  if (code) {
    return code;
  }

  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
    int32_t outputSlotId = pExpr[k].base.resSchema.slotId;

    if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
      SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, outputSlotId);
      if (pColInfoData == NULL) {
        return terrno;
      }

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataSetNNULL(pColInfoData, 0, 1);
      } else {
        code = colDataSetVal(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
        if (code) {
          return code;
        }
      }
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
      SqlFunctionCtx* pfCtx = &pSup->pCtx[k];

      // UDF scalar functions will be calculated here, for example, select foo(n) from (select 1 n).
      // UDF aggregate functions will be handled in agg operator.
      if (!fmIsScalarFunc(pfCtx->functionId)) {
        return TSDB_CODE_OPS_NOT_SUPPORT;
      }

      SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
      if (pBlockList == NULL) {
        return terrno;
      }

      void* px = taosArrayPush(pBlockList, &pRes);
      if (px == NULL) {
        code = terrno;  // read the error before the destroy call below
        taosArrayDestroy(pBlockList);
        return code;
      }

      SColumnInfoData* pResColData = taosArrayGet(pRes->pDataBlock, outputSlotId);
      if (pResColData == NULL) {
        code = terrno;
        taosArrayDestroy(pBlockList);
        return code;
      }

      // intermediate column that receives the scalar result before it is copied to the output
      SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};

      SScalarParam dest = {.columnData = &idata};
      code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest,
                             GET_STM_RTINFO(pOperator->pTaskInfo), NULL);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pBlockList);
        return code;
      }

      if (pRes->info.capacity <= 0) {
        qError("project failed at: %s:%d", __func__, __LINE__);
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      } else {
        code = colDataAssign(pResColData, &idata, dest.numOfRows, &pRes->info);
      }

      // both temporaries are released whether or not the assignment succeeded
      colDataDestroy(&idata);
      taosArrayDestroy(pBlockList);
      if (code) {
        return code;
      }
    } else {
      return TSDB_CODE_OPS_NOT_SUPPORT;
    }
  }

  pRes->info.rows = 1;
  code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
  if (code) {
    pTaskInfo->code = code;
    return code;
  }

  (void)doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator);

  pOperator->resultInfo.totalRows += pRes->info.rows;

  setOperatorCompleted(pOperator);
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  }

  return code;
}
879

880
/*
 * Point the output of the leading function contexts at the leading columns of pResult.
 *
 * One context is handled per element of pPseudoList; a NULL list means nothing to do.
 * A column lookup that yields NULL is logged and the context keeps the NULL output.
 *
 * NOTE(review): both pCtx and the block columns are indexed by the loop position, not by the
 * indexes stored in pPseudoList - confirm this is intended.
 */
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
  if (pPseudoList == NULL) {
    return;
  }

  const size_t total = taosArrayGetSize(pPseudoList);
  int32_t      idx = 0;
  while (idx < total) {
    SqlFunctionCtx* pCur = &pCtx[idx];

    pCur->pOutput = taosArrayGet(pResult->pDataBlock, idx);
    if (pCur->pOutput == NULL) {
      qError("failed to get the output buf, ptr is null");
    }

    ++idx;
  }
}
2,209,557✔
889

890
int32_t projectApplyFunctionsWithSelect(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock,
2,208,904✔
891
                                        SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList,
892
                                        const void* pExtraParams, bool doSelectFunc) {
893
  int32_t lino = 0;
2,208,904✔
894
  int32_t code = TSDB_CODE_SUCCESS;
2,208,904✔
895
  setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
2,208,904✔
896
  pResult->info.dataLoad = 1;
2,209,667✔
897

898
  SArray* processByRowFunctionCtx = NULL;
2,209,667✔
899
  if (pSrcBlock == NULL) {
2,209,667!
900
    for (int32_t k = 0; k < numOfOutput; ++k) {
×
901
      int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
×
902

903
      if (pExpr[k].pExpr->nodeType != QUERY_NODE_VALUE) {
×
904
        qError("project failed at: %s:%d", __func__, __LINE__);
×
905
        code = TSDB_CODE_INVALID_PARA;
×
906
        TSDB_CHECK_CODE(code, lino, _exit);
×
907
      }
908
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
×
909
      if (pColInfoData == NULL) {
×
910
        code = terrno;
×
911
        TSDB_CHECK_CODE(code, lino, _exit);
×
912
      }
913

914
      int32_t type = pExpr[k].base.pParam[0].param.nType;
×
915
      if (TSDB_DATA_TYPE_NULL == type) {
×
916
        colDataSetNNULL(pColInfoData, 0, 1);
917
      } else {
918
        code = colDataSetVal(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
×
919
        TSDB_CHECK_CODE(code, lino, _exit);
×
920
      }
921
    }
922

923
    pResult->info.rows = 1;
×
924
    goto _exit;
×
925
  }
926

927
  if (pResult != pSrcBlock) {
2,209,667✔
928
    pResult->info.id.groupId = pSrcBlock->info.id.groupId;
1,465,474✔
929
    memcpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
1,465,474✔
930
    qTrace("%s, parName:%s,groupId:%" PRIu64, __FUNCTION__, pSrcBlock->info.parTbName, pResult->info.id.groupId);
1,465,474!
931
  }
932

933
  // if the source equals to the destination, it is to create a new column as the result of scalar
934
  // function or some operators.
935
  bool createNewColModel = (pResult == pSrcBlock);
2,208,743✔
936
  if (createNewColModel) {
2,208,743✔
937
    code = blockDataEnsureCapacity(pResult, pResult->info.rows);
743,592✔
938
    if (code) {
743,667!
939
      TSDB_CHECK_CODE(code, lino, _exit);
×
940
    }
941
  }
942

943
  int32_t numOfRows = 0;
2,208,818✔
944

945
  for (int32_t k = 0; k < numOfOutput; ++k) {
6,598,003✔
946
    int32_t               outputSlotId = pExpr[k].base.resSchema.slotId;
5,475,843✔
947
    SqlFunctionCtx*       pfCtx = &pCtx[k];
5,475,843✔
948
    SInputColumnInfoData* pInputData = &pfCtx->input;
5,475,843✔
949

950
    if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) {  // it is a project query
5,475,843✔
951
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
2,738,273✔
952
      if (pColInfoData == NULL) {
2,738,242✔
953
        code = terrno;
32✔
954
        TSDB_CHECK_CODE(code, lino, _exit);
79!
955
      }
956

957
      if (pResult->info.rows > 0 && !createNewColModel) {
2,738,210!
958
        int32_t ret = 0;
×
959

960
        if (pInputData->pData[0] == NULL) {
×
961
          int32_t slotId = pfCtx->param[0].pCol->slotId;
×
962

963
          SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
×
964
          if (pInput == NULL) {
×
965
            code = terrno;
×
966
            TSDB_CHECK_CODE(code, lino, _exit);
×
967
          }
968

969
          ret = colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInput,
×
970
                                pSrcBlock->info.rows);
×
971
        } else {
972
          ret = colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity,
×
973
                                pInputData->pData[0], pInputData->numOfRows);
×
974
        }
975

976
        if (ret < 0) {
×
977
          code = ret;
×
978
        }
979

980
        TSDB_CHECK_CODE(code, lino, _exit);
×
981
      } else {
982
        if (pInputData->pData[0] == NULL) {
2,738,210!
983
          int32_t slotId = pfCtx->param[0].pCol->slotId;
×
984

985
          SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
×
986
          if (pInput == NULL) {
×
987
            code = terrno;
×
988
            TSDB_CHECK_CODE(code, lino, _exit);
×
989
          }
990

991
          code = colDataAssign(pColInfoData, pInput, pSrcBlock->info.rows, &pResult->info);
×
992
          numOfRows = pSrcBlock->info.rows;
×
993
        } else {
994
          code = colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
2,738,210✔
995
          numOfRows = pInputData->numOfRows;
2,738,101✔
996
        }
997

998
        TSDB_CHECK_CODE(code, lino, _exit);
2,738,101!
999
      }
1000
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
2,737,570✔
1001
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
101,628✔
1002
      if (pColInfoData == NULL) {
101,648!
1003
        code = terrno;
×
1004
        TSDB_CHECK_CODE(code, lino, _exit);
×
1005
      }
1006

1007
      int32_t offset = createNewColModel ? 0 : pResult->info.rows;
101,652!
1008

1009
      int32_t type = pExpr[k].base.pParam[0].param.nType;
101,652✔
1010
      if (TSDB_DATA_TYPE_NULL == type) {
101,652✔
1011
        colDataSetNNULL(pColInfoData, offset, pSrcBlock->info.rows);
1,539✔
1012
      } else {
1013
        char* p = taosVariantGet(&pExpr[k].base.pParam[0].param, type);
100,113✔
1014
        for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
245,369,019!
1015
          code = colDataSetVal(pColInfoData, i + offset, p, false);
246,356,368✔
1016
          TSDB_CHECK_CODE(code, lino, _exit);
245,268,923!
1017
        }
1018
      }
1019

1020
      numOfRows = pSrcBlock->info.rows;
×
1021
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
2,635,942✔
1022
      SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
333,451✔
1023
      if (pBlockList == NULL) {
333,550!
1024
        code = terrno;
×
1025
        TSDB_CHECK_CODE(code, lino, _exit);
15!
1026
      }
1027

1028
      void* px = taosArrayPush(pBlockList, &pSrcBlock);
333,520✔
1029
      if (px == NULL) {
333,520!
1030
        code = terrno;
×
1031
        taosArrayDestroy(pBlockList);
×
1032
        TSDB_CHECK_CODE(code, lino, _exit);
×
1033
      }
1034

1035
      SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
333,520✔
1036
      if (pResColData == NULL) {
333,517!
1037
        code = terrno;
×
1038
        taosArrayDestroy(pBlockList);
×
1039
        TSDB_CHECK_CODE(code, lino, _exit);
×
1040
      }
1041

1042
      SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};
333,517✔
1043

1044
      SScalarParam dest = {.columnData = &idata};
333,517✔
1045
      code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest, pExtraParams, NULL);
333,517✔
1046
      if (code != TSDB_CODE_SUCCESS) {
333,466✔
1047
        taosArrayDestroy(pBlockList);
50✔
1048
        TSDB_CHECK_CODE(code, lino, _exit);
15!
1049
      }
1050

1051
      int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
333,416✔
1052
      if (pResult->info.capacity <= 0) {
333,416!
1053
        qError("project failed at: %s:%d", __func__, __LINE__);
×
1054
        code = TSDB_CODE_INVALID_PARA;
×
1055
        TSDB_CHECK_CODE(code, lino, _exit);
×
1056
      }
1057

1058
      int32_t ret =
1059
          colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
333,416✔
1060
      if (ret < 0) {
333,352!
1061
        code = ret;
×
1062
      }
1063

1064
      colDataDestroy(&idata);
333,352✔
1065
      TSDB_CHECK_CODE(code, lino, _exit);
333,429!
1066

1067
      numOfRows = dest.numOfRows;
333,429✔
1068
      taosArrayDestroy(pBlockList);
333,429✔
1069
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
2,302,491✔
1070
      // _rowts/_c0, not tbname column
1071
      if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId)) &&
2,302,329!
1072
          !fmIsPlaceHolderFunc(pfCtx->functionId)) {
×
1073
        if (fmIsGroupIdFunc(pfCtx->functionId)) {
×
1074
          SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
×
1075
          TSDB_CHECK_NULL(pColInfoData, code, lino, _exit, terrno);
×
1076
          code = colDataSetVal(pColInfoData, pResult->info.rows, (const char*)&pSrcBlock->info.id.groupId, false);
×
1077
          TSDB_CHECK_CODE(code, lino, _exit);
×
1078
        }
1079
      } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
2,302,506✔
1080
        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx);
251,247✔
1081
        code = pfCtx->fpSet.init(pfCtx, pResInfo);
251,247✔
1082
        TSDB_CHECK_CODE(code, lino, _exit);
251,248!
1083
        pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
251,248✔
1084
        if (pfCtx->pOutput == NULL) {
251,248✔
1085
          code = terrno;
1✔
1086
          TSDB_CHECK_CODE(code, lino, _exit);
×
1087
        }
1088

1089
        pfCtx->offset = createNewColModel ? 0 : pResult->info.rows;  // set the start offset
251,247!
1090

1091
        // set the timestamp(_rowts) output buffer
1092
        if (taosArrayGetSize(pPseudoList) > 0) {
251,247!
1093
          int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
×
1094
          if (outputColIndex == NULL) {
×
1095
            code = terrno;
×
1096
            TSDB_CHECK_CODE(code, lino, _exit);
×
1097
          }
1098

1099
          pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
×
1100
        }
1101

1102
        // link pDstBlock to set selectivity value
1103
        if (pfCtx->subsidiaries.num > 0) {
251,247✔
1104
          pfCtx->pDstBlock = pResult;
225,739✔
1105
        }
1106

1107
        code = pfCtx->fpSet.process(pfCtx);
251,247✔
1108
        if (code != TSDB_CODE_SUCCESS) {
251,248✔
1109
          if (pCtx[k].fpSet.cleanup != NULL) {
36!
1110
            pCtx[k].fpSet.cleanup(&pCtx[k]);
×
1111
          }
1112
          TSDB_CHECK_CODE(code, lino, _exit);
36!
1113
        }
1114

1115
        numOfRows = pResInfo->numOfRes;
251,212✔
1116
        if (fmIsProcessByRowFunc(pfCtx->functionId)) {
251,212✔
1117
          if (NULL == processByRowFunctionCtx) {
231,846✔
1118
            processByRowFunctionCtx = taosArrayInit(1, sizeof(SqlFunctionCtx*));
231,653✔
1119
            if (!processByRowFunctionCtx) {
231,653!
1120
              code = terrno;
×
1121
              TSDB_CHECK_CODE(code, lino, _exit);
×
1122
            }
1123
          }
1124

1125
          void* px = taosArrayPush(processByRowFunctionCtx, &pfCtx);
231,863✔
1126
          if (px == NULL) {
231,863!
1127
            code = terrno;
×
1128
            TSDB_CHECK_CODE(code, lino, _exit);
×
1129
          }
1130
        }
1131
      } else if (fmIsAggFunc(pfCtx->functionId)) {
2,050,367✔
1132
        // selective value output should be set during corresponding function execution
1133
        if (!doSelectFunc && fmIsSelectValueFunc(pfCtx->functionId)) {
447,234!
1134
          continue;
226,255✔
1135
        }
1136
        // _group_key function for "partition by tbname" + csum(col_name) query
1137
        SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
220,979✔
1138
        if (pOutput == NULL) {
220,979!
1139
          code = terrno;
×
1140
          TSDB_CHECK_CODE(code, lino, _exit);
×
1141
        }
1142

1143
        int32_t slotId = pfCtx->param[0].pCol->slotId;
220,979✔
1144

1145
        // todo handle the json tag
1146
        SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
220,979✔
1147
        if (pInput == NULL) {
220,979!
1148
          code = terrno;
×
1149
          TSDB_CHECK_CODE(code, lino, _exit);
×
1150
        }
1151

1152
        for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
15,955,671✔
1153
          bool isNull = colDataIsNull_s(pInput, f);
15,734,692✔
1154
          if (isNull) {
15,734,692✔
1155
            colDataSetNULL(pOutput, pResult->info.rows + f);
1,551,690✔
1156
          } else {
1157
            char* data = colDataGetData(pInput, f);
14,183,002!
1158
            code = colDataSetVal(pOutput, pResult->info.rows + f, data, isNull);
14,183,002✔
1159
            TSDB_CHECK_CODE(code, lino, _exit);
14,183,002!
1160
          }
1161
        }
1162

1163
      } else {
1164
        SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
1,602,480✔
1165
        if (pBlockList == NULL) {
1,604,733!
1166
          code = terrno;
×
1167
          TSDB_CHECK_CODE(code, lino, _exit);
28!
1168
        }
1169

1170
        void* px = taosArrayPush(pBlockList, &pSrcBlock);
1,603,999✔
1171
        if (px == NULL) {
1,603,999!
1172
          code = terrno;
×
1173
          TSDB_CHECK_CODE(code, lino, _exit);
×
1174
        }
1175

1176
        SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
1,603,999✔
1177
        if (pResColData == NULL) {
1,603,832!
1178
          taosArrayDestroy(pBlockList);
×
1179
          code = terrno;
×
1180
          TSDB_CHECK_CODE(code, lino, _exit);
×
1181
        }
1182

1183
        SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};
1,603,832✔
1184

1185
        SScalarParam dest = {.columnData = &idata};
1,603,832✔
1186
        code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest, pExtraParams, NULL);
1,603,832✔
1187
        if (code != TSDB_CODE_SUCCESS) {
1,603,017✔
1188
          taosArrayDestroy(pBlockList);
223✔
1189
          TSDB_CHECK_CODE(code, lino, _exit);
28!
1190
        }
1191

1192
        int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
1,602,794✔
1193
        if (pResult->info.capacity <= 0) {
1,602,794!
1194
          qError("project failed at: %s:%d", __func__, __LINE__);
×
1195
          code = TSDB_CODE_INVALID_PARA;
×
1196
          TSDB_CHECK_CODE(code, lino, _exit);
×
1197
        }
1198
        int32_t ret =
1199
            colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
1,602,794✔
1200
        if (ret < 0) {
1,602,657!
1201
          code = ret;
×
1202
        }
1203

1204
        colDataDestroy(&idata);
1,602,657✔
1205

1206
        numOfRows = dest.numOfRows;
1,604,637✔
1207
        taosArrayDestroy(pBlockList);
1,604,637✔
1208
        TSDB_CHECK_CODE(code, lino, _exit);
1,605,004!
1209
      }
1210
    } else {
1211
      return TSDB_CODE_OPS_NOT_SUPPORT;
162✔
1212
    }
1213
  }
1214

1215
  if (processByRowFunctionCtx && taosArrayGetSize(processByRowFunctionCtx) > 0) {
1,122,160!
1216
    SqlFunctionCtx** pfCtx = taosArrayGet(processByRowFunctionCtx, 0);
×
1217
    if (pfCtx == NULL) {
231,653!
1218
      code = terrno;
×
1219
      TSDB_CHECK_CODE(code, lino, _exit);
×
1220
    }
1221

1222
    code = (*pfCtx)->fpSet.processFuncByRow(processByRowFunctionCtx);
231,653✔
1223
    TSDB_CHECK_CODE(code, lino, _exit);
231,653✔
1224
    numOfRows = (*pfCtx)->resultInfo->numOfRes;
231,613✔
1225
  }
1226

1227
  if (!createNewColModel) {
2,209,326✔
1228
    pResult->info.rows += numOfRows;
1,465,705✔
1229
  }
1230

1231
_exit:
743,621✔
1232
  if (processByRowFunctionCtx) {
2,209,445✔
1233
    taosArrayDestroy(processByRowFunctionCtx);
231,653✔
1234
  }
1235
  if (code) {
2,209,445✔
1236
    qError("project apply functions failed at: %s:%d", __func__, lino);
119!
1237
  }
1238
  return code;
2,209,414✔
1239
}
1240

1241
/*
 * Convenience entry point: same as projectApplyFunctionsWithSelect() with doSelectFunc turned
 * off. All other arguments are forwarded unchanged and the callee's code is returned as is.
 */
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
                              int32_t numOfOutput, SArray* pPseudoList, const void* pExtraParams) {
  const bool withSelectFunc = false;

  int32_t rc = projectApplyFunctionsWithSelect(pExpr, pResult, pSrcBlock, pCtx, numOfOutput, pPseudoList,
                                               pExtraParams, withSelectFunc);
  return rc;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc