• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4513

17 Jul 2025 02:02AM UTC coverage: 31.359% (-31.1%) from 62.446%
#4513

push

travis-ci

web-flow
Merge pull request #31914 from taosdata/fix/3.0/compare-ans-failed

fix:Convert line endings from LF to CRLF for ans file

68541 of 301034 branches covered (22.77%)

Branch coverage included in aggregate %.

117356 of 291771 relevant lines covered (40.22%)

602262.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.84
/source/libs/executor/src/projectoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "taoserror.h"
22
#include "tdatablock.h"
23

24
typedef struct SProjectOperatorInfo {
25
  SOptrBasicInfo binfo;
26
  SAggSupporter  aggSup;
27
  SArray*        pPseudoColInfo;
28
  SLimitInfo     limitInfo;
29
  bool           mergeDataBlocks;
30
  SSDataBlock*   pFinalRes;
31
  bool           inputIgnoreGroup;
32
  bool           outputIgnoreGroup;
33
} SProjectOperatorInfo;
34

35
typedef struct SIndefOperatorInfo {
36
  SOptrBasicInfo binfo;
37
  SAggSupporter  aggSup;
38
  SArray*        pPseudoColInfo;
39
  SExprSupp      scalarSup;
40
  uint64_t       groupId;
41
  SSDataBlock*   pNextGroupRes;
42
} SIndefOperatorInfo;
43

44
static int32_t      doGenerateSourceData(SOperatorInfo* pOperator);
45
static int32_t      doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
46
static int32_t      doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
47
static int32_t      setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList);
48
static int32_t      setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup,
49
                                            int32_t stage, int32_t numOfExprs);
50

51
static void destroyProjectOperatorInfo(void* param) {
2,275✔
52
  if (NULL == param) {
2,275!
53
    return;
×
54
  }
55

56
  SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param;
2,275✔
57
  cleanupBasicInfo(&pInfo->binfo);
2,275✔
58
  cleanupAggSup(&pInfo->aggSup);
2,275✔
59
  taosArrayDestroy(pInfo->pPseudoColInfo);
2,275✔
60

61
  blockDataDestroy(pInfo->pFinalRes);
2,275✔
62
  taosMemoryFreeClear(param);
2,275!
63
}
64

65
static void destroyIndefinitOperatorInfo(void* param) {
156✔
66
  SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param;
156✔
67
  if (pInfo == NULL) {
156!
68
    return;
×
69
  }
70

71
  cleanupBasicInfo(&pInfo->binfo);
156✔
72
  taosArrayDestroy(pInfo->pPseudoColInfo);
156✔
73
  cleanupAggSup(&pInfo->aggSup);
156✔
74
  cleanupExprSupp(&pInfo->scalarSup);
156✔
75

76
  taosMemoryFreeClear(param);
156!
77
}
78

79
void streamOperatorReleaseState(SOperatorInfo* pOperator) {
×
80
  SOperatorInfo* downstream = pOperator->pDownstream[0];
×
81
  if (downstream->fpSet.releaseStreamStateFn) {
×
82
    downstream->fpSet.releaseStreamStateFn(downstream);
×
83
  }
84
}
×
85

86
void streamOperatorReloadState(SOperatorInfo* pOperator) {
×
87
  SOperatorInfo* downstream = pOperator->pDownstream[0];
×
88
  if (downstream->fpSet.reloadStreamStateFn) {
×
89
    downstream->fpSet.reloadStreamStateFn(downstream);
×
90
  }
91
}
×
92

93
static int32_t resetProjectOperState(SOperatorInfo* pOper) {
×
94
  SProjectOperatorInfo* pProject = pOper->info;
×
95
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
×
96
  pOper->status = OP_NOT_OPENED;
×
97

98
  resetBasicOperatorState(&pProject->binfo);
×
99
  SProjectPhysiNode* pPhynode = (SProjectPhysiNode*)pOper->pPhyNode;
×
100

101
  pProject->limitInfo = (SLimitInfo){0};
×
102
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pProject->limitInfo);
×
103

104
  blockDataCleanup(pProject->pFinalRes);
×
105

106
  int32_t code = resetAggSup(&pOper->exprSupp, &pProject->aggSup, pTaskInfo, pPhynode->pProjections, NULL,
×
107
    sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
×
108
    &pTaskInfo->storageAPI.functionStore);
109
  if (code == 0){
×
110
    code = setFunctionResultOutput(pOper, &pProject->binfo, &pProject->aggSup, MAIN_SCAN, pOper->exprSupp.numOfExprs);
×
111
  }
112
  return 0;
×
113
}
114

115
int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo,
2,274✔
116
                                  SOperatorInfo** pOptrInfo) {
117
  QRY_PARAM_CHECK(pOptrInfo);
2,274!
118

119
  int32_t code = TSDB_CODE_SUCCESS;
2,274✔
120
  SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
2,274!
121
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2,275!
122
  if (pInfo == NULL || pOperator == NULL) {
2,274!
123
    code = terrno;
×
124
    goto _error;
×
125
  }
126

127
  pOperator->pPhyNode = pProjPhyNode;
2,274✔
128
  pOperator->exprSupp.hasWindowOrGroup = false;
2,274✔
129
  pOperator->pTaskInfo = pTaskInfo;
2,274✔
130

131
  int32_t    lino = 0;
2,274✔
132

133
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pProjPhyNode->node.pOutputDataBlockDesc);
2,274✔
134
  TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno);
2,275!
135

136
  initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo);
2,275✔
137

138
  pInfo->binfo.pRes = pResBlock;
2,274✔
139
  pInfo->pFinalRes = NULL;
2,274✔
140

141
  code = createOneDataBlock(pResBlock, false, &pInfo->pFinalRes);
2,274✔
142
  TSDB_CHECK_CODE(code, lino, _error);
2,275!
143

144
  pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder;
2,275✔
145
  pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder;
2,275✔
146
  pInfo->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup;
2,275✔
147
  pInfo->outputIgnoreGroup = pProjPhyNode->ignoreGroupId;
2,275✔
148

149
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
2,275✔
150
    pInfo->mergeDataBlocks = false;
5✔
151
  } else {
152
    if (!pProjPhyNode->ignoreGroupId) {
2,270✔
153
      pInfo->mergeDataBlocks = false;
6✔
154
    } else {
155
      pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock;
2,264✔
156
    }
157
  }
158

159
  int32_t numOfRows = 4096;
2,275✔
160
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
2,275✔
161

162
  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
163
  int32_t TWOMB = 2 * 1024 * 1024;
2,275✔
164
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
2,275✔
165
    numOfRows = TWOMB / pResBlock->info.rowSize;
36✔
166
  }
167

168
  initResultSizeInfo(&pOperator->resultInfo, numOfRows);
2,275✔
169
  
170
  int32_t    numOfCols = 0;
2,275✔
171
  SExprInfo* pExprInfo = NULL;
2,275✔
172
  code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols);
2,275✔
173
  TSDB_CHECK_CODE(code, lino, _error);
2,275!
174
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
2,275✔
175
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
2,275✔
176
  TSDB_CHECK_CODE(code, lino, _error);
2,275!
177

178
  initBasicInfo(&pInfo->binfo, pResBlock);
2,275✔
179
  code = setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
2,274✔
180
  TSDB_CHECK_CODE(code, lino, _error);
2,273!
181

182
  code = filterInitFromNode((SNode*)pProjPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
2,273✔
183
                            pTaskInfo->pStreamRuntimeInfo);
2,273✔
184
  TSDB_CHECK_CODE(code, lino, _error);
2,274!
185

186
  code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols, &pInfo->pPseudoColInfo);
2,274✔
187
  TSDB_CHECK_CODE(code, lino, _error);
2,275!
188

189
  setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo,
2,275✔
190
                  pTaskInfo);
191
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo,
2,274✔
192
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
193
  setOperatorStreamStateFn(pOperator, streamOperatorReleaseState, streamOperatorReloadState);
2,274✔
194
  setOperatorResetStateFn(pOperator, resetProjectOperState);
2,275✔
195

196
  if (NULL != downstream) {
2,274!
197
    code = appendDownstream(pOperator, &downstream, 1);
2,275✔
198
    if (code != TSDB_CODE_SUCCESS) {
2,275!
199
      goto _error;
×
200
    }
201
  }
202

203
  *pOptrInfo = pOperator;
2,274✔
204
  return TSDB_CODE_SUCCESS;
2,274✔
205

206
_error:
×
207
  if (pInfo != NULL) destroyProjectOperatorInfo(pInfo);
×
208
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
209
  pTaskInfo->code = code;
×
210
  return code;
×
211
}
212

213
static int32_t discardGroupDataBlock(SSDataBlock* pBlock, SLimitInfo* pLimitInfo) {
3,101✔
214
  if (pLimitInfo->remainGroupOffset > 0) {
3,101!
215
    // it is the first group
216
    if (pLimitInfo->currentGroupId == 0 || pLimitInfo->currentGroupId == pBlock->info.id.groupId) {
×
217
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
218
      return PROJECT_RETRIEVE_CONTINUE;
×
219
    } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
×
220
      // now it is the data from a new group
221
      pLimitInfo->remainGroupOffset -= 1;
×
222
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
223

224
      // ignore data block in current group
225
      if (pLimitInfo->remainGroupOffset > 0) {
×
226
        return PROJECT_RETRIEVE_CONTINUE;
×
227
      }
228

229
      pLimitInfo->currentGroupId = 0;
×
230
    }
231
  }
232

233
  return PROJECT_RETRIEVE_DONE;
3,101✔
234
}
235

236
static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, SOperatorInfo* pOperator) {
3,101✔
237
  // remainGroupOffset == 0
238
  // here check for a new group data, we need to handle the data of the previous group.
239
  if (!(pLimitInfo->remainGroupOffset == 0 || pLimitInfo->remainGroupOffset == -1)) {
3,101!
240
    qError("project failed at: %s:%d", __func__, __LINE__);
×
241
    return TSDB_CODE_INVALID_PARA;
×
242
  }
243

244
  bool newGroup = false;
3,101✔
245
  if (0 == pBlock->info.id.groupId) {
3,101✔
246
    pLimitInfo->numOfOutputGroups = 1;
3,099✔
247
  } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
2!
248
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
2✔
249
    pLimitInfo->numOfOutputGroups += 1;
2✔
250
    newGroup = true;
2✔
251
  } else {
252
    return PROJECT_RETRIEVE_CONTINUE;
×
253
  }
254

255
  if ((pLimitInfo->slimit.limit >= 0) && (pLimitInfo->slimit.limit < pLimitInfo->numOfOutputGroups)) {
3,101!
256
    setOperatorCompleted(pOperator);
×
257
    return PROJECT_RETRIEVE_DONE;
×
258
  }
259

260
  // reset the value for a new group data
261
  // existing rows that belongs to previous group.
262
  if (newGroup) {
3,101✔
263
    resetLimitInfoForNextGroup(pLimitInfo);
2✔
264
  }
265

266
  return PROJECT_RETRIEVE_CONTINUE;
3,101✔
267
}
268

269
// todo refactor
270
static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SSDataBlock* pBlock,
3,101✔
271
                                    SOperatorInfo* pOperator) {
272
  // set current group id
273
  pLimitInfo->currentGroupId = groupId;
3,101✔
274
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pOperator->pTaskInfo);
3,101✔
275
  if (pBlock->info.rows == 0 && 0 != pLimitInfo->limit.limit) {
3,101!
276
    return PROJECT_RETRIEVE_CONTINUE;
×
277
  } else {
278
    if (limitReached && (pLimitInfo->slimit.limit >= 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
3,101!
279
      setOperatorCompleted(pOperator);
×
280
    } else if (limitReached && groupId == 0) {
3,101!
281
      setOperatorCompleted(pOperator);
8✔
282
    }
283
  }
284

285
  return PROJECT_RETRIEVE_DONE;
3,101✔
286
}
287

288
int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
4,610✔
289
  QRY_PARAM_CHECK(pResBlock);
4,610!
290

291
  SProjectOperatorInfo* pProjectInfo = pOperator->info;
4,610✔
292
  SOptrBasicInfo*       pInfo = &pProjectInfo->binfo;
4,610✔
293
  SExprSupp*            pSup = &pOperator->exprSupp;
4,610✔
294
  SSDataBlock*          pRes = pInfo->pRes;
4,610✔
295
  SSDataBlock*          pFinalRes = pProjectInfo->pFinalRes;
4,610✔
296
  int32_t               code = 0;
4,610✔
297
  int32_t               lino = 0;
4,610✔
298
  int64_t               st = 0;
4,610✔
299
  int32_t               order = pInfo->inputTsOrder;
4,610✔
300
  int32_t               scanFlag = 0;
4,610✔
301

302
  blockDataCleanup(pFinalRes);
4,610✔
303
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
4,610✔
304

305
  if (pOperator->status == OP_EXEC_DONE) {
4,610✔
306
    return code;
1,331✔
307
  }
308

309
  if (pOperator->cost.openCost == 0) {
3,279✔
310
    st = taosGetTimestampUs();
2,271✔
311
  }
312

313
  SOperatorInfo* downstream = pOperator->numOfDownstream > 0 ? pOperator->pDownstream[0] : NULL;
3,279!
314
  SLimitInfo*    pLimitInfo = &pProjectInfo->limitInfo;
3,279✔
315

316
  if (downstream == NULL) {
3,279!
317
    code = doGenerateSourceData(pOperator);
×
318
    QUERY_CHECK_CODE(code, lino, _end);
×
319

320
    if (pProjectInfo->outputIgnoreGroup) {
×
321
      pRes->info.id.groupId = 0;
×
322
    }
323

324
    *pResBlock = (pRes->info.rows > 0)? pRes:NULL;
×
325
    return code;
×
326
  }
327

328
  while (1) {
2,083✔
329
    while (1) {
×
330
      blockDataCleanup(pRes);
5,362✔
331

332
      // The downstream exec may change the value of the newgroup, so use a local variable instead.
333
      SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
5,362✔
334
      if (pBlock == NULL) {
5,362✔
335
        qDebug("set op close, exec %d, status %d rows %" PRId64 , pTaskInfo->execModel, pOperator->status, pFinalRes->info.rows);
2,261!
336
        setOperatorCompleted(pOperator);
2,261✔
337
        break;
2,261✔
338
      }
339
//      if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
340
//        qDebug("set status recv");
341
//        pOperator->status = OP_EXEC_RECV;
342
//      }
343

344
      // for stream interval
345
      if (pBlock->info.type == STREAM_RETRIEVE || pBlock->info.type == STREAM_DELETE_RESULT ||
3,101!
346
          pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_CREATE_CHILD_TABLE ||
3,101!
347
          pBlock->info.type == STREAM_CHECKPOINT || pBlock->info.type == STREAM_NOTIFY_EVENT) {
3,101!
348

349
        *pResBlock = pBlock;
×
350
        return code;
×
351
      }
352

353
      if (pProjectInfo->inputIgnoreGroup) {
3,101!
354
        pBlock->info.id.groupId = 0;
×
355
      }
356

357
      int32_t status = discardGroupDataBlock(pBlock, pLimitInfo);
3,101✔
358
      if (status == PROJECT_RETRIEVE_CONTINUE) {
3,101!
359
        continue;
×
360
      }
361

362
      (void) setInfoForNewGroup(pBlock, pLimitInfo, pOperator);
3,101✔
363
      if (pOperator->status == OP_EXEC_DONE) {
3,101!
364
        break;
×
365
      }
366

367
      if (pProjectInfo->mergeDataBlocks) {
3,101✔
368
        pFinalRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;
2,092✔
369
      } else {
370
        pRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;
1,009✔
371
      }
372

373
      code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
3,101✔
374
      QUERY_CHECK_CODE(code, lino, _end);
3,101!
375

376
      code = blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
3,101✔
377
      QUERY_CHECK_CODE(code, lino, _end);
3,101!
378

379
      code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
3,101✔
380
                                   pProjectInfo->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo));
3,101!
381
      QUERY_CHECK_CODE(code, lino, _end);
3,101!
382

383
      status = doIngroupLimitOffset(pLimitInfo, pBlock->info.id.groupId, pInfo->pRes, pOperator);
3,101✔
384
      if (status == PROJECT_RETRIEVE_CONTINUE) {
3,101!
385
        continue;
×
386
      }
387

388
      break;
3,101✔
389
    }
390

391
    if (pProjectInfo->mergeDataBlocks) {
5,362✔
392
      if (pRes->info.rows > 0) {
3,563✔
393
        pFinalRes->info.id.groupId = 0;  // clear groupId
2,092✔
394
        pFinalRes->info.version = pRes->info.version;
2,092✔
395

396
        // continue merge data, ignore the group id
397
        code = blockDataMerge(pFinalRes, pRes);
2,092✔
398
        QUERY_CHECK_CODE(code, lino, _end);
2,092!
399

400
        if (pFinalRes->info.rows + pRes->info.rows <= pOperator->resultInfo.threshold && (pOperator->status != OP_EXEC_DONE)) {
2,092✔
401
          continue;
2,083✔
402
        }
403
      }
404

405
      // do apply filter
406
      code = doFilter(pFinalRes, pOperator->exprSupp.pFilterInfo, NULL);
1,480✔
407
      QUERY_CHECK_CODE(code, lino, _end);
1,480!
408

409
      // when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
410
      if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) {
1,480!
411
        qDebug("project return %" PRId64 " rows, status %d", pFinalRes->info.rows, pOperator->status);
1,480!
412
        break;
1,480✔
413
      }
414
    } else {
415
      // do apply filter
416
      if (pRes->info.rows > 0) {
1,799✔
417
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
1,009✔
418
        QUERY_CHECK_CODE(code, lino, _end);
1,009!
419

420
        if (pRes->info.rows == 0) {
1,009!
421
          continue;
×
422
        }
423
      }
424

425
      // no results generated
426
      break;
1,799✔
427
    }
428
  }
429

430
  SSDataBlock* p = pProjectInfo->mergeDataBlocks ? pFinalRes : pRes;
3,279✔
431
  pOperator->resultInfo.totalRows += p->info.rows;
3,279✔
432
  p->info.dataLoad = 1;
3,279✔
433

434
  if (pOperator->cost.openCost == 0) {
3,279✔
435
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
2,271✔
436
  }
437

438
  if (pProjectInfo->outputIgnoreGroup) {
3,279✔
439
    p->info.id.groupId = 0;
3,273✔
440
  }
441

442
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
3,279!
443
    printDataBlock(p, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
×
444
  }
445

446
  *pResBlock = (p->info.rows > 0)? p:NULL;
3,279✔
447

448
_end:
3,279✔
449
  if (code != TSDB_CODE_SUCCESS) {
3,279!
450
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
451
    pTaskInfo->code = code;
×
452
    T_LONG_JMP(pTaskInfo->env, code);
×
453
  }
454
  return code;
3,279✔
455
}
456

457
static int32_t resetIndefinitOutputOperState(SOperatorInfo* pOper) {
×
458
  SIndefOperatorInfo* pInfo = pOper->info;
×
459
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
×
460
  SIndefRowsFuncPhysiNode* pPhynode = (SIndefRowsFuncPhysiNode*)pOper->pPhyNode;
×
461
  pOper->status = OP_NOT_OPENED;
×
462

463
  resetBasicOperatorState(&pInfo->binfo);
×
464

465
  pInfo->groupId = 0;
×
466
  pInfo->pNextGroupRes = NULL;
×
467
  int32_t code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->pFuncs, NULL,
×
468
    sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
×
469
    &pTaskInfo->storageAPI.functionStore);
470
  if (code == 0){
×
471
    code = setFunctionResultOutput(pOper, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pOper->exprSupp.numOfExprs);
×
472
  }
473

474
  if (code == 0) {
×
475
    code = resetExprSupp(&pInfo->scalarSup, pTaskInfo, pPhynode->pExprs, NULL,
×
476
                         &pTaskInfo->storageAPI.functionStore);
477
  }
478
  return 0;
×
479
}
480

481
int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
156✔
482
                                                 SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
483
  QRY_PARAM_CHECK(pOptrInfo);
156!
484
  int32_t code = 0;
156✔
485
  int32_t lino = 0;
156✔
486
  int32_t numOfRows = 4096;
156✔
487
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
156✔
488

489
  SIndefOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIndefOperatorInfo));
156!
490
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
156!
491
  if (pInfo == NULL || pOperator == NULL) {
156!
492
    code = terrno;
×
493
    goto _error;
×
494
  }
495

496
  pOperator->pPhyNode = pNode;
156✔
497
  pOperator->pTaskInfo = pTaskInfo;
156✔
498

499
  SExprSupp* pSup = &pOperator->exprSupp;
156✔
500
  pSup->hasWindowOrGroup = false;
156✔
501

502
  SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
156✔
503

504
  if (pPhyNode->pExprs != NULL) {
156✔
505
    int32_t    num = 0;
12✔
506
    SExprInfo* pSExpr = NULL;
12✔
507
    code = createExprInfo(pPhyNode->pExprs, NULL, &pSExpr, &num);
12✔
508
    QUERY_CHECK_CODE(code, lino, _error);
12!
509

510
    code = initExprSupp(&pInfo->scalarSup, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
12✔
511
    if (code != TSDB_CODE_SUCCESS) {
12!
512
      goto _error;
×
513
    }
514
  }
515

516
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->node.pOutputDataBlockDesc);
156✔
517
  TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno);
156!
518

519
  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
520
  int32_t TWOMB = 2 * 1024 * 1024;
156✔
521
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
156!
522
    numOfRows = TWOMB / pResBlock->info.rowSize;
×
523
  }
524

525
  initBasicInfo(&pInfo->binfo, pResBlock);
156✔
526
  initResultSizeInfo(&pOperator->resultInfo, numOfRows);
156✔
527
  code = blockDataEnsureCapacity(pResBlock, numOfRows);
156✔
528
  TSDB_CHECK_CODE(code, lino, _error);
156!
529

530
  int32_t    numOfExpr = 0;
156✔
531
  SExprInfo* pExprInfo = NULL;
156✔
532
  code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr);
156✔
533
  TSDB_CHECK_CODE(code, lino, _error);
156!
534

535
  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
156✔
536
                            pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
156✔
537
  TSDB_CHECK_CODE(code, lino, _error);
156!
538

539
  code = setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
156✔
540
  TSDB_CHECK_CODE(code, lino, _error);
156!
541

542
  code = filterInitFromNode((SNode*)pPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
156✔
543
                            pTaskInfo->pStreamRuntimeInfo);
156✔
544
  TSDB_CHECK_CODE(code, lino, _error);
156!
545

546
  pInfo->binfo.pRes = pResBlock;
156✔
547
  pInfo->binfo.inputTsOrder = pNode->inputTsOrder;
156✔
548
  pInfo->binfo.outputTsOrder = pNode->outputTsOrder;
156✔
549
  code = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr, &pInfo->pPseudoColInfo);
156✔
550
  TSDB_CHECK_CODE(code, lino, _error);
156!
551

552
  setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo,
156✔
553
                  pTaskInfo);
554
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo,
156✔
555
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
556
                                         
557
  setOperatorResetStateFn(pOperator, resetIndefinitOutputOperState);
156✔
558
  code = appendDownstream(pOperator, &downstream, 1);
156✔
559
  if (code != TSDB_CODE_SUCCESS) {
156!
560
    goto _error;
×
561
  }
562

563
  *pOptrInfo = pOperator;
156✔
564
  return TSDB_CODE_SUCCESS;
156✔
565

566
_error:
×
567
  if (pInfo != NULL) destroyIndefinitOperatorInfo(pInfo);
×
568
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
569
  pTaskInfo->code = code;
×
570
  return code;
×
571
}
572

573
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream,
246✔
574
                              SExecTaskInfo* pTaskInfo) {
575
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
246✔
576
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
246✔
577
  SExprSupp*          pSup = &pOperator->exprSupp;
246✔
578

579
  int32_t order = pInfo->inputTsOrder;
246✔
580
  int32_t scanFlag = pBlock->info.scanFlag;
246✔
581
  int32_t code = TSDB_CODE_SUCCESS;
246✔
582

583
  // there is an scalar expression that needs to be calculated before apply the group aggregation.
584
  SExprSupp* pScalarSup = &pIndefInfo->scalarSup;
246✔
585
  if (pScalarSup->pExprInfo != NULL) {
246!
586
    code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
×
587
                                 pIndefInfo->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo));
×
588
    if (code != TSDB_CODE_SUCCESS) {
×
589
      T_LONG_JMP(pTaskInfo->env, code);
×
590
    }
591
  }
592

593
  code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
246✔
594
  if (code) {
246!
595
    T_LONG_JMP(pTaskInfo->env, code);
×
596
  }
597

598
  code = blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
246✔
599
  if (code != TSDB_CODE_SUCCESS) {
246!
600
    T_LONG_JMP(pTaskInfo->env, code);
×
601
  }
602

603
  code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
246✔
604
                               pIndefInfo->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo));
246!
605
  if (code != TSDB_CODE_SUCCESS) {
246✔
606
    T_LONG_JMP(pTaskInfo->env, code);
36!
607
  }
608
}
210✔
609

610
int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
292✔
611
  QRY_PARAM_CHECK(pResBlock);
292!
612
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
292✔
613
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
292✔
614
  SExprSupp*          pSup = &pOperator->exprSupp;
292✔
615
  int64_t             st = 0;
292✔
616
  int32_t             code = TSDB_CODE_SUCCESS;
292✔
617
  int32_t             lino = 0;
292✔
618
  SSDataBlock*        pRes = pInfo->pRes;
292✔
619

620
  blockDataCleanup(pRes);
292✔
621

622
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
292✔
623
  if (pOperator->status == OP_EXEC_DONE) {
292✔
624
    return code;
76✔
625
  }
626

627
  if (pOperator->cost.openCost == 0) {
216✔
628
    st = taosGetTimestampUs();
156✔
629
  }
630

631
  SOperatorInfo* downstream = pOperator->pDownstream[0];
216✔
632

633
  while (1) {
54✔
634
    // here we need to handle the existsed group results
635
    if (pIndefInfo->pNextGroupRes != NULL) {  // todo extract method
270✔
636
      for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
228✔
637
        SqlFunctionCtx* pCtx = &pSup->pCtx[k];
114✔
638

639
        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
114✔
640
        pResInfo->initialized = false;
114✔
641
        pCtx->pOutput = NULL;
114✔
642
      }
643

644
      doHandleDataBlock(pOperator, pIndefInfo->pNextGroupRes, downstream, pTaskInfo);
114✔
645
      pIndefInfo->pNextGroupRes = NULL;
114✔
646
    }
647

648
    if (pInfo->pRes->info.rows < pOperator->resultInfo.threshold) {
270!
649
      while (1) {
96✔
650
        // The downstream exec may change the value of the newgroup, so use a local variable instead.
651
        SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
366✔
652
        if (pBlock == NULL) {
366✔
653
          setOperatorCompleted(pOperator);
120✔
654
          break;
120✔
655
        }
656
        pInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
246✔
657

658
        if (pIndefInfo->groupId == 0 && pBlock->info.id.groupId != 0) {
246✔
659
          pIndefInfo->groupId = pBlock->info.id.groupId;  // this is the initial group result
53✔
660
        } else {
661
          if (pIndefInfo->groupId != pBlock->info.id.groupId) {  // reset output buffer and computing status
193✔
662
            pIndefInfo->groupId = pBlock->info.id.groupId;
114✔
663
            pIndefInfo->pNextGroupRes = pBlock;
114✔
664
            break;
114✔
665
          }
666
        }
667

668
        doHandleDataBlock(pOperator, pBlock, downstream, pTaskInfo);
132✔
669
        if (pInfo->pRes->info.rows >= pOperator->resultInfo.threshold) {
96!
670
          break;
×
671
        }
672
      }
673
    }
674

675
    code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
234✔
676
    QUERY_CHECK_CODE(code, lino, _end);
234!
677

678
    size_t rows = pInfo->pRes->info.rows;
234✔
679
    if (rows > 0 || pOperator->status == OP_EXEC_DONE) {
234✔
680
      break;
681
    } else {
682
      blockDataCleanup(pInfo->pRes);
54✔
683
    }
684
  }
685

686
  size_t rows = pInfo->pRes->info.rows;
180✔
687
  pOperator->resultInfo.totalRows += rows;
180✔
688

689
  if (pOperator->cost.openCost == 0) {
180✔
690
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
120✔
691
  }
692

693
  *pResBlock = (rows > 0) ? pInfo->pRes : NULL;
180✔
694

695
_end:
180✔
696
  if (code != TSDB_CODE_SUCCESS) {
180!
697
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
698
    pTaskInfo->code = code;
×
699
    T_LONG_JMP(pTaskInfo->env, code);
×
700
  }
701
  return code;
180✔
702
}
703

704
int32_t initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
2,430✔
705
  int32_t code = TSDB_CODE_SUCCESS;
2,430✔
706
  for (int32_t j = 0; j < size; ++j) {
6,942✔
707
    struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
4,512✔
708
    if (isRowEntryInitialized(pResInfo) || fmIsPseudoColumnFunc(pCtx[j].functionId) || pCtx[j].functionId == -1 ||
4,750!
709
        fmIsScalarFunc(pCtx[j].functionId)) {
238✔
710
      continue;
4,304✔
711
    }
712

713
    code = pCtx[j].fpSet.init(&pCtx[j], pCtx[j].resultInfo);
207✔
714
    if (code) {
208!
715
      return code;
×
716
    }
717
  }
718

719
  return 0;
2,430✔
720
}
721

722
/*
723
 * The start of each column SResultRowEntryInfo is denote by RowCellInfoOffset.
724
 * Note that in case of top/bottom query, the whole multiple rows of result is treated as only one row of results.
725
 * +------------+-----------------result column 1------------+------------------result column 2-----------+
726
 * | SResultRow | SResultRowEntryInfo | intermediate buffer1 | SResultRowEntryInfo | intermediate buffer 2|
727
 * +------------+--------------------------------------------+--------------------------------------------+
728
 *           offset[0]                                  offset[1]                                   offset[2]
729
 */
730
// TODO refactor: some function move away
731
int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
2,430✔
732
                             int32_t numOfExprs) {
733
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
2,430✔
734
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
2,430✔
735
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
2,430✔
736

737
  SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
2,430✔
738
  initResultRowInfo(pResultRowInfo);
2,430✔
739

740
  int64_t     tid = 0;
2,430✔
741
  int64_t     groupId = 0;
2,430✔
742
  SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, (char*)&tid, sizeof(tid), true, groupId,
2,430✔
743
                                            pTaskInfo, false, pSup, true);
744
  if (pRow == NULL || pTaskInfo->code != 0) {
2,429!
745
    return pTaskInfo->code;
×
746
  }
747

748
  for (int32_t i = 0; i < numOfExprs; ++i) {
6,941✔
749
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowEntryInfoOffset);
4,512✔
750
    cleanupResultRowEntry(pEntry);
4,512✔
751

752
    pCtx[i].resultInfo = pEntry;
4,512✔
753
    pCtx[i].scanFlag = stage;
4,512✔
754
  }
755

756
  return initCtxOutputBuffer(pCtx, numOfExprs);
2,429✔
757
}
758

759
int32_t setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList) {
2,429✔
760
  QRY_PARAM_CHECK(pResList);
2,429!
761
  SArray* pList = taosArrayInit(4, sizeof(int32_t));
2,429✔
762
  if (pList == NULL) {
2,431✔
763
    return terrno;
1✔
764
  }
765

766
  for (int32_t i = 0; i < numOfCols; ++i) {
6,944✔
767
    if (fmIsPseudoColumnFunc(pCtx[i].functionId)) {
4,513!
768
      void* px = taosArrayPush(pList, &i);
×
769
      if (px == NULL) {
×
770
        return terrno;
×
771
      }
772
    }
773
  }
774

775
  *pResList = pList;
2,431✔
776
  return 0;
2,431✔
777
}
778

779
int32_t doGenerateSourceData(SOperatorInfo* pOperator) {
×
780
  SProjectOperatorInfo* pProjectInfo = pOperator->info;
×
781

782
  SExprSupp*   pSup = &pOperator->exprSupp;
×
783
  SSDataBlock* pRes = pProjectInfo->binfo.pRes;
×
784
  SExprInfo*   pExpr = pSup->pExprInfo;
×
785
  int64_t      st = taosGetTimestampUs();
×
786
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
×
787

788
  int32_t code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
×
789
  if (code) {
×
790
    return code;
×
791
  }
792

793
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
×
794
    int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
×
795

796
    if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
×
797
      SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, outputSlotId);
×
798
      if (pColInfoData == NULL) {
×
799
        return terrno;
×
800
      }
801

802
      int32_t type = pExpr[k].base.pParam[0].param.nType;
×
803
      if (TSDB_DATA_TYPE_NULL == type) {
×
804
        colDataSetNNULL(pColInfoData, 0, 1);
805
      } else {
806
        code = colDataSetVal(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
×
807
        if (code) {
×
808
          return code;
×
809
        }
810
      }
811
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
×
812
      SqlFunctionCtx* pfCtx = &pSup->pCtx[k];
×
813

814
      // UDF scalar functions will be calculated here, for example, select foo(n) from (select 1 n).
815
      // UDF aggregate functions will be handled in agg operator.
816
      if (fmIsScalarFunc(pfCtx->functionId)) {
×
817
        SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
×
818
        if (pBlockList == NULL) {
×
819
          return terrno;
×
820
        }
821

822
        void* px = taosArrayPush(pBlockList, &pRes);
×
823
        if (px == NULL) {
×
824
          return terrno;
×
825
        }
826

827
        SColumnInfoData* pResColData = taosArrayGet(pRes->pDataBlock, outputSlotId);
×
828
        if (pResColData == NULL) {
×
829
          return terrno;
×
830
        }
831

832
        SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
×
833

834
        SScalarParam dest = {.columnData = &idata};
×
835
        code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest, GET_STM_RTINFO(pOperator->pTaskInfo), NULL);
×
836
        if (code != TSDB_CODE_SUCCESS) {
×
837
          taosArrayDestroy(pBlockList);
×
838
          return code;
×
839
        }
840

841
        int32_t startOffset = pRes->info.rows;
×
842
        if (pRes->info.capacity <= 0) {
×
843
          qError("project failed at: %s:%d", __func__, __LINE__);
×
844
          return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
845
        }
846
        code = colDataAssign(pResColData, &idata, dest.numOfRows, &pRes->info);
×
847
        if (code) {
×
848
          return code;
×
849
        }
850

851
        colDataDestroy(&idata);
×
852
        taosArrayDestroy(pBlockList);
×
853
      } else {
854
        return TSDB_CODE_OPS_NOT_SUPPORT;
×
855
      }
856
    } else {
857
      return TSDB_CODE_OPS_NOT_SUPPORT;
×
858
    }
859
  }
860

861
  pRes->info.rows = 1;
×
862
  code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
×
863
  if (code) {
×
864
    pTaskInfo->code = code;
×
865
    return code;
×
866
  }
867

868
  (void) doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator);
×
869

870
  pOperator->resultInfo.totalRows += pRes->info.rows;
×
871

872
  setOperatorCompleted(pOperator);
×
873
  if (pOperator->cost.openCost == 0) {
×
874
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
×
875
  }
876

877
  return code;
×
878
}
879

880
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
3,395✔
881
  size_t num = (pPseudoList != NULL) ? taosArrayGetSize(pPseudoList) : 0;
3,395✔
882
  for (int32_t i = 0; i < num; ++i) {
3,395!
883
    pCtx[i].pOutput = taosArrayGet(pResult->pDataBlock, i);
×
884
    if (pCtx[i].pOutput == NULL) {
×
885
      qError("failed to get the output buf, ptr is null");
×
886
    }
887
  }
888
}
3,395✔
889

890
int32_t projectApplyFunctionsWithSelect(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock,
3,395✔
891
                                        SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList,
892
                                        const void* pExtraParams, bool doSelectFunc) {
893
  int32_t lino = 0;
3,395✔
894
  int32_t code = TSDB_CODE_SUCCESS;
3,395✔
895
  setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
3,395✔
896
  pResult->info.dataLoad = 1;
3,395✔
897

898
  SArray* processByRowFunctionCtx = NULL;
3,395✔
899
  if (pSrcBlock == NULL) {
3,395!
900
    for (int32_t k = 0; k < numOfOutput; ++k) {
×
901
      int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
×
902

903
      if (pExpr[k].pExpr->nodeType != QUERY_NODE_VALUE) {
×
904
        qError("project failed at: %s:%d", __func__, __LINE__);
×
905
        code = TSDB_CODE_INVALID_PARA;
×
906
        TSDB_CHECK_CODE(code, lino, _exit);
×
907
      }
908
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
×
909
      if (pColInfoData == NULL) {
×
910
        code = terrno;
×
911
        TSDB_CHECK_CODE(code, lino, _exit);
×
912
      }
913

914
      int32_t type = pExpr[k].base.pParam[0].param.nType;
×
915
      if (TSDB_DATA_TYPE_NULL == type) {
×
916
        colDataSetNNULL(pColInfoData, 0, 1);
917
      } else {
918
        code = colDataSetVal(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
×
919
        TSDB_CHECK_CODE(code, lino, _exit);
×
920
      }
921
    }
922

923
    pResult->info.rows = 1;
×
924
    goto _exit;
×
925
  }
926

927
  if (pResult != pSrcBlock) {
3,395✔
928
    pResult->info.id.groupId = pSrcBlock->info.id.groupId;
3,347✔
929
    memcpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
3,347✔
930
    qTrace("%s, parName:%s,groupId:%" PRIu64, __FUNCTION__, pSrcBlock->info.parTbName, pResult->info.id.groupId);
3,347!
931
  }
932

933
  // if the source equals to the destination, it is to create a new column as the result of scalar
934
  // function or some operators.
935
  bool createNewColModel = (pResult == pSrcBlock);
3,395✔
936
  if (createNewColModel) {
3,395✔
937
    code = blockDataEnsureCapacity(pResult, pResult->info.rows);
48✔
938
    if (code) {
48!
939
      TSDB_CHECK_CODE(code, lino, _exit);
×
940
    }
941
  }
942

943
  int32_t numOfRows = 0;
3,395✔
944

945
  for (int32_t k = 0; k < numOfOutput; ++k) {
10,193✔
946
    int32_t               outputSlotId = pExpr[k].base.resSchema.slotId;
6,816✔
947
    SqlFunctionCtx*       pfCtx = &pCtx[k];
6,816✔
948
    SInputColumnInfoData* pInputData = &pfCtx->input;
6,816✔
949

950
    if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) {  // it is a project query
6,816✔
951
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
6,136✔
952
      if (pColInfoData == NULL) {
6,136!
953
        code = terrno;
×
954
        TSDB_CHECK_CODE(code, lino, _exit);
18!
955
      }
956

957
      if (pResult->info.rows > 0 && !createNewColModel) {
6,136!
958
        int32_t ret = 0;
×
959

960
        if (pInputData->pData[0] == NULL) {
×
961
          int32_t slotId = pfCtx->param[0].pCol->slotId;
×
962

963
          SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
×
964
          if (pInput == NULL) {
×
965
            code = terrno;
×
966
            TSDB_CHECK_CODE(code, lino, _exit);
×
967
          }
968

969
          ret = colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInput,
×
970
                                pSrcBlock->info.rows);
×
971
        } else {
972
          ret = colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity,
×
973
                                pInputData->pData[0], pInputData->numOfRows);
×
974
        }
975

976
        if (ret < 0) {
×
977
          code = ret;
×
978
        }
979

980
        TSDB_CHECK_CODE(code, lino, _exit);
×
981
      } else {
982
        if (pInputData->pData[0] == NULL) {
6,136!
983
          int32_t slotId = pfCtx->param[0].pCol->slotId;
×
984

985
          SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
×
986
          if (pInput == NULL) {
×
987
            code = terrno;
×
988
            TSDB_CHECK_CODE(code, lino, _exit);
×
989
          }
990

991
          code = colDataAssign(pColInfoData, pInput, pSrcBlock->info.rows, &pResult->info);
×
992
          numOfRows = pSrcBlock->info.rows;
×
993
        } else {
994
          code = colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
6,136✔
995
          numOfRows = pInputData->numOfRows;
6,136✔
996
        }
997

998
        TSDB_CHECK_CODE(code, lino, _exit);
6,136!
999
      }
1000
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
680!
1001
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
×
1002
      if (pColInfoData == NULL) {
×
1003
        code = terrno;
×
1004
        TSDB_CHECK_CODE(code, lino, _exit);
×
1005
      }
1006

1007
      int32_t offset = createNewColModel ? 0 : pResult->info.rows;
×
1008

1009
      int32_t type = pExpr[k].base.pParam[0].param.nType;
×
1010
      if (TSDB_DATA_TYPE_NULL == type) {
×
1011
        colDataSetNNULL(pColInfoData, offset, pSrcBlock->info.rows);
×
1012
      } else {
1013
        char* p = taosVariantGet(&pExpr[k].base.pParam[0].param, type);
×
1014
        for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
×
1015
          code = colDataSetVal(pColInfoData, i + offset, p, false);
×
1016
          TSDB_CHECK_CODE(code, lino, _exit);
×
1017
        }
1018
      }
1019

1020
      numOfRows = pSrcBlock->info.rows;
×
1021
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
680✔
1022
      SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
130✔
1023
      if (pBlockList == NULL) {
130!
1024
        code = terrno;
×
1025
        TSDB_CHECK_CODE(code, lino, _exit);
×
1026
      }
1027

1028
      void* px = taosArrayPush(pBlockList, &pSrcBlock);
130✔
1029
      if (px == NULL) {
130!
1030
        code = terrno;
×
1031
        taosArrayDestroy(pBlockList);
×
1032
        TSDB_CHECK_CODE(code, lino, _exit);
×
1033
      }
1034

1035
      SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
130✔
1036
      if (pResColData == NULL) {
130!
1037
        code = terrno;
×
1038
        taosArrayDestroy(pBlockList);
×
1039
        TSDB_CHECK_CODE(code, lino, _exit);
×
1040
      }
1041

1042
      SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};
130✔
1043

1044
      SScalarParam dest = {.columnData = &idata};
130✔
1045
      code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest, pExtraParams, NULL);
130✔
1046
      if (code != TSDB_CODE_SUCCESS) {
130!
1047
        taosArrayDestroy(pBlockList);
×
1048
        TSDB_CHECK_CODE(code, lino, _exit);
×
1049
      }
1050

1051
      int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
130✔
1052
      if (pResult->info.capacity <= 0) {
130!
1053
        qError("project failed at: %s:%d", __func__, __LINE__);
×
1054
        code = TSDB_CODE_INVALID_PARA;
×
1055
        TSDB_CHECK_CODE(code, lino, _exit);
×
1056
      }
1057

1058
      int32_t ret =
1059
          colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
130✔
1060
      if (ret < 0) {
130!
1061
        code = ret;
×
1062
      }
1063

1064
      colDataDestroy(&idata);
130✔
1065
      TSDB_CHECK_CODE(code, lino, _exit);
130!
1066

1067
      numOfRows = dest.numOfRows;
130✔
1068
      taosArrayDestroy(pBlockList);
130✔
1069
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
550!
1070
      // _rowts/_c0, not tbname column
1071
      if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId)) &&
550!
1072
          !fmIsPlaceHolderFunc(pfCtx->functionId)) {
×
1073
        if (fmIsGroupIdFunc(pfCtx->functionId)) {
×
1074
          SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
×
1075
          TSDB_CHECK_NULL(pColInfoData, code, lino, _exit, terrno);
×
1076
          code = colDataSetVal(pColInfoData, pResult->info.rows, (const char*)&pSrcBlock->info.id.groupId, false);
×
1077
          TSDB_CHECK_CODE(code, lino, _exit);
×
1078
        }
1079
      } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
550✔
1080
        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx);
246✔
1081
        code = pfCtx->fpSet.init(pfCtx, pResInfo);
246✔
1082
        TSDB_CHECK_CODE(code, lino, _exit);
246!
1083
        pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
246✔
1084
        if (pfCtx->pOutput == NULL) {
246!
1085
          code = terrno;
×
1086
          TSDB_CHECK_CODE(code, lino, _exit);
×
1087
        }
1088

1089
        pfCtx->offset = createNewColModel ? 0 : pResult->info.rows;  // set the start offset
246!
1090

1091
        // set the timestamp(_rowts) output buffer
1092
        if (taosArrayGetSize(pPseudoList) > 0) {
246!
1093
          int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
×
1094
          if (outputColIndex == NULL) {
×
1095
            code = terrno;
×
1096
            TSDB_CHECK_CODE(code, lino, _exit);
×
1097
          }
1098

1099
          pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
×
1100
        }
1101

1102
        // link pDstBlock to set selectivity value
1103
        if (pfCtx->subsidiaries.num > 0) {
246!
1104
          pfCtx->pDstBlock = pResult;
×
1105
        }
1106

1107
        code = pfCtx->fpSet.process(pfCtx);
246✔
1108
        if (code != TSDB_CODE_SUCCESS) {
246✔
1109
          if (pCtx[k].fpSet.cleanup != NULL) {
18!
1110
            pCtx[k].fpSet.cleanup(&pCtx[k]);
×
1111
          }
1112
          TSDB_CHECK_CODE(code, lino, _exit);
18!
1113
        }
1114

1115
        numOfRows = pResInfo->numOfRes;
228✔
1116
        if (fmIsProcessByRowFunc(pfCtx->functionId)) {
228✔
1117
          if (NULL == processByRowFunctionCtx) {
168!
1118
            processByRowFunctionCtx = taosArrayInit(1, sizeof(SqlFunctionCtx*));
168✔
1119
            if (!processByRowFunctionCtx) {
168!
1120
              code = terrno;
×
1121
              TSDB_CHECK_CODE(code, lino, _exit);
×
1122
            }
1123
          }
1124

1125
          void* px = taosArrayPush(processByRowFunctionCtx, &pfCtx);
168✔
1126
          if (px == NULL) {
168!
1127
            code = terrno;
×
1128
            TSDB_CHECK_CODE(code, lino, _exit);
×
1129
          }
1130
        }
1131
      } else if (fmIsAggFunc(pfCtx->functionId)) {
304!
1132
        // selective value output should be set during corresponding function execution
1133
        if (!doSelectFunc && fmIsSelectValueFunc(pfCtx->functionId)) {
×
1134
          continue;
×
1135
        }
1136
        // _group_key function for "partition by tbname" + csum(col_name) query
1137
        SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
×
1138
        if (pOutput == NULL) {
×
1139
          code = terrno;
×
1140
          TSDB_CHECK_CODE(code, lino, _exit);
×
1141
        }
1142

1143
        int32_t slotId = pfCtx->param[0].pCol->slotId;
×
1144

1145
        // todo handle the json tag
1146
        SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
×
1147
        if (pInput == NULL) {
×
1148
          code = terrno;
×
1149
          TSDB_CHECK_CODE(code, lino, _exit);
×
1150
        }
1151

1152
        for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
×
1153
          bool isNull = colDataIsNull_s(pInput, f);
×
1154
          if (isNull) {
×
1155
            colDataSetNULL(pOutput, pResult->info.rows + f);
×
1156
          } else {
1157
            char* data = colDataGetData(pInput, f);
×
1158
            code = colDataSetVal(pOutput, pResult->info.rows + f, data, isNull);
×
1159
            TSDB_CHECK_CODE(code, lino, _exit);
×
1160
          }
1161
        }
1162

1163
      } else {
1164
        SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
304✔
1165
        if (pBlockList == NULL) {
304!
1166
          code = terrno;
×
1167
          TSDB_CHECK_CODE(code, lino, _exit);
×
1168
        }
1169

1170
        void* px = taosArrayPush(pBlockList, &pSrcBlock);
304✔
1171
        if (px == NULL) {
304!
1172
          code = terrno;
×
1173
          TSDB_CHECK_CODE(code, lino, _exit);
×
1174
        }
1175

1176
        SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
304✔
1177
        if (pResColData == NULL) {
304!
1178
          taosArrayDestroy(pBlockList);
×
1179
          code = terrno;
×
1180
          TSDB_CHECK_CODE(code, lino, _exit);
×
1181
        }
1182

1183
        SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};
304✔
1184

1185
        SScalarParam dest = {.columnData = &idata};
304✔
1186
        code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest, pExtraParams, NULL);
304✔
1187
        if (code != TSDB_CODE_SUCCESS) {
304!
1188
          taosArrayDestroy(pBlockList);
×
1189
          TSDB_CHECK_CODE(code, lino, _exit);
×
1190
        }
1191

1192
        int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
304✔
1193
        if (pResult->info.capacity <= 0) {
304!
1194
          qError("project failed at: %s:%d", __func__, __LINE__);
×
1195
          code = TSDB_CODE_INVALID_PARA;
×
1196
          TSDB_CHECK_CODE(code, lino, _exit);
×
1197
        }
1198
        int32_t ret =
1199
            colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
304✔
1200
        if (ret < 0) {
304!
1201
          code = ret;
×
1202
        }
1203

1204
        colDataDestroy(&idata);
304✔
1205

1206
        numOfRows = dest.numOfRows;
304✔
1207
        taosArrayDestroy(pBlockList);
304✔
1208
        TSDB_CHECK_CODE(code, lino, _exit);
304!
1209
      }
1210
    } else {
1211
      return TSDB_CODE_OPS_NOT_SUPPORT;
×
1212
    }
1213
  }
1214

1215
  if (processByRowFunctionCtx && taosArrayGetSize(processByRowFunctionCtx) > 0) {
3,377!
1216
    SqlFunctionCtx** pfCtx = taosArrayGet(processByRowFunctionCtx, 0);
168✔
1217
    if (pfCtx == NULL) {
168!
1218
      code = terrno;
×
1219
      TSDB_CHECK_CODE(code, lino, _exit);
×
1220
    }
1221

1222
    code = (*pfCtx)->fpSet.processFuncByRow(processByRowFunctionCtx);
168✔
1223
    TSDB_CHECK_CODE(code, lino, _exit);
168✔
1224
    numOfRows = (*pfCtx)->resultInfo->numOfRes;
150✔
1225
  }
1226

1227
  if (!createNewColModel) {
3,359✔
1228
    pResult->info.rows += numOfRows;
3,311✔
1229
  }
1230

1231
_exit:
48✔
1232
  if (processByRowFunctionCtx) {
3,395✔
1233
    taosArrayDestroy(processByRowFunctionCtx);
168✔
1234
  }
1235
  if (code) {
3,395✔
1236
    qError("project apply functions failed at: %s:%d", __func__, lino);
36!
1237
  }
1238
  return code;
3,395✔
1239
}
1240

1241
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
3,395✔
1242
                              int32_t numOfOutput, SArray* pPseudoList, const void* pExtraParams) {
1243
  return projectApplyFunctionsWithSelect(pExpr, pResult, pSrcBlock, pCtx, numOfOutput, pPseudoList, pExtraParams, false);
3,395✔
1244
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc