• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4997

20 Mar 2026 06:10AM UTC coverage: 71.739% (-0.3%) from 72.069%
#4997

push

travis-ci

web-flow
feat: add query phase tracking for SHOW QUERIES (#34706)

148 of 183 new or added lines in 10 files covered. (80.87%)

9273 existing lines in 172 files now uncovered.

244572 of 340921 relevant lines covered (71.74%)

133392941.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.27
/source/libs/executor/src/projectoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "taoserror.h"
22
#include "tdatablock.h"
23

24
/* Per-operator state of the project operator (allocated in createProjectOperatorInfo). */
typedef struct SProjectOperatorInfo {
25
  // Basic operator state; binfo.pRes is the block the projection writes into.
  SOptrBasicInfo binfo;
26
  // Result-row buffer support, set up by initAggSup() and released by cleanupAggSup().
  SAggSupporter  aggSup;
27
  // List filled by setRowTsColumnOutputInfo() and handed to projectApplyFunctions().
  SArray*        pPseudoColInfo;
28
  // LIMIT/SLIMIT bookkeeping, initialised from the physical node's pLimit/pSlimit.
  SLimitInfo     limitInfo;
29
  // When true, doProjectOperation() accumulates binfo.pRes into pFinalRes before returning.
  bool           mergeDataBlocks;
30
  // Merge target created from binfo.pRes via createOneDataBlock(); destroyed with the operator.
  SSDataBlock*   pFinalRes;
31
  // When true, the groupId of every incoming block is reset to 0 before processing.
  bool           inputIgnoreGroup;
32
  // When true, the groupId of the returned block is reset to 0.
  bool           outputIgnoreGroup;
33
} SProjectOperatorInfo;
34

35
/* Per-operator state of the indefinite-rows function operator (see createIndefinitOutputOperatorInfo). */
typedef struct SIndefOperatorInfo {
36
  // Basic operator state; binfo.pRes is the output block.
  SOptrBasicInfo binfo;
37
  // Result-row buffer support, set up by initAggSup() and released by cleanupAggSup().
  SAggSupporter  aggSup;
38
  // List filled by setRowTsColumnOutputInfo() and handed to projectApplyFunctions().
  SArray*        pPseudoColInfo;
39
  // Optional scalar expressions applied in place to each input block before the functions run.
  SExprSupp      scalarSup;
40
  // Group id of the blocks currently being accumulated (0 until a non-zero group is seen).
  uint64_t       groupId;
41
  // First block of the next group, stashed when a group change is detected and consumed on the
  // next loop iteration; destroyIndefinitOperatorInfo() does not free it.
  SSDataBlock*   pNextGroupRes;
42
} SIndefOperatorInfo;
43

44
static int32_t      doGenerateSourceData(SOperatorInfo* pOperator);
45
static int32_t      doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
46
static int32_t      doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
47
int32_t projectApplyOperator(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, int32_t outputSlotId, int32_t* numOfRows, bool createNewColModel, const void* pExtraParams);
48

49
static void destroyProjectOperatorInfo(void* param) {
141,651,302✔
50
  if (NULL == param) {
141,651,302✔
51
    return;
×
52
  }
53

54
  SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param;
141,651,302✔
55
  cleanupBasicInfo(&pInfo->binfo);
141,651,302✔
56
  cleanupAggSup(&pInfo->aggSup);
141,652,801✔
57
  taosArrayDestroy(pInfo->pPseudoColInfo);
141,634,906✔
58

59
  blockDataDestroy(pInfo->pFinalRes);
141,626,759✔
60
  taosMemoryFreeClear(param);
141,645,610✔
61
}
62

63
static void destroyIndefinitOperatorInfo(void* param) {
3,060,043✔
64
  SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param;
3,060,043✔
65
  if (pInfo == NULL) {
3,060,043✔
66
    return;
×
67
  }
68

69
  cleanupBasicInfo(&pInfo->binfo);
3,060,043✔
70
  taosArrayDestroy(pInfo->pPseudoColInfo);
3,059,386✔
71
  cleanupAggSup(&pInfo->aggSup);
3,059,386✔
72
  cleanupExprSupp(&pInfo->scalarSup);
3,059,386✔
73

74
  taosMemoryFreeClear(param);
3,060,925✔
75
}
76

77
/*
 * Forward a "release stream state" request to the first downstream operator.
 *
 * The project operator registers this callback but may be created without any
 * downstream (createProjectOperatorInfo only appends one when it is non-NULL),
 * so the downstream array must be checked before it is indexed.
 *
 * @param pOperator  operator whose first downstream should release its state.
 */
void streamOperatorReleaseState(SOperatorInfo* pOperator) {
  if (pOperator == NULL || pOperator->pDownstream == NULL || !(pOperator->numOfDownstream > 0)) {
    return;  // nothing below us to notify
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream != NULL && downstream->fpSet.releaseStreamStateFn) {
    downstream->fpSet.releaseStreamStateFn(downstream);
  }
}
83

84
/*
 * Forward a "reload stream state" request to the first downstream operator.
 *
 * The project operator registers this callback but may be created without any
 * downstream (createProjectOperatorInfo only appends one when it is non-NULL),
 * so the downstream array must be checked before it is indexed.
 *
 * @param pOperator  operator whose first downstream should reload its state.
 */
void streamOperatorReloadState(SOperatorInfo* pOperator) {
  if (pOperator == NULL || pOperator->pDownstream == NULL || !(pOperator->numOfDownstream > 0)) {
    return;  // nothing below us to notify
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream != NULL && downstream->fpSet.reloadStreamStateFn) {
    downstream->fpSet.reloadStreamStateFn(downstream);
  }
}
90

91
/*
 * Reset a project operator so it can be executed again from scratch.
 *
 * Puts the operator back to OP_NOT_OPENED, re-initialises the limit info from
 * the physical plan node, empties the merged result block and rebuilds the
 * aggregate support / function result output.
 *
 * @param pOper  the project operator to reset.
 * @return 0 on success, otherwise the error code reported by resetAggSup() or
 *         setFunctionResultOutput(). (Previously the error was computed but a
 *         hard-coded 0 was returned, hiding failures from the caller.)
 */
static int32_t resetProjectOperState(SOperatorInfo* pOper) {
  SProjectOperatorInfo* pProject = pOper->info;
  SExecTaskInfo*        pTaskInfo = pOper->pTaskInfo;
  SProjectPhysiNode*    pPhynode = (SProjectPhysiNode*)pOper->pPhyNode;

  pOper->status = OP_NOT_OPENED;
  resetBasicOperatorState(&pProject->binfo);

  // Limit/offset counters are consumed during execution; start over from the plan node.
  pProject->limitInfo = (SLimitInfo){0};
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pProject->limitInfo);

  blockDataCleanup(pProject->pFinalRes);

  int32_t code = resetAggSup(&pOper->exprSupp, &pProject->aggSup, pTaskInfo, pPhynode->pProjections, NULL,
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, NULL,
                             &pTaskInfo->storageAPI.functionStore);
  if (code == 0) {
    code = setFunctionResultOutput(pOper, &pProject->binfo, &pProject->aggSup, MAIN_SCAN,
                                   pOper->exprSupp.numOfExprs);
  }

  return code;
}
112

113
/*
 * Build a project operator from its physical plan node.
 *
 * @param downstream    child operator, or NULL when the projection has no input
 *                      (no downstream is appended in that case).
 * @param pProjPhyNode  physical plan node describing projections, limits, filter
 *                      conditions and group handling.
 * @param pTaskInfo     owning task; on failure its code field is set to the error.
 * @param pOptrInfo     out: the new operator on success; it is passed through
 *                      QRY_PARAM_CHECK first.
 * @return TSDB_CODE_SUCCESS, or an error code. On error the partially built
 *         operator info is destroyed and destroyOperatorAndDownstreams() is
 *         called with the operator and the given downstream.
 */
int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo,
                                  SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SProjectOperatorInfo* pProject = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
  SOperatorInfo*        pOptr = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pProject == NULL || pOptr == NULL) {
    code = terrno;
    goto _error;
  }

  pOptr->pPhyNode = pProjPhyNode;
  pOptr->exprSupp.hasWindowOrGroup = false;
  pOptr->pTaskInfo = pTaskInfo;
  initOperatorCostInfo(pOptr);

  SSDataBlock* pOutBlock = createDataBlockFromDescNode(pProjPhyNode->node.pOutputDataBlockDesc);
  TSDB_CHECK_NULL(pOutBlock, code, lino, _error, terrno);

  initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pProject->limitInfo);

  // Hand the output block to the info right away so the error path releases it.
  pProject->binfo.pRes = pOutBlock;
  pProject->pFinalRes = NULL;

  code = createOneDataBlock(pOutBlock, false, &pProject->pFinalRes);
  TSDB_CHECK_CODE(code, lino, _error);

  pProject->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder;
  pProject->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder;
  pProject->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup;
  pProject->outputIgnoreGroup = pProjPhyNode->ignoreGroupId;

  // Blocks are merged only outside the queue execution model and only when the
  // group id is ignored; otherwise merging is always off.
  const bool mayMerge = (pTaskInfo->execModel != OPTR_EXEC_MODEL_QUEUE) && pProjPhyNode->ignoreGroupId;
  pProject->mergeDataBlocks = mayMerge ? pProjPhyNode->mergeDataBlock : false;

  size_t  keyBufSize = sizeof(int64_t) * 2 + POINTER_BYTES;
  int32_t rowsPerBlock = 4096;

  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
  int32_t maxBlockBytes = 2 * 1024 * 1024;
  if (rowsPerBlock * pOutBlock->info.rowSize > maxBlockBytes) {
    rowsPerBlock = maxBlockBytes / pOutBlock->info.rowSize;
  }

  initResultSizeInfo(&pOptr->resultInfo, rowsPerBlock);

  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = NULL;
  code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols);
  TSDB_CHECK_CODE(code, lino, _error);

  code = initAggSup(&pOptr->exprSupp, &pProject->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
                    NULL, &pTaskInfo->storageAPI.functionStore);
  TSDB_CHECK_CODE(code, lino, _error);

  initBasicInfo(&pProject->binfo, pOutBlock);
  code = setFunctionResultOutput(pOptr, &pProject->binfo, &pProject->aggSup, MAIN_SCAN, numOfCols);
  TSDB_CHECK_CODE(code, lino, _error);

  code = filterInitFromNode((SNode*)pProjPhyNode->node.pConditions, &pOptr->exprSupp.pFilterInfo, 0,
                            pTaskInfo->pStreamRuntimeInfo);
  TSDB_CHECK_CODE(code, lino, _error);

  code = setRowTsColumnOutputInfo(pOptr->exprSupp.pCtx, numOfCols, &pProject->pPseudoColInfo);
  TSDB_CHECK_CODE(code, lino, _error);

  // Wire the operator: identity, callbacks, stream-state and reset hooks.
  setOperatorInfo(pOptr, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pProject,
                  pTaskInfo);
  pOptr->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo,
                                     optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
  setOperatorStreamStateFn(pOptr, streamOperatorReleaseState, streamOperatorReloadState);
  setOperatorResetStateFn(pOptr, resetProjectOperState);

  if (downstream != NULL) {
    code = appendDownstream(pOptr, &downstream, 1);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  *pOptrInfo = pOptr;
  return TSDB_CODE_SUCCESS;

_error:
  if (pProject != NULL) destroyProjectOperatorInfo(pProject);
  destroyOperatorAndDownstreams(pOptr, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
211

212
/*
 * Apply the group offset (SOFFSET-style skipping) to an incoming block.
 *
 * While pLimitInfo->remainGroupOffset is positive, whole groups are skipped:
 * the counter is decremented each time a block with a different group id than
 * the tracked one arrives.
 *
 * @param pBlock      incoming block; only its group id is read.
 * @param pLimitInfo  limit bookkeeping; remainGroupOffset/currentGroupId are updated.
 * @return PROJECT_RETRIEVE_CONTINUE if the block must be discarded,
 *         PROJECT_RETRIEVE_DONE if it should be processed.
 */
static int32_t discardGroupDataBlock(SSDataBlock* pBlock, SLimitInfo* pLimitInfo) {
  // No groups left to skip: keep the block.
  if (!(pLimitInfo->remainGroupOffset > 0)) {
    return PROJECT_RETRIEVE_DONE;
  }

  // First group seen, or more data of the group currently being skipped.
  if (pLimitInfo->currentGroupId == 0 || pLimitInfo->currentGroupId == pBlock->info.id.groupId) {
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
    return PROJECT_RETRIEVE_CONTINUE;
  }

  // Data from a new group: one more group has been skipped completely.
  pLimitInfo->remainGroupOffset -= 1;
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;

  // Still inside the offset: this group is skipped as well.
  if (pLimitInfo->remainGroupOffset > 0) {
    return PROJECT_RETRIEVE_CONTINUE;
  }

  // Offset exhausted: forget the tracked group and let this block through.
  pLimitInfo->currentGroupId = 0;
  return PROJECT_RETRIEVE_DONE;
}
234

235
/*
 * Update the output-group bookkeeping for an incoming block and enforce the
 * group limit (slimit).
 *
 * Must only be called once the group offset is consumed (remainGroupOffset is
 * 0 or -1); otherwise an error is logged and TSDB_CODE_INVALID_PARA returned.
 *
 * @param pBlock      incoming block; only its group id is read.
 * @param pLimitInfo  limit bookkeeping to update.
 * @param pOperator   marked completed when the group limit is exceeded.
 * @return PROJECT_RETRIEVE_DONE when the group limit is exceeded (operator is
 *         completed), PROJECT_RETRIEVE_CONTINUE otherwise. The caller in this
 *         file ignores the return value and checks pOperator->status instead.
 */
static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, SOperatorInfo* pOperator) {
  if (pLimitInfo->remainGroupOffset != 0 && pLimitInfo->remainGroupOffset != -1) {
    qError("project failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_INVALID_PARA;
  }

  bool enteredNewGroup = false;
  if (pBlock->info.id.groupId == 0) {
    // Ungrouped data always counts as exactly one output group.
    pLimitInfo->numOfOutputGroups = 1;
  } else if (pLimitInfo->currentGroupId == pBlock->info.id.groupId) {
    // Same group as before: nothing to update.
    return PROJECT_RETRIEVE_CONTINUE;
  } else {
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
    pLimitInfo->numOfOutputGroups += 1;
    enteredNewGroup = true;
  }

  const bool slimitExceeded =
      (pLimitInfo->slimit.limit >= 0) && (pLimitInfo->slimit.limit < pLimitInfo->numOfOutputGroups);
  if (slimitExceeded) {
    setOperatorCompleted(pOperator);
    return PROJECT_RETRIEVE_DONE;
  }

  // A new group starts with fresh per-group limit/offset counters.
  if (enteredNewGroup) {
    resetLimitInfoForNextGroup(pLimitInfo);
  }

  return PROJECT_RETRIEVE_CONTINUE;
}
267

268
// todo refactor
269
static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SSDataBlock* pBlock,
278,454,324✔
270
                                    SOperatorInfo* pOperator) {
271
  // set current group id
272
  pLimitInfo->currentGroupId = groupId;
278,454,324✔
273
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pOperator->pTaskInfo);
278,515,573✔
274
  if (pBlock->info.rows == 0 && 0 != pLimitInfo->limit.limit) {
278,497,832✔
275
    return PROJECT_RETRIEVE_CONTINUE;
4,725,037✔
276
  } else {
277
    if (limitReached && (pLimitInfo->slimit.limit >= 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
273,768,431✔
278
      setOperatorCompleted(pOperator);
80,167✔
279
    } else if (limitReached && groupId == 0) {
273,687,768✔
280
      setOperatorCompleted(pOperator);
8,096,075✔
281
    }
282
  }
283

284
  return PROJECT_RETRIEVE_DONE;
273,683,179✔
285
}
286

287
/*
 * Produce the next result block of the project operator.
 *
 * Without a downstream the data comes from doGenerateSourceData(). Otherwise
 * blocks are pulled from downstream, group offset/limit rules are applied, the
 * projection expressions are evaluated into binfo.pRes, and - when
 * mergeDataBlocks is set - results are accumulated into pFinalRes until the
 * result threshold is passed or the input is exhausted. The operator filter is
 * applied before a block is returned.
 *
 * @param pOperator  the project operator.
 * @param pResBlock  out: result block, or NULL when there are no rows.
 * @return 0 on success. On failure the error is logged, stored in the task and
 *         control leaves through T_LONG_JMP on the task's jump environment.
 */
int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  SProjectOperatorInfo* pProject = pOperator->info;
  SOptrBasicInfo*       pBasic = &pProject->binfo;
  SExprSupp*            pExprSup = &pOperator->exprSupp;
  SSDataBlock*          pOut = pBasic->pRes;
  SSDataBlock*          pMerged = pProject->pFinalRes;
  int32_t               code = 0;
  int32_t               lino = 0;
  int32_t               tsOrder = pBasic->inputTsOrder;
  int32_t               scanFlag = 0;

  blockDataCleanup(pMerged);
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    return code;
  }

  SOperatorInfo* pChild = (pOperator->numOfDownstream > 0) ? pOperator->pDownstream[0] : NULL;
  SLimitInfo*    pLimit = &pProject->limitInfo;

  // No input operator: the projection generates its own source data.
  if (pChild == NULL) {
    code = doGenerateSourceData(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pProject->outputIgnoreGroup) {
      pOut->info.id.groupId = 0;
    }

    *pResBlock = (pOut->info.rows > 0) ? pOut : NULL;
    return code;
  }

  for (;;) {
    // Inner loop: fetch input until one block survives offset/limit handling,
    // the input is exhausted, or the group limit completes the operator.
    for (;;) {
      blockDataCleanup(pOut);

      // The downstream exec may change the value of the newgroup, so use a local variable instead.
      SSDataBlock* pInput = getNextBlockFromDownstream(pOperator, 0);
      if (pInput == NULL) {
        qDebug("set op close, exec %d, status %d rows %" PRId64 , pTaskInfo->execModel, pOperator->status, pMerged->info.rows);
        setOperatorCompleted(pOperator);
        break;
      }

      if (pProject->inputIgnoreGroup) {
        pInput->info.id.groupId = 0;
      }

      int32_t status = discardGroupDataBlock(pInput, pLimit);
      if (status == PROJECT_RETRIEVE_CONTINUE) {
        continue;  // block belongs to a group skipped by the group offset
      }

      // Return value deliberately ignored; completion is signalled via the operator status.
      (void) setInfoForNewGroup(pInput, pLimit, pOperator);
      if (pOperator->status == OP_EXEC_DONE) {
        break;
      }

      // Propagate the scan flag to whichever block will be returned.
      scanFlag = pInput->info.scanFlag;
      if (pProject->mergeDataBlocks) {
        pMerged->info.scanFlag = scanFlag;
      } else {
        pOut->info.scanFlag = scanFlag;
      }

      code = setInputDataBlock(pExprSup, pInput, tsOrder, scanFlag, false);
      QUERY_CHECK_CODE(code, lino, _end);

      code = blockDataEnsureCapacity(pBasic->pRes, pBasic->pRes->info.rows + pInput->info.rows);
      QUERY_CHECK_CODE(code, lino, _end);

      code = projectApplyFunctions(pExprSup->pExprInfo, pBasic->pRes, pInput, pExprSup->pCtx, pExprSup->numOfExprs,
                                   pProject->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo));
      QUERY_CHECK_CODE(code, lino, _end);

      status = doIngroupLimitOffset(pLimit, pInput->info.id.groupId, pBasic->pRes, pOperator);
      if (status != PROJECT_RETRIEVE_CONTINUE) {
        break;
      }
    }

    if (pProject->mergeDataBlocks) {
      if (pOut->info.rows > 0) {
        pMerged->info.id.groupId = 0;  // clear groupId
        pMerged->info.version = pOut->info.version;

        // continue merge data, ignore the group id
        code = blockDataMerge(pMerged, pOut);
        QUERY_CHECK_CODE(code, lino, _end);

        if (pMerged->info.rows + pOut->info.rows <= pOperator->resultInfo.threshold && (pOperator->status != OP_EXEC_DONE)) {
          continue;
        }
      }

      // do apply filter
      code = doFilter(pMerged, pOperator->exprSupp.pFilterInfo, NULL, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      // when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
      if (pMerged->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) {
        qDebug("project return %" PRId64 " rows, status %d", pMerged->info.rows, pOperator->status);
        break;
      }
    } else {
      // do apply filter
      if (pOut->info.rows > 0) {
        code = doFilter(pOut, pOperator->exprSupp.pFilterInfo, NULL, NULL);
        QUERY_CHECK_CODE(code, lino, _end);

        if (pOut->info.rows == 0) {
          continue;  // everything filtered out, fetch more input
        }
      }

      // either rows are available or no results were generated at all
      break;
    }
  }

  SSDataBlock* pResult = pProject->mergeDataBlocks ? pMerged : pOut;
  pResult->info.dataLoad = 1;

  if (pProject->outputIgnoreGroup) {
    pResult->info.id.groupId = 0;
  }

  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
    printDataBlock(pResult, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo), pTaskInfo->id.queryId);
  }

  *pResBlock = (pResult->info.rows > 0) ? pResult : NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
436

UNCOV
437
/*
 * Reset an indefinite-rows operator so it can be executed again from scratch.
 *
 * Puts the operator back to OP_NOT_OPENED, clears the group tracking state and
 * rebuilds the aggregate support, the function result output and the scalar
 * expression support from the physical plan node.
 *
 * @param pOper  the indefinite-rows operator to reset.
 * @return 0 on success, otherwise the first error reported by resetAggSup(),
 *         setFunctionResultOutput() or resetExprSupp(). (Previously the error
 *         was computed but a hard-coded 0 was returned, hiding failures.)
 */
static int32_t resetIndefinitOutputOperState(SOperatorInfo* pOper) {
  SIndefOperatorInfo*      pInfo = pOper->info;
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
  SIndefRowsFuncPhysiNode* pPhynode = (SIndefRowsFuncPhysiNode*)pOper->pPhyNode;

  pOper->status = OP_NOT_OPENED;
  resetBasicOperatorState(&pInfo->binfo);

  // Forget the group being processed and any block stashed for the next group.
  pInfo->groupId = 0;
  pInfo->pNextGroupRes = NULL;

  int32_t code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->pFuncs, NULL,
                             sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, NULL,
                             &pTaskInfo->storageAPI.functionStore);
  if (code == 0) {
    code = setFunctionResultOutput(pOper, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pOper->exprSupp.numOfExprs);
  }

  if (code == 0) {
    code = resetExprSupp(&pInfo->scalarSup, pTaskInfo, pPhynode->pExprs, NULL,
                         &pTaskInfo->storageAPI.functionStore);
  }

  return code;
}
460

461
int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
3,060,925✔
462
                                                 SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
463
  QRY_PARAM_CHECK(pOptrInfo);
3,060,925✔
464
  int32_t code = 0;
3,060,925✔
465
  int32_t lino = 0;
3,060,925✔
466
  int32_t numOfRows = 4096;
3,060,925✔
467
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
3,060,925✔
468

469
  SIndefOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIndefOperatorInfo));
3,060,925✔
470
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
3,058,907✔
471
  if (pInfo == NULL || pOperator == NULL) {
3,059,916✔
UNCOV
472
    code = terrno;
×
UNCOV
473
    goto _error;
×
474
  }
475

476
  pOperator->pPhyNode = pNode;
3,060,925✔
477
  pOperator->pTaskInfo = pTaskInfo;
3,060,925✔
478
  initOperatorCostInfo(pOperator);
3,060,925✔
479

480
  SExprSupp* pSup = &pOperator->exprSupp;
3,060,412✔
481
  pSup->hasWindowOrGroup = false;
3,060,412✔
482

483
  SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
3,060,412✔
484

485
  if (pPhyNode->pExprs != NULL) {
3,060,412✔
486
    int32_t    num = 0;
22,910✔
487
    SExprInfo* pSExpr = NULL;
22,910✔
488
    code = createExprInfo(pPhyNode->pExprs, NULL, &pSExpr, &num);
22,910✔
489
    QUERY_CHECK_CODE(code, lino, _error);
22,910✔
490

491
    code = initExprSupp(&pInfo->scalarSup, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
22,910✔
492
    if (code != TSDB_CODE_SUCCESS) {
22,910✔
UNCOV
493
      goto _error;
×
494
    }
495
  }
496

497
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->node.pOutputDataBlockDesc);
3,060,412✔
498
  TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno);
3,060,925✔
499

500
  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
501
  int32_t TWOMB = 2 * 1024 * 1024;
3,060,925✔
502
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
3,060,925✔
UNCOV
503
    numOfRows = TWOMB / pResBlock->info.rowSize;
×
504
  }
505

506
  initBasicInfo(&pInfo->binfo, pResBlock);
3,060,925✔
507
  initResultSizeInfo(&pOperator->resultInfo, numOfRows);
3,060,412✔
508
  code = blockDataEnsureCapacity(pResBlock, numOfRows);
3,060,925✔
509
  TSDB_CHECK_CODE(code, lino, _error);
3,060,556✔
510

511
  int32_t    numOfExpr = 0;
3,060,556✔
512
  SExprInfo* pExprInfo = NULL;
3,060,556✔
513
  code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr);
3,060,556✔
514
  TSDB_CHECK_CODE(code, lino, _error);
3,059,315✔
515

516
  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
3,059,315✔
517
                            NULL, &pTaskInfo->storageAPI.functionStore);
518
  TSDB_CHECK_CODE(code, lino, _error);
3,059,916✔
519

520
  code = setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
3,059,916✔
521
  TSDB_CHECK_CODE(code, lino, _error);
3,060,925✔
522

523
  code = filterInitFromNode((SNode*)pPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
3,058,890✔
524
                            pTaskInfo->pStreamRuntimeInfo);
3,060,925✔
525
  TSDB_CHECK_CODE(code, lino, _error);
3,060,925✔
526

527
  pInfo->binfo.pRes = pResBlock;
3,060,925✔
528
  pInfo->binfo.inputTsOrder = pNode->inputTsOrder;
3,059,386✔
529
  pInfo->binfo.outputTsOrder = pNode->outputTsOrder;
3,057,927✔
530
  code = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr, &pInfo->pPseudoColInfo);
3,059,034✔
531
  TSDB_CHECK_CODE(code, lino, _error);
3,060,556✔
532

533
  setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo,
3,060,556✔
534
                  pTaskInfo);
535
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo,
3,060,412✔
536
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
537
                                         
538
  setOperatorResetStateFn(pOperator, resetIndefinitOutputOperState);
3,057,920✔
539
  code = appendDownstream(pOperator, &downstream, 1);
3,057,776✔
540
  if (code != TSDB_CODE_SUCCESS) {
3,058,658✔
UNCOV
541
    goto _error;
×
542
  }
543

544
  *pOptrInfo = pOperator;
3,058,658✔
545
  return TSDB_CODE_SUCCESS;
3,059,540✔
546

UNCOV
547
_error:
×
UNCOV
548
  if (pInfo != NULL) destroyIndefinitOperatorInfo(pInfo);
×
549
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
UNCOV
550
  pTaskInfo->code = code;
×
UNCOV
551
  return code;
×
552
}
553

554
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream,
47,913,849✔
555
                              SExecTaskInfo* pTaskInfo) {
556
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
47,913,849✔
557
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
47,913,849✔
558
  SExprSupp*          pSup = &pOperator->exprSupp;
47,913,336✔
559

560
  int32_t order = pInfo->inputTsOrder;
47,913,849✔
561
  int32_t scanFlag = pBlock->info.scanFlag;
47,913,849✔
562
  int32_t code = TSDB_CODE_SUCCESS;
47,913,336✔
563

564
  // there is an scalar expression that needs to be calculated before apply the group aggregation.
565
  SExprSupp* pScalarSup = &pIndefInfo->scalarSup;
47,913,336✔
566
  if (pScalarSup->pExprInfo != NULL) {
47,912,310✔
567
    code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
69,071✔
568
                                 pIndefInfo->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo));
69,071✔
569
    if (code != TSDB_CODE_SUCCESS) {
69,071✔
UNCOV
570
      T_LONG_JMP(pTaskInfo->env, code);
×
571
    }
572
  }
573

574
  code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
47,912,823✔
575
  if (code) {
47,913,849✔
UNCOV
576
    T_LONG_JMP(pTaskInfo->env, code);
×
577
  }
578

579
  code = blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
47,913,849✔
580
  if (code != TSDB_CODE_SUCCESS) {
47,912,310✔
UNCOV
581
    T_LONG_JMP(pTaskInfo->env, code);
×
582
  }
583

584
  code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
47,912,823✔
585
                               pIndefInfo->pPseudoColInfo, GET_STM_RTINFO(pOperator->pTaskInfo));
47,912,310✔
586
  if (code != TSDB_CODE_SUCCESS) {
47,913,849✔
587
    T_LONG_JMP(pTaskInfo->env, code);
17,774✔
588
  }
589
}
47,896,075✔
590

591
/*
 * Produce the next result block of the indefinite-rows function operator.
 *
 * Input blocks are processed group by group: when a block of a different group
 * arrives it is stashed in pNextGroupRes and handled at the start of the next
 * round, after the function contexts have been re-armed. Input is consumed
 * until the output reaches the result threshold, the group changes or the
 * downstream is exhausted; the operator filter is then applied.
 *
 * @param pOperator  the indefinite-rows operator.
 * @param pResBlock  out: result block, or NULL when there are no rows.
 * @return 0 on success. On filter failure the error is logged, stored in the
 *         task and control leaves through T_LONG_JMP.
 */
int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  SIndefOperatorInfo* pIndef = pOperator->info;
  SOptrBasicInfo*     pBasic = &pIndef->binfo;
  SExprSupp*          pFuncSup = &pOperator->exprSupp;
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;

  blockDataCleanup(pBasic->pRes);

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  if (pOperator->status == OP_EXEC_DONE) {
    return code;
  }

  SOperatorInfo* pChild = pOperator->pDownstream[0];

  for (;;) {
    // First handle the block that was held back when the previous group ended.
    if (pIndef->pNextGroupRes != NULL) {  // todo extract method
      for (int32_t i = 0; i < pFuncSup->numOfExprs; ++i) {
        SqlFunctionCtx* pFuncCtx = &pFuncSup->pCtx[i];

        // Re-arm the function for the new group.
        SResultRowEntryInfo* pEntry = GET_RES_INFO(pFuncCtx);
        pEntry->initialized = false;
        pFuncCtx->pOutput = NULL;
      }

      doHandleDataBlock(pOperator, pIndef->pNextGroupRes, pChild, pTaskInfo);
      pIndef->pNextGroupRes = NULL;
    }

    if (pBasic->pRes->info.rows < pOperator->resultInfo.threshold) {
      do {
        // The downstream exec may change the value of the newgroup, so use a local variable instead.
        SSDataBlock* pInput = getNextBlockFromDownstream(pOperator, 0);
        if (pInput == NULL) {
          setOperatorCompleted(pOperator);
          break;
        }
        pBasic->pRes->info.scanFlag = pInput->info.scanFlag;

        if (pIndef->groupId == 0 && pInput->info.id.groupId != 0) {
          pIndef->groupId = pInput->info.id.groupId;  // this is the initial group result
        } else if (pIndef->groupId != pInput->info.id.groupId) {
          // Group changed: remember the block and emit what we have for the old group.
          pIndef->groupId = pInput->info.id.groupId;
          pIndef->pNextGroupRes = pInput;
          break;
        }

        doHandleDataBlock(pOperator, pInput, pChild, pTaskInfo);
      } while (pBasic->pRes->info.rows < pOperator->resultInfo.threshold);
    }

    code = doFilter(pBasic->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    size_t numOfRows = pBasic->pRes->info.rows;
    if (numOfRows > 0 || pOperator->status == OP_EXEC_DONE) {
      break;
    }

    // Everything was filtered out and more input may follow: start another round.
    blockDataCleanup(pBasic->pRes);
  }

  *pResBlock = (pBasic->pRes->info.rows > 0) ? pBasic->pRes : NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
672

673
/*
 * Run the init callback of every function context that still needs it.
 *
 * A context is skipped when its result entry is already initialized, when it
 * is a pseudo-column or scalar function, or when its functionId is -1.
 *
 * @param pCtx  array of function contexts.
 * @param size  number of entries in pCtx.
 * @return 0 on success, or the first non-zero code returned by an init callback
 *         (remaining contexts are then left untouched).
 */
int32_t initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
  for (int32_t idx = 0; idx < size; ++idx) {
    struct SResultRowEntryInfo* pEntry = GET_RES_INFO(&pCtx[idx]);

    const bool needsNoInit = isRowEntryInitialized(pEntry) || fmIsPseudoColumnFunc(pCtx[idx].functionId) ||
                             pCtx[idx].functionId == -1 || fmIsScalarFunc(pCtx[idx].functionId);
    if (!needsNoInit) {
      int32_t code = pCtx[idx].fpSet.init(&pCtx[idx], pCtx[idx].resultInfo);
      if (code) {
        return code;
      }
    }
  }

  return 0;
}
690

691
/*
692
 * The start of each column SResultRowEntryInfo is denote by RowCellInfoOffset.
693
 * Note that in case of top/bottom query, the whole multiple rows of result is treated as only one row of results.
694
 * +------------+-----------------result column 1------------+------------------result column 2-----------+
695
 * | SResultRow | SResultRowEntryInfo | intermediate buffer1 | SResultRowEntryInfo | intermediate buffer 2|
696
 * +------------+--------------------------------------------+--------------------------------------------+
697
 *           offset[0]                                  offset[1]                                   offset[2]
698
 */
699
// TODO refactor: some function move away
700
/*
 * Bind every function context of the operator to its entry in a single result row.
 *
 * The result row info is reset, one result row is looked up through
 * doSetResultOutBufByKey() with a zero key and zero group id, and each of the
 * numOfExprs contexts gets its resultInfo pointed at the matching entry of that
 * row (after the entry has been cleaned) and its scanFlag set to `stage`.
 * Finally initCtxOutputBuffer() is run over the contexts.
 *
 * @return pTaskInfo->code when the row lookup yields NULL or the task already
 *         carries a non-zero code; otherwise the result of initCtxOutputBuffer()
 */
int32_t setFunctionResultOutput(struct SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
                                int32_t numOfExprs) {
  SExecTaskInfo*  pTask = pOperator->pTaskInfo;
  SqlFunctionCtx* pFuncCtx = pOperator->exprSupp.pCtx;
  int32_t*        pEntryOffset = pOperator->exprSupp.rowEntryInfoOffset;
  SResultRowInfo* pRowInfo = &pInfo->resultRowInfo;

  initResultRowInfo(pRowInfo);

  int64_t key = 0;
  int64_t groupId = 0;

  SResultRow* pResRow = doSetResultOutBufByKey(pSup->pResultBuf, pRowInfo, (char*)&key, sizeof(key), true, groupId,
                                               pTask, false, pSup, true);

  // NOTE(review): when pResRow is NULL but pTask->code is still 0 this reports success
  // without binding any context -- confirm that the lookup always sets pTask->code on failure.
  if (pTask->code != 0 || pResRow == NULL) {
    return pTask->code;
  }

  for (int32_t col = 0; col < numOfExprs; ++col) {
    struct SResultRowEntryInfo* pCell = getResultEntryInfo(pResRow, col, pEntryOffset);
    cleanupResultRowEntry(pCell);

    SqlFunctionCtx* pCur = &pFuncCtx[col];
    pCur->resultInfo = pCell;
    pCur->scanFlag = stage;
  }

  return initCtxOutputBuffer(pFuncCtx, numOfExprs);
}
727

728
/*
 * Collect the indexes of the function contexts that are pseudo-column functions
 * but not placeholder functions.
 *
 * @param pCtx       array of function contexts
 * @param numOfCols  number of entries in pCtx
 * @param pResList   [out] checked by QRY_PARAM_CHECK; on success receives a newly
 *                   created SArray of int32_t indexes into pCtx
 * @return 0 on success, otherwise terrno of the failed array operation. On failure
 *         the partially built list is destroyed and *pResList is not assigned here.
 */
int32_t setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList) {
  QRY_PARAM_CHECK(pResList);

  SArray* pList = taosArrayInit(4, sizeof(int32_t));
  if (pList == NULL) {
    return terrno;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    if (!fmIsPseudoColumnFunc(pCtx[i].functionId) || fmIsPlaceHolderFunc(pCtx[i].functionId)) {
      continue;
    }

    void* px = taosArrayPush(pList, &i);
    if (px == NULL) {
      // capture the error before cleanup, then release the list so it is not leaked
      int32_t code = terrno;
      taosArrayDestroy(pList);
      return code;
    }
  }

  *pResList = pList;
  return 0;
}
747

748
/*
 * Produce the single output row of a projection, evaluating every expression
 * of the operator into pProjectInfo->binfo.pRes.
 *
 * Per expression node type:
 *   - QUERY_NODE_VALUE:    the constant (or NULL) is written to row 0 of the output column.
 *   - QUERY_NODE_FUNCTION: only scalar functions are evaluated here, through scalarCalculate();
 *                          any other function yields TSDB_CODE_OPS_NOT_SUPPORT.
 *   - QUERY_NODE_OPERATOR: delegated to projectApplyOperator() with a NULL source block.
 *   - anything else:       TSDB_CODE_OPS_NOT_SUPPORT.
 *
 * Afterwards the row count is set to 1, the operator's filter and limit/offset are
 * applied, and the operator is marked as completed.
 *
 * @param pOperator  operator whose `info` is a SProjectOperatorInfo
 * @return 0 on success, otherwise the error code of the step that failed
 */
int32_t doGenerateSourceData(SOperatorInfo* pOperator) {
  SProjectOperatorInfo* pProjectInfo = pOperator->info;

  SExprSupp*     pSup = &pOperator->exprSupp;
  SSDataBlock*   pRes = pProjectInfo->binfo.pRes;
  SExprInfo*     pExpr = pSup->pExprInfo;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  int32_t code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
  if (code) {
    return code;
  }

  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
    int32_t outputSlotId = pExpr[k].base.resSchema.slotId;

    if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
      SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, outputSlotId);
      if (pColInfoData == NULL) {
        return terrno;
      }

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataSetNNULL(pColInfoData, 0, 1);
      } else {
        code = colDataSetVal(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
        if (code) {
          return code;
        }
      }
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
      SqlFunctionCtx* pfCtx = &pSup->pCtx[k];

      // UDF scalar functions will be calculated here, for example, select foo(n) from (select 1 n).
      // UDF aggregate functions will be handled in agg operator.
      if (!fmIsScalarFunc(pfCtx->functionId)) {
        return TSDB_CODE_OPS_NOT_SUPPORT;
      }

      SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
      if (pBlockList == NULL) {
        return terrno;
      }

      // From here on every early return must release pBlockList (and idata once it exists).
      void* px = taosArrayPush(pBlockList, &pRes);
      if (px == NULL) {
        code = terrno;
        taosArrayDestroy(pBlockList);
        return code;
      }

      SColumnInfoData* pResColData = taosArrayGet(pRes->pDataBlock, outputSlotId);
      if (pResColData == NULL) {
        code = terrno;
        taosArrayDestroy(pBlockList);
        return code;
      }

      // temporary column that receives the scalar result before it is copied to the output
      SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};

      SScalarParam dest = {.columnData = &idata};
      gTaskScalarExtra.pStreamInfo = GET_STM_RTINFO(pOperator->pTaskInfo);
      gTaskScalarExtra.pStreamRange = NULL;
      code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest, &gTaskScalarExtra);
      if (code != TSDB_CODE_SUCCESS) {
        // release anything the calculation may have attached to the temporary column
        colDataDestroy(&idata);
        taosArrayDestroy(pBlockList);
        return code;
      }

      if (pRes->info.capacity <= 0) {
        qError("project failed at: %s:%d", __func__, __LINE__);
        colDataDestroy(&idata);
        taosArrayDestroy(pBlockList);
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }

      code = colDataAssign(pResColData, &idata, dest.numOfRows, &pRes->info);

      // the temporaries are no longer needed whether or not the assignment succeeded
      colDataDestroy(&idata);
      taosArrayDestroy(pBlockList);
      if (code) {
        return code;
      }
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
      TAOS_CHECK_RETURN(projectApplyOperator(&pExpr[k], pRes, NULL, outputSlotId, NULL, false, &gTaskScalarExtra));
    } else {
      return TSDB_CODE_OPS_NOT_SUPPORT;
    }
  }

  pRes->info.rows = 1;
  code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
  if (code) {
    pTaskInfo->code = code;
    return code;
  }

  (void) doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator);

  setOperatorCompleted(pOperator);

  return code;
}
846

847
/*
 * Point the pOutput of the first N function contexts at the first N columns of
 * pResult, where N is the number of entries in pPseudoList (0 when the list is NULL).
 * A missing column is logged and leaves pOutput set to NULL.
 *
 * NOTE(review): the loop counter is used as both the context index and the column
 * index; the values stored in pPseudoList are not read here, while
 * projectApplyFunction() looks up pCtx[*outputColIndex].pOutput using the stored
 * index. Confirm the two are meant to agree.
 */
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
  if (pPseudoList == NULL) {
    return;
  }

  size_t total = taosArrayGetSize(pPseudoList);
  for (int32_t col = 0; col < total; ++col) {
    void* pOut = taosArrayGet(pResult->pDataBlock, col);
    pCtx[col].pOutput = pOut;

    if (pOut == NULL) {
      qError("failed to get the output buf, ptr is null");
    }
  }
}
469,266,026✔
856

857
/*
 * Project a plain column into output slot `outputSlotId` of pResult.
 *
 * The source column is pfCtx->input.pData[0] (with pfCtx->input.numOfRows rows);
 * when that pointer is NULL the column is fetched from pSrcBlock using the slot id
 * of the function's first parameter, and pSrcBlock->info.rows is used as row count.
 *
 * When pResult already holds rows and createNewColModel is false the source is
 * appended with colDataMergeCol(); otherwise it is copied with colDataAssign().
 *
 * @param numOfRows  [out] number of source rows that were projected
 * @return 0 on success, otherwise the error code of the failed step (also logged)
 */
int32_t projectApplyColumn(SSDataBlock* pResult, SSDataBlock* pSrcBlock, int32_t outputSlotId, SqlFunctionCtx* pfCtx, int32_t* numOfRows, bool createNewColModel) {
  int32_t code = 0, lino = 0;

  SInputColumnInfoData* pInputData = &pfCtx->input;

  SColumnInfoData* pDstCol = taosArrayGet(pResult->pDataBlock, outputSlotId);
  TSDB_CHECK_NULL(pDstCol, code, lino, _exit, terrno);

  // resolve where the data comes from
  SColumnInfoData* pSrcCol = pInputData->pData[0];
  int32_t          srcRows = pInputData->numOfRows;
  if (pSrcCol == NULL) {
    int32_t slotId = pfCtx->param[0].pCol->slotId;

    pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, slotId);
    TSDB_CHECK_NULL(pSrcCol, code, lino, _exit, terrno);

    srcRows = pSrcBlock->info.rows;
  }

  // append to existing rows, or overwrite the column from the start
  bool appendRows = (pResult->info.rows > 0) && !createNewColModel;
  if (appendRows) {
    TAOS_CHECK_EXIT(colDataMergeCol(pDstCol, pResult->info.rows, (int32_t*)&pResult->info.capacity, pSrcCol, srcRows));
  } else {
    TAOS_CHECK_EXIT(colDataAssign(pDstCol, pSrcCol, srcRows, &pResult->info));
  }

  *numOfRows = srcRows;

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
905

906

907
/*
 * Project a constant value into output slot `outputSlotId` of pResult, once for
 * every row of pSrcBlock.
 *
 * Writing starts at row 0 when createNewColModel is true, otherwise after the rows
 * already present in pResult. A constant of type TSDB_DATA_TYPE_NULL is written as
 * a run of NULLs; any other constant is set row by row.
 *
 * @param numOfRows  [out] set to pSrcBlock->info.rows on success
 * @return 0 on success, otherwise the error code of the failed step (also logged)
 */
int32_t projectApplyValue(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, int32_t outputSlotId, int32_t* numOfRows, bool createNewColModel) {
  int32_t code = 0, lino = 0;

  SColumnInfoData* pDstCol = taosArrayGet(pResult->pDataBlock, outputSlotId);
  TSDB_CHECK_NULL(pDstCol, code, lino, _exit, terrno);

  int32_t startRow = 0;
  if (!createNewColModel) {
    startRow = pResult->info.rows;
  }

  int32_t valType = pExpr->base.pParam[0].param.nType;
  if (TSDB_DATA_TYPE_NULL != valType) {
    char* pVal = taosVariantGet(&pExpr->base.pParam[0].param, valType);
    for (int32_t row = 0; row < pSrcBlock->info.rows; ++row) {
      TAOS_CHECK_EXIT(colDataSetVal(pDstCol, row + startRow, pVal, false));
    }
  } else {
    colDataSetNNULL(pDstCol, startRow, pSrcBlock->info.rows);
  }

  *numOfRows = pSrcBlock->info.rows;

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
933

934

935

936
/*
 * Evaluate an operator expression with scalarCalculate() and store the result in
 * output slot `outputSlotId` of pResult.
 *
 * The result is first computed into a temporary column and then either appended
 * to the output column (pResult already has rows and createNewColModel is false)
 * or assigned to it.
 *
 * @param pSrcBlock     input block; may be NULL, in which case scalarCalculate()
 *                      is called with a NULL block list
 * @param numOfRows     [out, optional] receives the number of rows produced
 * @param pExtraParams  stored in gTaskScalarExtra.pStreamInfo for the calculation
 * @return 0 on success, otherwise the error code of the failed step (also logged)
 *
 * All temporaries (the block list and the temporary column) are released at
 * _exit, so they are freed on the error paths as well as on success.
 */
int32_t projectApplyOperator(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, int32_t outputSlotId, int32_t* numOfRows, bool createNewColModel, const void* pExtraParams) {
  int32_t         code = 0, lino = 0;
  SArray*         pBlockList = NULL;
  SColumnInfoData idata = {0};      // declared up front so that _exit can always see it
  bool            idataInUse = false;

  if (NULL != pSrcBlock) {
    pBlockList = taosArrayInit(4, POINTER_BYTES);
    TSDB_CHECK_NULL(pBlockList, code, lino, _exit, terrno);

    void* px = taosArrayPush(pBlockList, &pSrcBlock);
    TSDB_CHECK_NULL(px, code, lino, _exit, terrno);
  }

  SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
  TSDB_CHECK_NULL(pResColData, code, lino, _exit, terrno);

  idata.info = pResColData->info;
  idata.hasNull = true;
  idataInUse = true;

  SScalarParam dest = {.columnData = &idata};
  gTaskScalarExtra.pStreamInfo = (void*)pExtraParams;
  gTaskScalarExtra.pStreamRange = NULL;
  TAOS_CHECK_EXIT(scalarCalculate(pExpr->pExpr->_optrRoot.pRootNode, pBlockList, &dest, &gTaskScalarExtra));

  if (pResult->info.rows > 0 && !createNewColModel) {
    code = colDataMergeCol(pResColData, pResult->info.rows, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
  } else {
    code = colDataAssign(pResColData, &idata, dest.numOfRows, &pResult->info);
  }
  TAOS_CHECK_EXIT(code);

  if (numOfRows) {
    *numOfRows = dest.numOfRows;
  }

_exit:

  // previously skipped when scalarCalculate() failed, leaving the temporary column unreleased
  if (idataInUse) {
    colDataDestroy(&idata);
  }

  if (code < 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  taosArrayDestroy(pBlockList);

  return code;
}
979

980

981
/*
 * Evaluate one function expression of a projection and write its result into
 * output slot `outputSlotId` of pResult. The function kinds are tried in this order:
 *
 *   1. placeholder function with pExtraParams and exactly one parameter:
 *      filled by scalarAssignPlaceHolderRes().
 *   2. scalar (or remaining placeholder) function: computed by scalarCalculate()
 *      into a temporary column, then merged into / assigned to the output column.
 *   3. indefinite-rows function: init + process callbacks are run with the output
 *      column as pOutput; process-by-row contexts are appended to
 *      *processByRowFunctionCtx (created on demand).
 *   4. aggregate function: unless it is a select-value function and doSelectFunc is
 *      false (then nothing is done), the input column values are copied row by row.
 *   5. group-id function: the block's groupId is written for every source row.
 *
 * @param pCtx       all function contexts of the projection (used to find the
 *                   timestamp output via pPseudoList)
 * @param pfCtx      context of the function being evaluated
 * @param numOfRows  [out] number of rows produced; not modified when no branch
 *                   applies or the select-value shortcut is taken
 * @return 0 on success, otherwise the error code of the failed step (also logged)
 *
 * The temporary column and the block list of the scalar branch are released only
 * at _exit, exactly once, on both the success and the error path.
 */
int32_t projectApplyFunction(SqlFunctionCtx* pCtx, SqlFunctionCtx* pfCtx, SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, 
                                    int32_t outputSlotId, int32_t* numOfRows, bool createNewColModel, const void* pExtraParams, 
                                    SArray* pPseudoList, SArray** processByRowFunctionCtx, bool doSelectFunc) {
  int32_t         code = 0, lino = 0;
  SArray*         pBlockList = NULL;
  SColumnInfoData idata = {0};      // temporary result column of the scalar branch
  bool            idataInUse = false;

  SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
  TSDB_CHECK_NULL(pResColData, code, lino, _exit, terrno);

  if (fmIsPlaceHolderFunc(pfCtx->functionId) && pExtraParams && pfCtx->pExpr->base.pParamList && 1 == pfCtx->pExpr->base.pParamList->length) {
    TAOS_CHECK_EXIT(scalarAssignPlaceHolderRes(pResColData, pResult->info.rows, pSrcBlock->info.rows, pfCtx->functionId, pExtraParams));
    *numOfRows = pSrcBlock->info.rows;

    return code;
  }

  if (fmIsScalarFunc(pfCtx->functionId) || fmIsPlaceHolderFunc(pfCtx->functionId)) {
    pBlockList = taosArrayInit(4, POINTER_BYTES);
    TSDB_CHECK_NULL(pBlockList, code, lino, _exit, terrno);

    void* px = taosArrayPush(pBlockList, &pSrcBlock);
    TSDB_CHECK_NULL(px, code, lino, _exit, terrno);

    idata.info = pResColData->info;
    idata.hasNull = true;
    idataInUse = true;

    SScalarParam dest = {.columnData = &idata};
    gTaskScalarExtra.pStreamInfo = (void*)pExtraParams;
    gTaskScalarExtra.pStreamRange = NULL;
    TAOS_CHECK_EXIT(scalarCalculate((SNode*)pExpr->pExpr->_function.pFunctNode, pBlockList, &dest, &gTaskScalarExtra));

    if (pResult->info.rows > 0 && !createNewColModel) {
      code = colDataMergeCol(pResColData, pResult->info.rows, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
    } else {
      SColumnInfo oriInfo = pResColData->info;
      code = colDataAssign(pResColData, &idata, dest.numOfRows, &pResult->info);
      // restore the original column info to satisfy the output column schema
      pResColData->info = oriInfo;
    }

    // Do not free pBlockList here: a failure jumps to _exit, which frees it as well,
    // and freeing it in both places was a double free.
    TAOS_CHECK_EXIT(code);

    *numOfRows = dest.numOfRows;

    goto _exit;
  }

  if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
    SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx);
    TAOS_CHECK_EXIT(pfCtx->fpSet.init(pfCtx, pResInfo));

    pfCtx->pOutput = (char*)pResColData;
    TSDB_CHECK_NULL(pfCtx->pOutput, code, lino, _exit, terrno);

    pfCtx->offset = createNewColModel ? 0 : pResult->info.rows;  // set the start offset

    // set the timestamp(_rowts) output buffer
    if (taosArrayGetSize(pPseudoList) > 0) {
      int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
      TSDB_CHECK_NULL(outputColIndex, code, lino, _exit, terrno);

      pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
    }

    // link pDstBlock to set selectivity value
    if (pfCtx->subsidiaries.num > 0) {
      pfCtx->pDstBlock = pResult;
    }

    code = pfCtx->fpSet.process(pfCtx);
    if (code != TSDB_CODE_SUCCESS) {
      // give the function a chance to release its state before reporting the failure
      if (pfCtx->fpSet.cleanup != NULL) {
        pfCtx->fpSet.cleanup(pfCtx);
      }
      TAOS_CHECK_EXIT(code);
    }

    *numOfRows = pResInfo->numOfRes;

    if (fmIsProcessByRowFunc(pfCtx->functionId)) {
      if (NULL == *processByRowFunctionCtx) {
        *processByRowFunctionCtx = taosArrayInit(1, sizeof(SqlFunctionCtx*));
        TSDB_CHECK_NULL(*processByRowFunctionCtx, code, lino, _exit, terrno);
      }

      void* px = taosArrayPush(*processByRowFunctionCtx, &pfCtx);
      TSDB_CHECK_NULL(px, code, lino, _exit, terrno);
    }

    return code;
  }

  if (fmIsAggFunc(pfCtx->functionId)) {
    // selective value output should be set during corresponding function execution
    if (!doSelectFunc && fmIsSelectValueFunc(pfCtx->functionId)) {
      return code;
    }

    // _group_key function for "partition by tbname" + csum(col_name) query
    int32_t slotId = pfCtx->param[0].pCol->slotId;

    // todo handle the json tag
    SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
    TSDB_CHECK_NULL(pInput, code, lino, _exit, terrno);

    for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
      bool isNull = colDataIsNull_s(pInput, f);
      if (isNull) {
        colDataSetNULL(pResColData, pResult->info.rows + f);
      } else {
        char* data = colDataGetData(pInput, f);
        TAOS_CHECK_EXIT(colDataSetVal(pResColData, pResult->info.rows + f, data, isNull));
      }
    }

    *numOfRows = pSrcBlock->info.rows;

    return code;
  }

  if (fmIsGroupIdFunc(pfCtx->functionId)) {
    for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
      TAOS_CHECK_EXIT(colDataSetVal(pResColData, pResult->info.rows + f, (const char*)&pSrcBlock->info.id.groupId, false));
    }

    *numOfRows = pSrcBlock->info.rows;
    return code;
  }

_exit:

  // previously skipped when scalarCalculate() failed, leaving the temporary column unreleased
  if (idataInUse) {
    colDataDestroy(&idata);
  }

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  taosArrayDestroy(pBlockList);

  return code;
}
1120

1121

1122
int32_t projectApplyFunctionsWithSelect(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock,
469,151,512✔
1123
                                        SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList,
1124
                                        const void* pExtraParams, bool doSelectFunc, bool hasIndefRowsFunc) {
1125
  int32_t lino = 0;
469,151,512✔
1126
  int32_t code = TSDB_CODE_SUCCESS;
469,151,512✔
1127
  if (hasIndefRowsFunc) {
469,151,512✔
1128
    setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
469,193,826✔
1129
  }
1130
  pResult->info.dataLoad = 1;
469,187,066✔
1131

1132
  SArray* processByRowFunctionCtx = NULL;
469,207,627✔
1133
  if (pSrcBlock == NULL) {
469,144,491✔
UNCOV
1134
    for (int32_t k = 0; k < numOfOutput; ++k) {
×
UNCOV
1135
      int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
×
1136

UNCOV
1137
      if (pExpr[k].pExpr->nodeType != QUERY_NODE_VALUE) {
×
UNCOV
1138
        qError("project failed at: %s:%d", __func__, __LINE__);
×
UNCOV
1139
        TAOS_CHECK_EXIT(TSDB_CODE_INVALID_PARA);
×
1140
      }
UNCOV
1141
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
×
UNCOV
1142
      TSDB_CHECK_NULL(pColInfoData, code, lino, _exit, terrno);
×
1143

UNCOV
1144
      int32_t type = pExpr[k].base.pParam[0].param.nType;
×
UNCOV
1145
      if (TSDB_DATA_TYPE_NULL == type) {
×
1146
        colDataSetNNULL(pColInfoData, 0, 1);
1147
      } else {
UNCOV
1148
        TAOS_CHECK_EXIT(colDataSetVal(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false));
×
1149
      }
1150
    }
1151

UNCOV
1152
    pResult->info.rows = 1;
×
UNCOV
1153
    goto _exit;
×
1154
  }
1155

1156
  if (pResult != pSrcBlock) {
469,144,491✔
1157
    pResult->info.id.groupId = pSrcBlock->info.id.groupId;
329,453,472✔
1158
    if (pSrcBlock->info.parTbName[0]) {
329,512,114✔
UNCOV
1159
      tstrncpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
×
1160
    }
1161
    qTrace("%s, parName:%s,groupId:%" PRIu64, __FUNCTION__, pSrcBlock->info.parTbName, pResult->info.id.groupId);
329,557,950✔
1162
  }
1163

1164
  // if the source equals to the destination, it is to create a new column as the result of scalar
1165
  // function or some operators.
1166
  bool createNewColModel = (pResult == pSrcBlock);
469,122,895✔
1167
  if (createNewColModel) {
469,122,895✔
1168
    TAOS_CHECK_EXIT(blockDataEnsureCapacity(pResult, pResult->info.rows));
139,696,133✔
1169
  }
1170

1171
  int32_t numOfRows = 0;
469,115,292✔
1172

1173
  for (int32_t k = 0; k < numOfOutput; ++k) {
1,981,282,289✔
1174
    int32_t               outputSlotId = pExpr[k].base.resSchema.slotId;
1,513,292,512✔
1175
    SqlFunctionCtx*       pfCtx = &pCtx[k];
1,513,423,728✔
1176
    switch (pExpr[k].pExpr->nodeType) {
1,513,394,797✔
1177
      case QUERY_NODE_COLUMN: {
920,339,335✔
1178
        TAOS_CHECK_EXIT(projectApplyColumn(pResult, pSrcBlock, outputSlotId, pfCtx, &numOfRows, createNewColModel));
920,339,335✔
1179
        break;
920,392,722✔
1180
      } 
1181
      case QUERY_NODE_VALUE: {
17,174,170✔
1182
        TAOS_CHECK_EXIT(projectApplyValue(&pExpr[k], pResult, pSrcBlock, outputSlotId, &numOfRows, createNewColModel));
17,174,170✔
1183
        break;
17,174,693✔
1184
      } 
1185
      case QUERY_NODE_OPERATOR: {
78,874,687✔
1186
        TAOS_CHECK_EXIT(projectApplyOperator(&pExpr[k], pResult, pSrcBlock, outputSlotId, &numOfRows, createNewColModel, pExtraParams));
78,874,687✔
1187
        break;
77,692,077✔
1188
      } 
1189
      case QUERY_NODE_FUNCTION: {
496,983,011✔
1190
        TAOS_CHECK_EXIT(projectApplyFunction(pCtx, pfCtx, &pExpr[k], pResult, pSrcBlock, outputSlotId, &numOfRows, createNewColModel, pExtraParams, pPseudoList, &processByRowFunctionCtx, doSelectFunc));
496,983,011✔
1191
        break;
496,910,811✔
1192
      }
UNCOV
1193
      default: {
×
UNCOV
1194
        qError("invalid project expr nodeType:%d", pExpr[k].pExpr->nodeType);
×
1195
        TAOS_CHECK_EXIT(TSDB_CODE_OPS_NOT_SUPPORT);
152✔
1196
      }
1197
    }
1198
  }
1199

1200
  if (processByRowFunctionCtx && taosArrayGetSize(processByRowFunctionCtx) > 0) {
467,989,777✔
1201
    SqlFunctionCtx** pfCtx = taosArrayGet(processByRowFunctionCtx, 0);
45,401,773✔
1202
    TSDB_CHECK_NULL(pfCtx, code, lino, _exit, terrno);
45,401,773✔
1203

1204
    TAOS_CHECK_EXIT((*pfCtx)->fpSet.processFuncByRow(processByRowFunctionCtx));
45,401,773✔
1205
    numOfRows = (*pfCtx)->resultInfo->numOfRes;
45,393,554✔
1206
  }
1207

1208
  if (!createNewColModel) {
467,979,035✔
1209
    pResult->info.rows += numOfRows;
328,429,374✔
1210
  }
1211

1212
_exit:
469,232,153✔
1213
  if (processByRowFunctionCtx) {
469,214,098✔
1214
    taosArrayDestroy(processByRowFunctionCtx);
45,401,773✔
1215
  }
1216
  if (code) {
469,214,098✔
1217
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
1,232,617✔
1218
  }
1219
  return code;
469,214,098✔
1220
}
1221

1222
/*
 * Convenience wrapper around projectApplyFunctionsWithSelect() that always passes
 * doSelectFunc = false and hasIndefRowsFunc = true.
 */
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
                              int32_t numOfOutput, SArray* pPseudoList, const void* pExtraParams) {
  const bool doSelectFunc = false;
  const bool hasIndefRowsFunc = true;

  return projectApplyFunctionsWithSelect(pExpr, pResult, pSrcBlock, pCtx, numOfOutput, pPseudoList, pExtraParams,
                                         doSelectFunc, hasIndefRowsFunc);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc