• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4976

06 Mar 2026 09:48AM UTC coverage: 68.446% (+0.08%) from 68.37%
#4976

push

travis-ci

web-flow
feat(TDgpt): support multiple input data columns for anomaly detection. (#34606)

0 of 93 new or added lines in 9 files covered. (0.0%)

5718 existing lines in 144 files now uncovered.

211146 of 308486 relevant lines covered (68.45%)

136170362.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.53
/source/libs/executor/src/projectoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "taoserror.h"
22
#include "tdatablock.h"
23

24
typedef struct SProjectOperatorInfo {
25
  SOptrBasicInfo binfo;
26
  SAggSupporter  aggSup;
27
  SArray*        pPseudoColInfo;
28
  SLimitInfo     limitInfo;
29
  bool           mergeDataBlocks;
30
  SSDataBlock*   pFinalRes;
31
  bool           inputIgnoreGroup;
32
  bool           outputIgnoreGroup;
33
} SProjectOperatorInfo;
34

35
typedef struct SIndefOperatorInfo {
36
  SOptrBasicInfo binfo;
37
  SAggSupporter  aggSup;
38
  SArray*        pPseudoColInfo;
39
  SExprSupp      scalarSup;
40
  uint64_t       groupId;
41
  SSDataBlock*   pNextGroupRes;
42
} SIndefOperatorInfo;
43

44
static int32_t      doGenerateSourceData(SOperatorInfo* pOperator);
45
static int32_t      doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
46
static int32_t      doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
47
int32_t projectApplyOperator(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, int32_t outputSlotId, int32_t* numOfRows, bool createNewColModel, const void* pExtraParams);
48

49
static void destroyProjectOperatorInfo(void* param) {
139,950,814✔
50
  if (NULL == param) {
139,950,814✔
UNCOV
51
    return;
×
52
  }
53

54
  SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param;
139,950,814✔
55
  cleanupBasicInfo(&pInfo->binfo);
139,950,814✔
56
  cleanupAggSup(&pInfo->aggSup);
139,953,545✔
57
  taosArrayDestroy(pInfo->pPseudoColInfo);
139,931,756✔
58

59
  blockDataDestroy(pInfo->pFinalRes);
139,930,715✔
60
  taosMemoryFreeClear(param);
139,936,782✔
61
}
62

63
static void destroyIndefinitOperatorInfo(void* param) {
2,963,985✔
64
  SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param;
2,963,985✔
65
  if (pInfo == NULL) {
2,963,985✔
UNCOV
66
    return;
×
67
  }
68

69
  cleanupBasicInfo(&pInfo->binfo);
2,963,985✔
70
  taosArrayDestroy(pInfo->pPseudoColInfo);
2,964,839✔
71
  cleanupAggSup(&pInfo->aggSup);
2,964,483✔
72
  cleanupExprSupp(&pInfo->scalarSup);
2,963,629✔
73

74
  taosMemoryFreeClear(param);
2,963,985✔
75
}
76

77
/*
 * Forward a "release stream state" request to the first downstream operator,
 * when that operator registered a handler for it.
 *
 * createProjectOperatorInfo() registers this callback even for operators built
 * without a downstream, so the downstream array is validated before it is
 * dereferenced; in that case the call is a no-op.
 */
void streamOperatorReleaseState(SOperatorInfo* pOperator) {
  if (pOperator == NULL || pOperator->numOfDownstream <= 0 || pOperator->pDownstream == NULL) {
    return;
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream != NULL && downstream->fpSet.releaseStreamStateFn) {
    downstream->fpSet.releaseStreamStateFn(downstream);
  }
}
×
83

84
/*
 * Forward a "reload stream state" request to the first downstream operator,
 * when that operator registered a handler for it.
 *
 * createProjectOperatorInfo() registers this callback even for operators built
 * without a downstream, so the downstream array is validated before it is
 * dereferenced; in that case the call is a no-op.
 */
void streamOperatorReloadState(SOperatorInfo* pOperator) {
  if (pOperator == NULL || pOperator->numOfDownstream <= 0 || pOperator->pDownstream == NULL) {
    return;
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream != NULL && downstream->fpSet.reloadStreamStateFn) {
    downstream->fpSet.reloadStreamStateFn(downstream);
  }
}
×
90

91
/*
 * Reset-state callback of the project operator: brings the operator back to the
 * OP_NOT_OPENED state so it can be executed again.
 *
 * Resets the basic operator info, rebuilds the limit info from the physical plan
 * node, empties pFinalRes, and re-initializes the aggregate support and the
 * function result output.
 *
 * @param pOper  project operator to reset
 * @return 0 on success; otherwise the error code reported by resetAggSup() or
 *         setFunctionResultOutput(). (The code was previously discarded and 0
 *         was always returned.)
 */
static int32_t resetProjectOperState(SOperatorInfo* pOper) {
  SProjectOperatorInfo* pProject = pOper->info;
  SExecTaskInfo*        pTaskInfo = pOper->pTaskInfo;
  SProjectPhysiNode*    pPhynode = (SProjectPhysiNode*)pOper->pPhyNode;

  pOper->status = OP_NOT_OPENED;
  resetBasicOperatorState(&pProject->binfo);

  // start from a zeroed limit state before re-deriving it from the plan node
  pProject->limitInfo = (SLimitInfo){0};
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pProject->limitInfo);

  blockDataCleanup(pProject->pFinalRes);

  int32_t code = resetAggSup(&pOper->exprSupp, &pProject->aggSup, pTaskInfo, pPhynode->pProjections, NULL,
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
                             &pTaskInfo->storageAPI.functionStore);
  if (code == 0) {
    code = setFunctionResultOutput(pOper, &pProject->binfo, &pProject->aggSup, MAIN_SCAN, pOper->exprSupp.numOfExprs);
  }

  // propagate the failure to the caller instead of reporting success unconditionally
  return code;
}
112

113
/*
 * Create a project operator for the given physical plan node.
 *
 * @param downstream    upstream data source of the operator; may be NULL, in which
 *                      case no downstream is attached
 * @param pProjPhyNode  physical plan node describing projections, conditions,
 *                      limits and the output block layout
 * @param pTaskInfo     owning task; pTaskInfo->code is set to the error code on failure
 * @param pOptrInfo     receives the created operator on success
 * @return TSDB_CODE_SUCCESS on success, otherwise an error code. On failure the
 *         partially built info is destroyed and destroyOperatorAndDownstreams()
 *         is invoked on the operator and `downstream`.
 */
int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo,
                                  SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t code = TSDB_CODE_SUCCESS;
  SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    goto _error;
  }

  pOperator->pPhyNode = pProjPhyNode;
  pOperator->exprSupp.hasWindowOrGroup = false;
  pOperator->pTaskInfo = pTaskInfo;

  int32_t    lino = 0;

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pProjPhyNode->node.pOutputDataBlockDesc);
  TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno);

  initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo);

  // hand the block over to pInfo right away so that the error path releases it
  pInfo->binfo.pRes = pResBlock;
  pInfo->pFinalRes = NULL;

  code = createOneDataBlock(pResBlock, false, &pInfo->pFinalRes);
  TSDB_CHECK_CODE(code, lino, _error);

  pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder;
  pInfo->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup;
  pInfo->outputIgnoreGroup = pProjPhyNode->ignoreGroupId;

  // blocks are merged only outside the queue exec model and only when the group id is ignored
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
    pInfo->mergeDataBlocks = false;
  } else {
    if (!pProjPhyNode->ignoreGroupId) {
      pInfo->mergeDataBlocks = false;
    } else {
      pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock;
    }
  }

  int32_t numOfRows = 4096;
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
  // The bound is tested by division: multiplying numOfRows by a very large row
  // size could overflow a 32-bit integer (assumes rowSize is 32-bit -- TODO confirm).
  int32_t TWOMB = 2 * 1024 * 1024;
  if (pResBlock->info.rowSize > TWOMB / numOfRows) {
    numOfRows = TWOMB / pResBlock->info.rowSize;
  }

  initResultSizeInfo(&pOperator->resultInfo, numOfRows);

  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = NULL;
  code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols);
  TSDB_CHECK_CODE(code, lino, _error);
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
  TSDB_CHECK_CODE(code, lino, _error);

  initBasicInfo(&pInfo->binfo, pResBlock);
  code = setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
  TSDB_CHECK_CODE(code, lino, _error);

  code = filterInitFromNode((SNode*)pProjPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                            pTaskInfo->pStreamRuntimeInfo);
  TSDB_CHECK_CODE(code, lino, _error);

  code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols, &pInfo->pPseudoColInfo);
  TSDB_CHECK_CODE(code, lino, _error);

  setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
  setOperatorStreamStateFn(pOperator, streamOperatorReleaseState, streamOperatorReloadState);
  setOperatorResetStateFn(pOperator, resetProjectOperState);

  // a projection without a data source is allowed
  if (NULL != downstream) {
    code = appendDownstream(pOperator, &downstream, 1);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  if (pInfo != NULL) destroyProjectOperatorInfo(pInfo);
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
210

211
/*
 * Apply the group offset (remainGroupOffset) to an incoming block.
 *
 * @return PROJECT_RETRIEVE_CONTINUE when the block belongs to a group that is
 *         still being skipped and must be dropped; PROJECT_RETRIEVE_DONE when
 *         the block should be processed.
 */
static int32_t discardGroupDataBlock(SSDataBlock* pBlock, SLimitInfo* pLimitInfo) {
  // no group left to skip
  if (pLimitInfo->remainGroupOffset <= 0) {
    return PROJECT_RETRIEVE_DONE;
  }

  // the very first group, or more data of the group that is currently skipped
  if (pLimitInfo->currentGroupId == 0 || pLimitInfo->currentGroupId == pBlock->info.id.groupId) {
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
    return PROJECT_RETRIEVE_CONTINUE;
  }

  // data of a new group arrived: one more skipped group has been passed
  pLimitInfo->remainGroupOffset -= 1;
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;

  // the new group has to be skipped as well
  if (pLimitInfo->remainGroupOffset > 0) {
    return PROJECT_RETRIEVE_CONTINUE;
  }

  pLimitInfo->currentGroupId = 0;
  return PROJECT_RETRIEVE_DONE;
}
233

234
/*
 * Update the group bookkeeping in `pLimitInfo` for an incoming block and enforce
 * the group limit (slimit).
 *
 * Expects remainGroupOffset to be 0 or -1 at this point.
 *
 * @return TSDB_CODE_INVALID_PARA when remainGroupOffset has an unexpected value;
 *         PROJECT_RETRIEVE_DONE when the group limit is exceeded (the operator is
 *         marked completed); PROJECT_RETRIEVE_CONTINUE otherwise.
 */
static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, SOperatorInfo* pOperator) {
  if (pLimitInfo->remainGroupOffset != 0 && pLimitInfo->remainGroupOffset != -1) {
    qError("project failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_INVALID_PARA;
  }

  bool hasGroupId = (0 != pBlock->info.id.groupId);

  // another block of the group that is already being processed
  if (hasGroupId && pLimitInfo->currentGroupId == pBlock->info.id.groupId) {
    return PROJECT_RETRIEVE_CONTINUE;
  }

  if (hasGroupId) {
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
    pLimitInfo->numOfOutputGroups += 1;
  } else {
    pLimitInfo->numOfOutputGroups = 1;
  }

  // more groups than slimit allows: stop the operator
  if ((pLimitInfo->slimit.limit >= 0) && (pLimitInfo->slimit.limit < pLimitInfo->numOfOutputGroups)) {
    setOperatorCompleted(pOperator);
    return PROJECT_RETRIEVE_DONE;
  }

  // a new group starts: reset the per-group limit state
  if (hasGroupId) {
    resetLimitInfoForNextGroup(pLimitInfo);
  }

  return PROJECT_RETRIEVE_CONTINUE;
}
266

267
// todo refactor
268
static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SSDataBlock* pBlock,
289,398,491✔
269
                                    SOperatorInfo* pOperator) {
270
  // set current group id
271
  pLimitInfo->currentGroupId = groupId;
289,398,491✔
272
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pOperator->pTaskInfo);
289,452,773✔
273
  if (pBlock->info.rows == 0 && 0 != pLimitInfo->limit.limit) {
289,428,566✔
274
    return PROJECT_RETRIEVE_CONTINUE;
7,970,680✔
275
  } else {
276
    if (limitReached && (pLimitInfo->slimit.limit >= 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
281,463,057✔
277
      setOperatorCompleted(pOperator);
118,997✔
278
    } else if (limitReached && groupId == 0) {
281,344,547✔
279
      setOperatorCompleted(pOperator);
8,086,722✔
280
    }
281
  }
282

283
  return PROJECT_RETRIEVE_DONE;
281,372,016✔
284
}
285

286
/*
 * Produce the next result block of the project operator.
 *
 * Without a downstream the block is generated by doGenerateSourceData().
 * Otherwise blocks are pulled from the downstream, group offset/limit and
 * row limit/offset are applied, the projections are evaluated into binfo.pRes,
 * and the result is filtered. When mergeDataBlocks is set, projected blocks are
 * merged into pFinalRes until the threshold is exceeded or the input ends.
 *
 * @param pOperator  the project operator
 * @param pResBlock  receives the result block, or NULL when no rows were produced
 * @return 0 on success. On failure the code is stored in pTaskInfo->code and
 *         T_LONG_JMP is invoked.
 */
int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  SProjectOperatorInfo* pProjectInfo = pOperator->info;
  SOptrBasicInfo*       pInfo = &pProjectInfo->binfo;
  SExprSupp*            pSup = &pOperator->exprSupp;
  SSDataBlock*          pRes = pInfo->pRes;
  SSDataBlock*          pFinalRes = pProjectInfo->pFinalRes;
  int32_t               code = 0;
  int32_t               lino = 0;
  int64_t               startTs = 0;
  int32_t               order = pInfo->inputTsOrder;
  int32_t               scanFlag = 0;

  blockDataCleanup(pFinalRes);
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    return code;
  }

  // the open cost is measured only on the first invocation
  if (pOperator->cost.openCost == 0) {
    startTs = taosGetTimestampUs();
  }

  SOperatorInfo* pChild = pOperator->numOfDownstream > 0 ? pOperator->pDownstream[0] : NULL;
  SLimitInfo*    pLimitInfo = &pProjectInfo->limitInfo;

  // projection without a data source
  if (pChild == NULL) {
    code = doGenerateSourceData(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pProjectInfo->outputIgnoreGroup) {
      pRes->info.id.groupId = 0;
    }

    *pResBlock = (pRes->info.rows > 0)? pRes:NULL;
    return code;
  }

  for (;;) {
    // inner loop: fetch input until one block yields projected rows or the input ends
    for (;;) {
      blockDataCleanup(pRes);

      // The downstream exec may change the value of the newgroup, so use a local variable instead.
      SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
      if (pBlock == NULL) {
        qDebug("set op close, exec %d, status %d rows %" PRId64 , pTaskInfo->execModel, pOperator->status, pFinalRes->info.rows);
        setOperatorCompleted(pOperator);
        break;
      }

      // for stream interval: these block types are passed through untouched
      if (pBlock->info.type == STREAM_RETRIEVE || pBlock->info.type == STREAM_DELETE_RESULT ||
          pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_CREATE_CHILD_TABLE ||
          pBlock->info.type == STREAM_CHECKPOINT || pBlock->info.type == STREAM_NOTIFY_EVENT) {
        *pResBlock = pBlock;
        return code;
      }

      if (pProjectInfo->inputIgnoreGroup) {
        pBlock->info.id.groupId = 0;
      }

      // drop blocks of groups skipped by the group offset
      int32_t status = discardGroupDataBlock(pBlock, pLimitInfo);
      if (status == PROJECT_RETRIEVE_CONTINUE) {
        continue;
      }

      // the return value is ignored on purpose here; completion is detected through the operator status
      (void) setInfoForNewGroup(pBlock, pLimitInfo, pOperator);
      if (pOperator->status == OP_EXEC_DONE) {
        break;
      }

      if (pProjectInfo->mergeDataBlocks) {
        pFinalRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;
      } else {
        pRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;
      }

      code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
      QUERY_CHECK_CODE(code, lino, _end);

      code = blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
      QUERY_CHECK_CODE(code, lino, _end);

      code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
                                   pProjectInfo->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo));
      QUERY_CHECK_CODE(code, lino, _end);

      status = doIngroupLimitOffset(pLimitInfo, pBlock->info.id.groupId, pInfo->pRes, pOperator);
      if (status == PROJECT_RETRIEVE_CONTINUE) {
        continue;
      }

      break;
    }

    if (!pProjectInfo->mergeDataBlocks) {
      // do apply filter
      if (pRes->info.rows > 0) {
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
        QUERY_CHECK_CODE(code, lino, _end);

        // everything was filtered out: fetch more input
        if (pRes->info.rows == 0) {
          continue;
        }
      }

      // either rows are available or no results were generated
      break;
    }

    if (pRes->info.rows > 0) {
      pFinalRes->info.id.groupId = 0;  // clear groupId
      pFinalRes->info.version = pRes->info.version;

      // continue merge data, ignore the group id
      code = blockDataMerge(pFinalRes, pRes);
      QUERY_CHECK_CODE(code, lino, _end);

      if (pFinalRes->info.rows + pRes->info.rows <= pOperator->resultInfo.threshold && (pOperator->status != OP_EXEC_DONE)) {
        continue;
      }
    }

    // do apply filter
    code = doFilter(pFinalRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    // when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
    if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) {
      qDebug("project return %" PRId64 " rows, status %d", pFinalRes->info.rows, pOperator->status);
      break;
    }
  }

  SSDataBlock* pOut = pProjectInfo->mergeDataBlocks ? pFinalRes : pRes;
  pOperator->resultInfo.totalRows += pOut->info.rows;
  pOut->info.dataLoad = 1;

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - startTs) / 1000.0;
  }

  if (pProjectInfo->outputIgnoreGroup) {
    pOut->info.id.groupId = 0;
  }

  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
    printDataBlock(pOut, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo), pTaskInfo->id.queryId);
  }

  *pResBlock = (pOut->info.rows > 0)? pOut:NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
454

455
/*
 * Reset-state callback of the indefinite-rows operator: brings the operator back
 * to the OP_NOT_OPENED state so it can be executed again.
 *
 * Resets the basic operator info, clears the group tracking fields, and
 * re-initializes the aggregate support, the function result output and the
 * scalar expression support.
 *
 * @param pOper  indefinite-rows operator to reset
 * @return 0 on success; otherwise the error code of the first step that failed.
 *         (The code was previously discarded and 0 was always returned.)
 */
static int32_t resetIndefinitOutputOperState(SOperatorInfo* pOper) {
  SIndefOperatorInfo*      pInfo = pOper->info;
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
  SIndefRowsFuncPhysiNode* pPhynode = (SIndefRowsFuncPhysiNode*)pOper->pPhyNode;
  pOper->status = OP_NOT_OPENED;

  resetBasicOperatorState(&pInfo->binfo);

  pInfo->groupId = 0;
  pInfo->pNextGroupRes = NULL;

  int32_t code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->pFuncs, NULL,
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
                             &pTaskInfo->storageAPI.functionStore);
  if (code == 0) {
    code = setFunctionResultOutput(pOper, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pOper->exprSupp.numOfExprs);
  }

  if (code == 0) {
    code = resetExprSupp(&pInfo->scalarSup, pTaskInfo, pPhynode->pExprs, NULL,
                         &pTaskInfo->storageAPI.functionStore);
  }

  // propagate the failure to the caller instead of reporting success unconditionally
  return code;
}
478

479
int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
2,964,476✔
480
                                                 SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
481
  QRY_PARAM_CHECK(pOptrInfo);
2,964,476✔
482
  int32_t code = 0;
2,964,974✔
483
  int32_t lino = 0;
2,964,974✔
484
  int32_t numOfRows = 4096;
2,964,974✔
485
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
2,964,974✔
486

487
  SIndefOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIndefOperatorInfo));
2,964,974✔
488
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2,963,934✔
489
  if (pInfo == NULL || pOperator == NULL) {
2,964,058✔
490
    code = terrno;
×
UNCOV
491
    goto _error;
×
492
  }
493

494
  pOperator->pPhyNode = pNode;
2,964,476✔
495
  pOperator->pTaskInfo = pTaskInfo;
2,964,476✔
496

497
  SExprSupp* pSup = &pOperator->exprSupp;
2,964,476✔
498
  pSup->hasWindowOrGroup = false;
2,964,974✔
499

500
  SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
2,964,476✔
501

502
  if (pPhyNode->pExprs != NULL) {
2,964,476✔
503
    int32_t    num = 0;
22,225✔
504
    SExprInfo* pSExpr = NULL;
22,225✔
505
    code = createExprInfo(pPhyNode->pExprs, NULL, &pSExpr, &num);
22,225✔
506
    QUERY_CHECK_CODE(code, lino, _error);
22,225✔
507

508
    code = initExprSupp(&pInfo->scalarSup, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
22,225✔
509
    if (code != TSDB_CODE_SUCCESS) {
22,225✔
UNCOV
510
      goto _error;
×
511
    }
512
  }
513

514
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->node.pOutputDataBlockDesc);
2,964,476✔
515
  TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno);
2,964,974✔
516

517
  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
518
  int32_t TWOMB = 2 * 1024 * 1024;
2,964,974✔
519
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
2,964,974✔
UNCOV
520
    numOfRows = TWOMB / pResBlock->info.rowSize;
×
521
  }
522

523
  initBasicInfo(&pInfo->binfo, pResBlock);
2,964,476✔
524
  initResultSizeInfo(&pOperator->resultInfo, numOfRows);
2,964,556✔
525
  code = blockDataEnsureCapacity(pResBlock, numOfRows);
2,964,866✔
526
  TSDB_CHECK_CODE(code, lino, _error);
2,964,341✔
527

528
  int32_t    numOfExpr = 0;
2,964,341✔
529
  SExprInfo* pExprInfo = NULL;
2,965,337✔
530
  code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr);
2,965,337✔
531
  TSDB_CHECK_CODE(code, lino, _error);
2,963,622✔
532

533
  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
5,927,884✔
534
                            pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
2,964,476✔
535
  TSDB_CHECK_CODE(code, lino, _error);
2,963,764✔
536

537
  code = setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
2,963,764✔
538
  TSDB_CHECK_CODE(code, lino, _error);
2,962,997✔
539

540
  code = filterInitFromNode((SNode*)pPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
2,964,981✔
541
                            pTaskInfo->pStreamRuntimeInfo);
2,962,997✔
542
  TSDB_CHECK_CODE(code, lino, _error);
2,964,138✔
543

544
  pInfo->binfo.pRes = pResBlock;
2,964,138✔
545
  pInfo->binfo.inputTsOrder = pNode->inputTsOrder;
2,963,764✔
546
  pInfo->binfo.outputTsOrder = pNode->outputTsOrder;
2,964,120✔
547
  code = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr, &pInfo->pPseudoColInfo);
2,963,408✔
548
  TSDB_CHECK_CODE(code, lino, _error);
2,963,498✔
549

550
  setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo,
2,963,498✔
551
                  pTaskInfo);
552
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo,
2,964,352✔
553
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
554
                                         
555
  setOperatorResetStateFn(pOperator, resetIndefinitOutputOperState);
2,963,277✔
556
  code = appendDownstream(pOperator, &downstream, 1);
2,963,996✔
557
  if (code != TSDB_CODE_SUCCESS) {
2,964,421✔
UNCOV
558
    goto _error;
×
559
  }
560

561
  *pOptrInfo = pOperator;
2,964,421✔
562
  return TSDB_CODE_SUCCESS;
2,964,421✔
563

564
_error:
×
565
  if (pInfo != NULL) destroyIndefinitOperatorInfo(pInfo);
×
566
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
567
  pTaskInfo->code = code;
×
UNCOV
568
  return code;
×
569
}
570

571
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream,
46,658,880✔
572
                              SExecTaskInfo* pTaskInfo) {
573
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
46,658,880✔
574
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
46,658,880✔
575
  SExprSupp*          pSup = &pOperator->exprSupp;
46,658,880✔
576

577
  int32_t order = pInfo->inputTsOrder;
46,658,880✔
578
  int32_t scanFlag = pBlock->info.scanFlag;
46,658,880✔
579
  int32_t code = TSDB_CODE_SUCCESS;
46,659,378✔
580

581
  // there is an scalar expression that needs to be calculated before apply the group aggregation.
582
  SExprSupp* pScalarSup = &pIndefInfo->scalarSup;
46,659,378✔
583
  if (pScalarSup->pExprInfo != NULL) {
46,658,880✔
584
    code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
67,236✔
585
                                 pIndefInfo->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo));
67,236✔
586
    if (code != TSDB_CODE_SUCCESS) {
67,236✔
UNCOV
587
      T_LONG_JMP(pTaskInfo->env, code);
×
588
    }
589
  }
590

591
  code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
46,658,880✔
592
  if (code) {
46,658,880✔
UNCOV
593
    T_LONG_JMP(pTaskInfo->env, code);
×
594
  }
595

596
  code = blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
46,658,880✔
597
  if (code != TSDB_CODE_SUCCESS) {
46,657,895✔
UNCOV
598
    T_LONG_JMP(pTaskInfo->env, code);
×
599
  }
600

601
  code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
46,658,880✔
602
                               pIndefInfo->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo));
46,657,895✔
603
  if (code != TSDB_CODE_SUCCESS) {
46,659,378✔
604
    T_LONG_JMP(pTaskInfo->env, code);
17,201✔
605
  }
606
}
46,642,177✔
607

608
/*
 * Produce the next result block of the indefinite-rows function operator.
 *
 * Input blocks are processed until the output reaches the result threshold, the
 * group id changes, or the input ends. The first block of a new group is parked
 * in pNextGroupRes and handled at the start of the next round, after the
 * function contexts have been reset.
 *
 * @param pOperator  the indefinite-rows operator
 * @param pResBlock  receives binfo.pRes, or NULL when no rows were produced
 * @return TSDB_CODE_SUCCESS on success. On failure the code is stored in
 *         pTaskInfo->code and T_LONG_JMP is invoked.
 */
int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
  SExprSupp*          pSup = &pOperator->exprSupp;
  int64_t             startTs = 0;
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  SSDataBlock*        pRes = pInfo->pRes;

  blockDataCleanup(pRes);

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  if (pOperator->status == OP_EXEC_DONE) {
    return code;
  }

  // the open cost is measured only on the first invocation
  if (pOperator->cost.openCost == 0) {
    startTs = taosGetTimestampUs();
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];

  for (;;) {
    // here we need to handle the existing group results
    if (pIndefInfo->pNextGroupRes != NULL) {  // todo extract method
      // the contexts start from scratch for the new group
      for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
        SqlFunctionCtx* pCtx = &pSup->pCtx[k];

        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
        pResInfo->initialized = false;
        pCtx->pOutput = NULL;
      }

      doHandleDataBlock(pOperator, pIndefInfo->pNextGroupRes, downstream, pTaskInfo);
      pIndefInfo->pNextGroupRes = NULL;
    }

    if (pInfo->pRes->info.rows < pOperator->resultInfo.threshold) {
      for (;;) {
        // The downstream exec may change the value of the newgroup, so use a local variable instead.
        SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
        if (pBlock == NULL) {
          setOperatorCompleted(pOperator);
          break;
        }
        pInfo->pRes->info.scanFlag = pBlock->info.scanFlag;

        if (pIndefInfo->groupId == 0 && pBlock->info.id.groupId != 0) {
          pIndefInfo->groupId = pBlock->info.id.groupId;  // this is the initial group result
        } else if (pIndefInfo->groupId != pBlock->info.id.groupId) {
          // group changed: park the block and return what has been computed so far
          pIndefInfo->groupId = pBlock->info.id.groupId;
          pIndefInfo->pNextGroupRes = pBlock;
          break;
        }

        doHandleDataBlock(pOperator, pBlock, downstream, pTaskInfo);
        if (pInfo->pRes->info.rows >= pOperator->resultInfo.threshold) {
          break;
        }
      }
    }

    code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    size_t filteredRows = pInfo->pRes->info.rows;
    if (filteredRows > 0 || pOperator->status == OP_EXEC_DONE) {
      break;
    }

    // everything was filtered out and more input may exist: try again
    blockDataCleanup(pInfo->pRes);
  }

  size_t rows = pInfo->pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - startTs) / 1000.0;
  }

  *pResBlock = (rows > 0) ? pInfo->pRes : NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
701

702
/*
 * Run the init callback of every function context that still needs it.
 *
 * A context is skipped when its result entry is already initialized, when its
 * function is a pseudo-column or scalar function, or when functionId is -1.
 *
 * @param pCtx  array of function contexts
 * @param size  number of entries in pCtx
 * @return 0 on success, otherwise the first non-zero code returned by an init callback
 */
int32_t initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
  for (int32_t idx = 0; idx < size; ++idx) {
    struct SResultRowEntryInfo* pEntry = GET_RES_INFO(&pCtx[idx]);

    bool skip = isRowEntryInitialized(pEntry) || fmIsPseudoColumnFunc(pCtx[idx].functionId) ||
                pCtx[idx].functionId == -1 || fmIsScalarFunc(pCtx[idx].functionId);
    if (skip) {
      continue;
    }

    int32_t code = pCtx[idx].fpSet.init(&pCtx[idx], pCtx[idx].resultInfo);
    if (code) {
      return code;
    }
  }

  return 0;
}
719

720
/*
721
 * The start of each column's SResultRowEntryInfo is denoted by RowCellInfoOffset.
722
 * Note that in case of top/bottom query, the whole multiple rows of result is treated as only one row of results.
723
 * +------------+-----------------result column 1------------+------------------result column 2-----------+
724
 * | SResultRow | SResultRowEntryInfo | intermediate buffer1 | SResultRowEntryInfo | intermediate buffer 2|
725
 * +------------+--------------------------------------------+--------------------------------------------+
726
 *           offset[0]                                  offset[1]                                   offset[2]
727
 */
728
// TODO refactor: some function move away
729
/*
 * Bind every function context of the operator to its entry in a result row.
 *
 * The result-row info of pInfo is re-initialized, a result row is requested through
 * doSetResultOutBufByKey() using a zero key and group id 0, and then for each of the first
 * numOfExprs contexts the matching SResultRowEntryInfo is cleaned up and attached, with
 * scanFlag set to `stage`. Finally initCtxOutputBuffer() is run over the contexts.
 *
 * Returns pTaskInfo->code when no row was obtained or the task already carries an error,
 * otherwise the result of initCtxOutputBuffer().
 *
 * NOTE(review): if the row is NULL while pTaskInfo->code is still 0 this returns 0 without
 * binding any context -- confirm doSetResultOutBufByKey() always sets the task code on failure.
 */
int32_t setFunctionResultOutput(struct SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
                             int32_t numOfExprs) {
  SExecTaskInfo*  pTask = pOperator->pTaskInfo;
  SExprSupp*      pExprSup = &pOperator->exprSupp;
  SqlFunctionCtx* pCtxList = pExprSup->pCtx;
  int32_t*        pEntryOffsets = pExprSup->rowEntryInfoOffset;

  SResultRowInfo* pRowInfo = &pInfo->resultRowInfo;
  initResultRowInfo(pRowInfo);

  int64_t key = 0;
  int64_t groupId = 0;

  SResultRow* pResRow = doSetResultOutBufByKey(pSup->pResultBuf, pRowInfo, (char*)&key, sizeof(key), true, groupId,
                                               pTask, false, pSup, true);
  if (pResRow == NULL || pTask->code != 0) {
    return pTask->code;
  }

  for (int32_t idx = 0; idx < numOfExprs; ++idx) {
    SqlFunctionCtx*             pCur = &pCtxList[idx];
    struct SResultRowEntryInfo* pCell = getResultEntryInfo(pResRow, idx, pEntryOffsets);

    cleanupResultRowEntry(pCell);
    pCur->resultInfo = pCell;
    pCur->scanFlag = stage;
  }

  return initCtxOutputBuffer(pCtxList, numOfExprs);
}
756

757
/*
 * Collect the indexes of all contexts in pCtx[0..numOfCols) whose function is a pseudo-column
 * function but not a placeholder function.
 *
 * pCtx      array of function contexts to inspect
 * numOfCols number of contexts in pCtx
 * pResList  output; on success receives a newly created SArray of int32_t indexes,
 *           which the caller is then responsible for destroying
 *
 * Returns 0 on success, or terrno when the list cannot be created or grown. On failure the
 * partially built list is released here, so nothing is leaked and *pResList is not assigned
 * by this function body (QRY_PARAM_CHECK is applied to pResList first).
 */
int32_t setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList) {
  QRY_PARAM_CHECK(pResList);
  SArray* pList = taosArrayInit(4, sizeof(int32_t));
  if (pList == NULL) {
    return terrno;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    if (fmIsPseudoColumnFunc(pCtx[i].functionId) && !fmIsPlaceHolderFunc(pCtx[i].functionId)) {
      void* px = taosArrayPush(pList, &i);
      if (px == NULL) {
        // capture the error before cleanup so that releasing the list cannot disturb it
        int32_t code = terrno;
        taosArrayDestroy(pList);
        return code;
      }
    }
  }

  *pResList = pList;
  return 0;
}
776

777
/*
 * Produce the single-row result of a project operator that has no input block.
 *
 * Each expression is evaluated into its output slot of the operator's result block:
 *   - QUERY_NODE_VALUE:    the constant (or NULL) is written to row 0;
 *   - QUERY_NODE_FUNCTION: only scalar functions are supported; they are evaluated through
 *                          scalarCalculate() with the result block itself as input
 *                          (e.g. select foo(n) from (select 1 n));
 *   - QUERY_NODE_OPERATOR: delegated to projectApplyOperator() without a source block;
 *   - anything else:       TSDB_CODE_OPS_NOT_SUPPORT.
 *
 * Afterwards the row count is set to 1, the filter and limit/offset are applied, the
 * operator is marked completed and the open cost is recorded if it is still 0.
 *
 * Returns 0 on success or the first error code encountered. Temporary resources created for
 * scalar-function evaluation (block list, intermediate column) are released on every path
 * that follows a successful scalarCalculate(); the block list is released on all error paths.
 */
int32_t doGenerateSourceData(SOperatorInfo* pOperator) {
  SProjectOperatorInfo* pProjectInfo = pOperator->info;

  SExprSupp*     pSup = &pOperator->exprSupp;
  SSDataBlock*   pRes = pProjectInfo->binfo.pRes;
  SExprInfo*     pExpr = pSup->pExprInfo;
  int64_t        st = taosGetTimestampUs();
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  int32_t code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
  if (code) {
    return code;
  }

  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
    int32_t outputSlotId = pExpr[k].base.resSchema.slotId;

    if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
      SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, outputSlotId);
      if (pColInfoData == NULL) {
        return terrno;
      }

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataSetNNULL(pColInfoData, 0, 1);
      } else {
        code = colDataSetVal(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
        if (code) {
          return code;
        }
      }
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
      SqlFunctionCtx* pfCtx = &pSup->pCtx[k];

      // UDF scalar functions will be calculated here, for example, select foo(n) from (select 1 n).
      // UDF aggregate functions will be handled in agg operator.
      if (!fmIsScalarFunc(pfCtx->functionId)) {
        return TSDB_CODE_OPS_NOT_SUPPORT;
      }

      SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
      if (pBlockList == NULL) {
        return terrno;
      }

      void* px = taosArrayPush(pBlockList, &pRes);
      if (px == NULL) {
        code = terrno;  // read terrno before the cleanup call
        taosArrayDestroy(pBlockList);
        return code;
      }

      SColumnInfoData* pResColData = taosArrayGet(pRes->pDataBlock, outputSlotId);
      if (pResColData == NULL) {
        code = terrno;  // read terrno before the cleanup call
        taosArrayDestroy(pBlockList);
        return code;
      }

      SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};

      SScalarParam dest = {.columnData = &idata};
      gTaskScalarExtra.pStreamInfo = GET_STM_RTINFO(pOperator->pTaskInfo);
      gTaskScalarExtra.pStreamRange = NULL;
      code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest, &gTaskScalarExtra);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pBlockList);
        return code;
      }

      if (pRes->info.capacity <= 0) {
        qError("project failed at: %s:%d", __func__, __LINE__);
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      } else {
        code = colDataAssign(pResColData, &idata, dest.numOfRows, &pRes->info);
      }

      // release the temporaries on success and on failure alike
      colDataDestroy(&idata);
      taosArrayDestroy(pBlockList);
      if (code) {
        return code;
      }
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
      TAOS_CHECK_RETURN(projectApplyOperator(&pExpr[k], pRes, NULL, outputSlotId, NULL, false, &gTaskScalarExtra));
    } else {
      return TSDB_CODE_OPS_NOT_SUPPORT;
    }
  }

  pRes->info.rows = 1;
  code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
  if (code) {
    pTaskInfo->code = code;
    return code;
  }

  (void) doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator);

  pOperator->resultInfo.totalRows += pRes->info.rows;

  setOperatorCompleted(pOperator);
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  }

  return code;
}
881

882
/*
 * Point the output buffer of the first N contexts at the first N columns of pResult,
 * where N is the number of entries in pPseudoList (0 when the list is NULL).
 * A missing column is logged and leaves that context's pOutput set to NULL.
 *
 * NOTE(review): only the size of pPseudoList is used here; the indexes stored in the list
 * are not read, so context i is always paired with column i -- confirm this is intended.
 */
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
  if (pPseudoList == NULL) {
    return;
  }

  size_t total = taosArrayGetSize(pPseudoList);
  for (int32_t idx = 0; (size_t)idx < total; ++idx) {
    SqlFunctionCtx* pCur = &pCtx[idx];

    pCur->pOutput = taosArrayGet(pResult->pDataBlock, idx);
    if (pCur->pOutput == NULL) {
      qError("failed to get the output buf, ptr is null");
    }
  }
}
476,119,243✔
891

892
/*
 * Copy one source column into output slot `outputSlotId` of pResult.
 *
 * The source is the first input column of pfCtx when it is set; otherwise the column of
 * pSrcBlock addressed by the slot id of the context's first parameter, together with the
 * row count of pSrcBlock. When pResult already holds rows and createNewColModel is false
 * the data is merged behind the existing rows, otherwise it is assigned to the column.
 *
 * On success *numOfRows receives the number of source rows and 0 is returned; on failure
 * the error is logged and its code returned.
 */
int32_t projectApplyColumn(SSDataBlock* pResult, SSDataBlock* pSrcBlock, int32_t outputSlotId, SqlFunctionCtx* pfCtx, int32_t* numOfRows, bool createNewColModel) {
  int32_t code = 0, lino = 0;

  SColumnInfoData* pDstCol = taosArrayGet(pResult->pDataBlock, outputSlotId);
  TSDB_CHECK_NULL(pDstCol, code, lino, _exit, terrno);

  const bool appendRows = (pResult->info.rows > 0) && !createNewColModel;

  // pick the source column and its row count
  SColumnInfoData* pSrcCol = pfCtx->input.pData[0];
  int32_t          srcRows = pfCtx->input.numOfRows;
  if (pSrcCol == NULL) {
    int32_t srcSlot = pfCtx->param[0].pCol->slotId;

    pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
    TSDB_CHECK_NULL(pSrcCol, code, lino, _exit, terrno);
    srcRows = pSrcBlock->info.rows;
  }

  if (appendRows) {
    TAOS_CHECK_EXIT(
        colDataMergeCol(pDstCol, pResult->info.rows, (int32_t*)&pResult->info.capacity, pSrcCol, srcRows));
  } else {
    TAOS_CHECK_EXIT(colDataAssign(pDstCol, pSrcCol, srcRows, &pResult->info));
  }

  *numOfRows = srcRows;

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
940

941

942
/*
 * Fill output slot `outputSlotId` of pResult with the constant held in the first parameter
 * of pExpr, once for every row of pSrcBlock.
 *
 * Writing starts at row 0 when createNewColModel is set, otherwise behind the rows already
 * present in pResult. A parameter of type TSDB_DATA_TYPE_NULL marks the whole range as NULL.
 *
 * On success *numOfRows receives the row count of pSrcBlock and 0 is returned; on failure
 * the error is logged and its code returned.
 */
int32_t projectApplyValue(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, int32_t outputSlotId, int32_t* numOfRows, bool createNewColModel) {
  int32_t code = 0, lino = 0;

  SColumnInfoData* pDstCol = taosArrayGet(pResult->pDataBlock, outputSlotId);
  TSDB_CHECK_NULL(pDstCol, code, lino, _exit, terrno);

  int32_t startRow = 0;
  if (!createNewColModel) {
    startRow = pResult->info.rows;
  }

  int32_t valType = pExpr->base.pParam[0].param.nType;
  if (valType != TSDB_DATA_TYPE_NULL) {
    char*   pVal = taosVariantGet(&pExpr->base.pParam[0].param, valType);
    int32_t row = 0;
    while (row < pSrcBlock->info.rows) {
      TAOS_CHECK_EXIT(colDataSetVal(pDstCol, row + startRow, pVal, false));
      ++row;
    }
  } else {
    colDataSetNNULL(pDstCol, startRow, pSrcBlock->info.rows);
  }

  *numOfRows = pSrcBlock->info.rows;

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
968

969

970

971
/*
 * Evaluate the operator expression tree of pExpr with scalarCalculate() and store the
 * outcome in output slot `outputSlotId` of pResult.
 *
 * pSrcBlock may be NULL, in which case no input block list is passed to the calculation.
 * pExtraParams is published through gTaskScalarExtra.pStreamInfo before the calculation.
 * The computed column is merged behind the existing rows when pResult already has rows and
 * createNewColModel is false, otherwise it is assigned to the output column.
 *
 * numOfRows is optional; when non-NULL it receives the number of computed rows.
 * Returns 0 on success; a negative code is logged before being returned.
 */
int32_t projectApplyOperator(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, int32_t outputSlotId, int32_t* numOfRows, bool createNewColModel, const void* pExtraParams) {
  int32_t code = 0, lino = 0;
  SArray* pSrcList = NULL;

  if (pSrcBlock != NULL) {
    pSrcList = taosArrayInit(4, POINTER_BYTES);
    TSDB_CHECK_NULL(pSrcList, code, lino, _exit, terrno);

    void* pPushed = taosArrayPush(pSrcList, &pSrcBlock);
    TSDB_CHECK_NULL(pPushed, code, lino, _exit, terrno);
  }

  SColumnInfoData* pDstCol = taosArrayGet(pResult->pDataBlock, outputSlotId);
  TSDB_CHECK_NULL(pDstCol, code, lino, _exit, terrno);

  // intermediate column that receives the scalar result before it is moved into pDstCol
  SColumnInfoData tmpCol = {.info = pDstCol->info, .hasNull = true};
  SScalarParam    out = {.columnData = &tmpCol};

  gTaskScalarExtra.pStreamInfo = (void*)pExtraParams;
  gTaskScalarExtra.pStreamRange = NULL;
  TAOS_CHECK_EXIT(scalarCalculate(pExpr->pExpr->_optrRoot.pRootNode, pSrcList, &out, &gTaskScalarExtra));

  const bool appendRows = (pResult->info.rows > 0) && !createNewColModel;
  if (appendRows) {
    code = colDataMergeCol(pDstCol, pResult->info.rows, (int32_t*)&pResult->info.capacity, &tmpCol, out.numOfRows);
  } else {
    code = colDataAssign(pDstCol, &tmpCol, out.numOfRows, &pResult->info);
  }

  // the intermediate column is released before the merge/assign result is checked
  colDataDestroy(&tmpCol);
  TAOS_CHECK_EXIT(code);

  if (numOfRows != NULL) {
    *numOfRows = out.numOfRows;
  }

_exit:

  if (code < 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  taosArrayDestroy(pSrcList);

  return code;
}
1014

1015

1016
/*
 * Evaluate one function expression of a projection and write it to output slot
 * `outputSlotId` of pResult. The function kind of pfCtx selects the handling:
 *
 *   - placeholder function with extra params and exactly one parameter: the value is taken
 *     from pExtraParams via scalarAssignPlaceHolderRes();
 *   - scalar (or remaining placeholder) function: evaluated by scalarCalculate() and merged
 *     into / assigned to the output column;
 *   - indefinite-rows function: initialized and processed through its fpSet callbacks;
 *     process-by-row functions are additionally queued in *processByRowFunctionCtx, which is
 *     created here on first use and owned by the caller;
 *   - aggregate function: the input column addressed by the first parameter is copied row by
 *     row, unless it is a select-value function and doSelectFunc is false (nothing is done);
 *   - group-id function: the group id of pSrcBlock is written for every source row.
 *
 * pCtx is the full context array (used to locate the timestamp output column through
 * pPseudoList); pfCtx is the context of this expression.
 *
 * On success 0 is returned and, except for the skipped select-value case and unrecognized
 * function kinds, *numOfRows receives the number of produced rows. On failure the error is
 * logged and its code returned.
 */
int32_t projectApplyFunction(SqlFunctionCtx* pCtx, SqlFunctionCtx* pfCtx, SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock,
                                    int32_t outputSlotId, int32_t* numOfRows, bool createNewColModel, const void* pExtraParams,
                                    SArray* pPseudoList, SArray** processByRowFunctionCtx, bool doSelectFunc) {
  int32_t code = 0, lino = 0;
  SArray* pBlockList = NULL;
  SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
  TSDB_CHECK_NULL(pResColData, code, lino, _exit, terrno);

  if (fmIsPlaceHolderFunc(pfCtx->functionId) && pExtraParams && pfCtx->pExpr->base.pParamList && 1 == pfCtx->pExpr->base.pParamList->length) {
    TAOS_CHECK_EXIT(scalarAssignPlaceHolderRes(pResColData, pResult->info.rows, pSrcBlock->info.rows, pfCtx->functionId, pExtraParams));
    *numOfRows = pSrcBlock->info.rows;

    return code;
  }

  if (fmIsScalarFunc(pfCtx->functionId) || fmIsPlaceHolderFunc(pfCtx->functionId)) {
    pBlockList = taosArrayInit(4, POINTER_BYTES);
    TSDB_CHECK_NULL(pBlockList, code, lino, _exit, terrno);

    void* px = taosArrayPush(pBlockList, &pSrcBlock);
    TSDB_CHECK_NULL(px, code, lino, _exit, terrno);

    SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};
    SScalarParam dest = {.columnData = &idata};
    gTaskScalarExtra.pStreamInfo = (void*)pExtraParams;
    gTaskScalarExtra.pStreamRange = NULL;
    TAOS_CHECK_EXIT(scalarCalculate((SNode*)pExpr->pExpr->_function.pFunctNode, pBlockList, &dest, &gTaskScalarExtra));

    if (pResult->info.rows > 0 && !createNewColModel) {
      code = colDataMergeCol(pResColData, pResult->info.rows, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
    } else {
      SColumnInfo oriInfo = pResColData->info;
      code = colDataAssign(pResColData, &idata, dest.numOfRows, &pResult->info);
      // restore the original column info to satisfy the output column schema
      pResColData->info = oriInfo;
    }

    colDataDestroy(&idata);
    taosArrayDestroy(pBlockList);
    // the list is gone; clear the pointer so that the shared cleanup at _exit cannot free it a
    // second time when the merge/assign above failed
    pBlockList = NULL;
    TAOS_CHECK_EXIT(code);

    *numOfRows = dest.numOfRows;

    return code;
  }

  if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
    SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx);
    TAOS_CHECK_EXIT(pfCtx->fpSet.init(pfCtx, pResInfo));

    pfCtx->pOutput = (char*)pResColData;
    TSDB_CHECK_NULL(pfCtx->pOutput, code, lino, _exit, terrno);

    pfCtx->offset = createNewColModel ? 0 : pResult->info.rows;  // set the start offset

    // set the timestamp(_rowts) output buffer
    if (taosArrayGetSize(pPseudoList) > 0) {
      int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
      TSDB_CHECK_NULL(outputColIndex, code, lino, _exit, terrno);

      pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
    }

    // link pDstBlock to set selectivity value
    if (pfCtx->subsidiaries.num > 0) {
      pfCtx->pDstBlock = pResult;
    }

    code = pfCtx->fpSet.process(pfCtx);
    if (code != TSDB_CODE_SUCCESS) {
      // give the function a chance to release its state before reporting the failure
      if (pfCtx->fpSet.cleanup != NULL) {
        pfCtx->fpSet.cleanup(pfCtx);
      }
      TAOS_CHECK_EXIT(code);
    }

    *numOfRows = pResInfo->numOfRes;

    if (fmIsProcessByRowFunc(pfCtx->functionId)) {
      if (NULL == *processByRowFunctionCtx) {
        *processByRowFunctionCtx = taosArrayInit(1, sizeof(SqlFunctionCtx*));
        TSDB_CHECK_NULL(*processByRowFunctionCtx, code, lino, _exit, terrno);
      }

      void* px = taosArrayPush(*processByRowFunctionCtx, &pfCtx);
      TSDB_CHECK_NULL(px, code, lino, _exit, terrno);
    }

    return code;
  }

  if (fmIsAggFunc(pfCtx->functionId)) {
    // selective value output should be set during corresponding function execution
    if (!doSelectFunc && fmIsSelectValueFunc(pfCtx->functionId)) {
      return code;
    }

    // _group_key function for "partition by tbname" + csum(col_name) query
    int32_t slotId = pfCtx->param[0].pCol->slotId;

    // todo handle the json tag
    SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
    TSDB_CHECK_NULL(pInput, code, lino, _exit, terrno);

    // NOTE(review): rows are written at pResult->info.rows + f even when createNewColModel is
    // set, whereas the indefinite-rows branch starts at 0 in that case -- confirm this is intended.
    for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
      bool isNull = colDataIsNull_s(pInput, f);
      if (isNull) {
        colDataSetNULL(pResColData, pResult->info.rows + f);
      } else {
        char* data = colDataGetData(pInput, f);
        TAOS_CHECK_EXIT(colDataSetVal(pResColData, pResult->info.rows + f, data, isNull));
      }
    }

    *numOfRows = pSrcBlock->info.rows;

    return code;
  }

  if (fmIsGroupIdFunc(pfCtx->functionId)) {
    for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
      TAOS_CHECK_EXIT(colDataSetVal(pResColData, pResult->info.rows + f, (const char*)&pSrcBlock->info.id.groupId, false));
    }

    *numOfRows = pSrcBlock->info.rows;
    return code;
  }

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  taosArrayDestroy(pBlockList);

  return code;
}
1155

1156

1157
int32_t projectApplyFunctionsWithSelect(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock,
475,998,159✔
1158
                                        SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList,
1159
                                        const void* pExtraParams, bool doSelectFunc, bool hasIndefRowsFunc) {
1160
  int32_t lino = 0;
475,998,159✔
1161
  int32_t code = TSDB_CODE_SUCCESS;
475,998,159✔
1162
  if (hasIndefRowsFunc) {
475,998,159✔
1163
    setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
476,030,253✔
1164
  }
1165
  pResult->info.dataLoad = 1;
475,986,327✔
1166

1167
  SArray* processByRowFunctionCtx = NULL;
476,021,852✔
1168
  if (pSrcBlock == NULL) {
475,961,360✔
1169
    for (int32_t k = 0; k < numOfOutput; ++k) {
×
UNCOV
1170
      int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
×
1171

1172
      if (pExpr[k].pExpr->nodeType != QUERY_NODE_VALUE) {
×
UNCOV
1173
        qError("project failed at: %s:%d", __func__, __LINE__);
×
UNCOV
1174
        TAOS_CHECK_EXIT(TSDB_CODE_INVALID_PARA);
×
1175
      }
UNCOV
1176
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
×
UNCOV
1177
      TSDB_CHECK_NULL(pColInfoData, code, lino, _exit, terrno);
×
1178

1179
      int32_t type = pExpr[k].base.pParam[0].param.nType;
×
1180
      if (TSDB_DATA_TYPE_NULL == type) {
×
1181
        colDataSetNNULL(pColInfoData, 0, 1);
1182
      } else {
UNCOV
1183
        TAOS_CHECK_EXIT(colDataSetVal(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false));
×
1184
      }
1185
    }
1186

UNCOV
1187
    pResult->info.rows = 1;
×
UNCOV
1188
    goto _exit;
×
1189
  }
1190

1191
  if (pResult != pSrcBlock) {
475,961,360✔
1192
    pResult->info.id.groupId = pSrcBlock->info.id.groupId;
338,898,669✔
1193
    if (pSrcBlock->info.parTbName[0]) {
338,947,371✔
UNCOV
1194
      tstrncpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
×
1195
    }
1196
    qTrace("%s, parName:%s,groupId:%" PRIu64, __FUNCTION__, pSrcBlock->info.parTbName, pResult->info.id.groupId);
339,019,504✔
1197
  }
1198

1199
  // if the source equals to the destination, it is to create a new column as the result of scalar
1200
  // function or some operators.
1201
  bool createNewColModel = (pResult == pSrcBlock);
475,941,677✔
1202
  if (createNewColModel) {
475,941,677✔
1203
    TAOS_CHECK_EXIT(blockDataEnsureCapacity(pResult, pResult->info.rows));
137,102,781✔
1204
  }
1205

1206
  int32_t numOfRows = 0;
475,945,795✔
1207

1208
  for (int32_t k = 0; k < numOfOutput; ++k) {
2,079,123,934✔
1209
    int32_t               outputSlotId = pExpr[k].base.resSchema.slotId;
1,604,329,726✔
1210
    SqlFunctionCtx*       pfCtx = &pCtx[k];
1,604,547,058✔
1211
    switch (pExpr[k].pExpr->nodeType) {
1,604,557,439✔
1212
      case QUERY_NODE_COLUMN: {
1,024,734,533✔
1213
        TAOS_CHECK_EXIT(projectApplyColumn(pResult, pSrcBlock, outputSlotId, pfCtx, &numOfRows, createNewColModel));
1,024,734,533✔
1214
        break;
1,024,791,370✔
1215
      } 
1216
      case QUERY_NODE_VALUE: {
18,817,175✔
1217
        TAOS_CHECK_EXIT(projectApplyValue(&pExpr[k], pResult, pSrcBlock, outputSlotId, &numOfRows, createNewColModel));
18,817,175✔
1218
        break;
18,818,746✔
1219
      } 
1220
      case QUERY_NODE_OPERATOR: {
79,603,753✔
1221
        TAOS_CHECK_EXIT(projectApplyOperator(&pExpr[k], pResult, pSrcBlock, outputSlotId, &numOfRows, createNewColModel, pExtraParams));
79,603,753✔
1222
        break;
78,471,766✔
1223
      } 
1224
      case QUERY_NODE_FUNCTION: {
481,401,277✔
1225
        TAOS_CHECK_EXIT(projectApplyFunction(pCtx, pfCtx, &pExpr[k], pResult, pSrcBlock, outputSlotId, &numOfRows, createNewColModel, pExtraParams, pPseudoList, &processByRowFunctionCtx, doSelectFunc));
481,401,277✔
1226
        break;
481,155,140✔
1227
      }
UNCOV
1228
      default: {
×
UNCOV
1229
        qError("invalid project expr nodeType:%d", pExpr[k].pExpr->nodeType);
×
UNCOV
1230
        TAOS_CHECK_EXIT(TSDB_CODE_OPS_NOT_SUPPORT);
×
1231
      }
1232
    }
1233
  }
1234

1235
  if (processByRowFunctionCtx && taosArrayGetSize(processByRowFunctionCtx) > 0) {
474,794,208✔
1236
    SqlFunctionCtx** pfCtx = taosArrayGet(processByRowFunctionCtx, 0);
44,215,288✔
1237
    TSDB_CHECK_NULL(pfCtx, code, lino, _exit, terrno);
44,215,288✔
1238

1239
    TAOS_CHECK_EXIT((*pfCtx)->fpSet.processFuncByRow(processByRowFunctionCtx));
44,215,288✔
1240
    numOfRows = (*pfCtx)->resultInfo->numOfRes;
44,207,324✔
1241
  }
1242

1243
  if (!createNewColModel) {
474,837,376✔
1244
    pResult->info.rows += numOfRows;
337,901,551✔
1245
  }
1246

1247
_exit:
476,056,769✔
1248
  if (processByRowFunctionCtx) {
476,048,814✔
1249
    taosArrayDestroy(processByRowFunctionCtx);
44,215,288✔
1250
  }
1251
  if (code) {
476,048,814✔
1252
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
1,178,673✔
1253
  }
1254
  return code;
476,048,814✔
1255
}
1256

1257
/*
 * Convenience wrapper around projectApplyFunctionsWithSelect() that always passes
 * doSelectFunc = false and hasIndefRowsFunc = true. All other arguments are forwarded
 * unchanged, and the callee's return code is returned as is.
 */
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
                              int32_t numOfOutput, SArray* pPseudoList, const void* pExtraParams) {
  const bool doSelectFunc = false;
  const bool hasIndefRowsFunc = true;

  return projectApplyFunctionsWithSelect(pExpr, pResult, pSrcBlock, pCtx, numOfOutput, pPseudoList, pExtraParams,
                                         doSelectFunc, hasIndefRowsFunc);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc