• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5010

29 Mar 2026 04:32AM UTC coverage: 72.292% (+0.03%) from 72.26%
#5010

push

travis-ci

web-flow
refactor: do some internal refactor for TDgpt. (#34955)

253774 of 351039 relevant lines covered (72.29%)

133420324.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.94
/source/libs/executor/src/sortoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "tdatablock.h"
21

22
typedef struct SSortOpGroupIdCalc {
23
  STupleHandle* pSavedTuple;
24
  SArray*       pSortColsArr;
25
  char*         keyBuf;
26
  int32_t       lastKeysLen; // default to be 0
27
  uint64_t      lastGroupId;
28
  bool          excludePKCol;
29
} SSortOpGroupIdCalc;
30

31
typedef struct SSortOperatorInfo {
32
  SOptrBasicInfo      binfo;
33
  uint32_t            sortBufSize;  // max buffer size for in-memory sort
34
  SArray*             pSortInfo;
35
  SSortHandle*        pSortHandle;
36
  SColMatchInfo       matchInfo;
37
  int32_t             bufPageSize;
38
  int64_t             startTs;      // sort start time
39
  uint64_t            sortElapsed;  // sort elapsed time, time to flush to disk not included.
40
  SLimitInfo          limitInfo;
41
  uint64_t            maxTupleLength;
42
  int64_t             maxRows;
43
  SSortOpGroupIdCalc* pGroupIdCalc;
44
} SSortOperatorInfo;
45

46
static int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
47
static int32_t doOpenSortOperator(SOperatorInfo* pOperator);
48
static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
49
static int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
50

51
static void destroySortOperatorInfo(void* param);
52
static void calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys);
53

54
static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc);
55

56
/**
 * @brief Put the sort operator back into the not-opened state so that it can be executed again.
 *
 * The current sort handle is destroyed, the group-id bookkeeping is cleared, the basic operator state is reset and
 * the SQL function contexts are rebuilt from the existing expression info.
 *
 * @param pOper the sort operator to reset
 * @retval 0 on success, terrno if the function contexts cannot be recreated
 */
static int32_t resetSortOperState(SOperatorInfo* pOper) {
  SSortOperatorInfo* pInfo = pOper->info;
  SExecTaskInfo*     pTaskInfo = pOper->pTaskInfo;

  pOper->status = OP_NOT_OPENED;
  resetBasicOperatorState(&pInfo->binfo);

  tsortDestroySortHandle(pInfo->pSortHandle);
  pInfo->pSortHandle = NULL;

  if (pInfo->pGroupIdCalc != NULL) {
    // the saved tuple was handed out by the sort handle destroyed above; drop it so that the next run does not
    // consume a tuple of a handle that no longer exists
    pInfo->pGroupIdCalc->pSavedTuple = NULL;
    pInfo->pGroupIdCalc->lastGroupId = 0;
    pInfo->pGroupIdCalc->lastKeysLen = 0;
  }

  destroySqlFunctionCtx(pOper->exprSupp.pCtx, pOper->exprSupp.pExprInfo, pOper->exprSupp.numOfExprs);
  taosMemoryFreeClear(pOper->exprSupp.rowEntryInfoOffset);
  pOper->exprSupp.pCtx = createSqlFunctionCtx(pOper->exprSupp.pExprInfo, pOper->exprSupp.numOfExprs,
                                              &pOper->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
  if (pOper->exprSupp.pCtx == NULL) {
    // same check as the one done when the operator is created
    return terrno;
  }

  return 0;
}
77

78
// todo add limit/offset impl
79
/**
 * @brief Create a sort operator on top of `downstream` from the physical sort node.
 *
 * @param downstream  the child operator the sort reads from
 * @param pSortNode   physical plan node describing targets, sort keys, limit and filter conditions
 * @param pTaskInfo   owning task; its `code` field is set when creation fails
 * @param pOptrInfo   [out] receives the new operator on success
 * @retval TSDB_CODE_SUCCESS on success, otherwise an error code; on failure the partially built operator info is
 *         destroyed and destroyOperatorAndDownstreams() is invoked for the operator and `downstream`
 */
int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t code = 0;
  int32_t lino = 0;

  SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    goto _error;
  }
  initOperatorCostInfo(pOperator);

  pOperator->pTaskInfo = pTaskInfo;
  SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
  code = createExprInfo(pSortNode->pExprs, NULL, &pOperator->exprSupp.pExprInfo, &numOfCols);
  QUERY_CHECK_CODE(code, lino, _error);

  pOperator->exprSupp.numOfExprs = numOfCols;
  int32_t numOfOutputCols = 0;
  code =
      extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  calcSortOperMaxTupleLength(pInfo, pSortNode->pSortKeys);

  // maxRows caps the number of rows the sort has to keep: limit + offset. -1 means "no cap"; the limit itself is
  // still enforced through limitInfo when blocks are returned.
  pInfo->maxRows = -1;
  if (pSortNode->node.pLimit && ((SLimitNode*)pSortNode->node.pLimit)->limit) {
    SLimitNode* pLimit = (SLimitNode*)pSortNode->node.pLimit;
    int64_t     limit = pLimit->limit->datum.i;
    int64_t     offset = pLimit->offset ? pLimit->offset->datum.i : 0;
    // only use the cap when limit + offset is representable; otherwise stay with "no cap" instead of overflowing
    if (limit > 0 && offset >= 0 && offset <= INT64_MAX - limit) {
      pInfo->maxRows = limit + offset;
    }
  }

  pOperator->exprSupp.pCtx =
      createSqlFunctionCtx(pOperator->exprSupp.pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno);
  initResultSizeInfo(&pOperator->resultInfo, 1024);
  code = filterInitFromNode((SNode*)pSortNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                            pTaskInfo->pStreamRuntimeInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
  QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);

  pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
  TSDB_CHECK_NULL(pInfo->pSortInfo, code, lino, _error, terrno);

  if (pSortNode->calcGroupId) {
    int32_t keyLen = 0;
    SSortOpGroupIdCalc* pGroupIdCalc = pInfo->pGroupIdCalc = taosMemoryCalloc(1, sizeof(SSortOpGroupIdCalc));
    if (!pGroupIdCalc) {
      code = terrno;
      goto _error;
    }
    SNodeList* pSortColsNodeArr = makeColsNodeArrFromSortKeys(pSortNode->pSortKeys);
    if (!pSortColsNodeArr) code = terrno;
    if (TSDB_CODE_SUCCESS == code) {
      pGroupIdCalc->pSortColsArr = makeColumnArrayFromList(pSortColsNodeArr);
      if (!pGroupIdCalc->pSortColsArr) code = terrno;
      nodesClearList(pSortColsNodeArr);
    }
    if (TSDB_CODE_SUCCESS == code) {
      // PK ts col should always at last, see partColOptCreateSort
      if (pSortNode->excludePkCol) taosArrayPop(pGroupIdCalc->pSortColsArr);
      code = extractKeysLen(pGroupIdCalc->pSortColsArr, &keyLen);
      QUERY_CHECK_CODE(code, lino, _error);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pGroupIdCalc->lastKeysLen = 0;
      pGroupIdCalc->keyBuf = taosMemoryCalloc(1, keyLen);
      if (!pGroupIdCalc->keyBuf) {
        code = terrno;
      }
    }
  }
  if (code != TSDB_CODE_SUCCESS) goto _error;

  pInfo->binfo.inputTsOrder = pSortNode->node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pSortNode->node.outputTsOrder;
  initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo);

  setOperatorInfo(pOperator, "SortOperator", QUERY_NODE_PHYSICAL_PLAN_SORT, true, OP_NOT_OPENED, pInfo, pTaskInfo);

  // lazy evaluation for the following parameter since the input datablock is not known till now.
  //  pInfo->bufPageSize  = rowSize < 1024 ? 1024 * 2 : rowSize * 2;
  //  there are headers, so pageSize = rowSize + header pInfo->sortBufSize  = pInfo->bufPageSize * 16;
  // TODO dynamic set the available sort buffer

  pOperator->fpSet =
      createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, optrDefaultBufFn, getExplainExecInfo, optrDefaultGetNextExtFn, NULL);

  setOperatorResetStateFn(pOperator, resetSortOperState);
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  if (pInfo != NULL) {
    destroySortOperatorInfo(pInfo);
  }
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
195

196
/**
 * @brief Append the values of one sorted tuple as a new row at index `pBlock->info.rows`.
 *
 * Column i of the block is filled from column i of the tuple; NULL values are recorded as NULL. After the row is
 * written the block is marked as loaded, its scan flag is taken from the tuple's block info and the row count is
 * incremented.
 *
 * @param pBlock        destination block
 * @param pTupleHandle  tuple to copy from
 * @retval 0 on success, terrno if a column cannot be fetched, or the error returned by colDataSetVal()
 */
int32_t appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
  int32_t code = 0;

  // the set of columns does not change while a row is appended, so read the count once
  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
    if (pColInfo == NULL) {
      return terrno;
    }

    bool isNull = tsortIsNullVal(pTupleHandle, i);
    if (isNull) {
      colDataSetNULL(pColInfo, pBlock->info.rows);
    } else {
      char* pData = NULL;
      tsortGetValue(pTupleHandle, i, (void**) &pData);

      if (pData != NULL) {
        code = colDataSetVal(pColInfo, pBlock->info.rows, pData, false);
        if (code) {
          return code;
        }
      }
    }
  }

  pBlock->info.dataLoad = 1;

  SDataBlockInfo info = {0};
  tsortGetBlockInfo(pTupleHandle, &info);

  pBlock->info.scanFlag = info.scanFlag;
  pBlock->info.rows += 1;
  return code;
}
229

230
/**
231
 * @brief get next tuple with group id attached, here assume that all tuples are sorted by group keys
232
 * @param [in, out] pBlock the output block, the group id will be saved in it
233
 * @retval NULL if next group tuple arrived and this new group tuple will be saved in pInfo.pSavedTuple
234
 */
235
static int32_t nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock,
2,147,483,647✔
236
                                    STupleHandle** pTupleHandle) {
237
  QRY_PARAM_CHECK(pTupleHandle);
2,147,483,647✔
238

239
  int32_t       code = 0;
2,147,483,647✔
240
  STupleHandle* retTuple = pInfo->pGroupIdCalc->pSavedTuple;
2,147,483,647✔
241
  if (!retTuple) {
2,147,483,647✔
242
    code = tsortNextTuple(pHandle, &retTuple);
2,147,483,647✔
243
    if (code) {
2,147,483,647✔
244
      qError("failed to get next tuple, code:%s", tstrerror(code));
×
245
      return code;
×
246
    }
247
  }
248

249
  if (retTuple) {
2,147,483,647✔
250
    int32_t newGroup;
251
    if (pInfo->pGroupIdCalc->pSavedTuple) {
2,147,483,647✔
252
      newGroup = true;
1,614,140✔
253
      pInfo->pGroupIdCalc->pSavedTuple = NULL;
1,614,140✔
254
    } else {
255
      newGroup = tsortCompAndBuildKeys(pInfo->pGroupIdCalc->pSortColsArr, pInfo->pGroupIdCalc->keyBuf,
2,147,483,647✔
256
                                       &pInfo->pGroupIdCalc->lastKeysLen, retTuple);
2,147,483,647✔
257
    }
258

259
    bool emptyBlock = (pBlock->info.rows == 0);
2,147,483,647✔
260
    if (newGroup) {
2,147,483,647✔
261
      if (!emptyBlock) {
3,342,988✔
262
        // new group arrived, and we have already copied some tuples for cur group, save the new group tuple, return
263
        // NULL. Note that the keyBuf and lastKeysLen has been updated to new value
264
        pInfo->pGroupIdCalc->pSavedTuple = retTuple;
1,614,624✔
265
        retTuple = NULL;
1,614,624✔
266
      } else {
267
        // new group with empty block
268
        pInfo->pGroupIdCalc->lastGroupId = pBlock->info.id.groupId =
1,727,880✔
269
            calcGroupId(pInfo->pGroupIdCalc->keyBuf, pInfo->pGroupIdCalc->lastKeysLen);
1,728,364✔
270
      }
271
    } else {
272
      if (emptyBlock) {
2,147,483,647✔
273
        // new block but not new group, assign last group id to it
274
        pBlock->info.id.groupId = pInfo->pGroupIdCalc->lastGroupId;
986,876✔
275
      } else {
276
        // not new group and not empty block and ret NOT NULL, just return the tuple
277
      }
278
    }
279
  }
280

281
  *pTupleHandle = retTuple;
2,147,483,647✔
282
  return code;
2,147,483,647✔
283
}
284

285
static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
79,414,647✔
286
                                SSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
287
  QRY_PARAM_CHECK(pResBlock);
79,414,647✔
288
  blockDataCleanup(pDataBlock);
79,415,180✔
289

290
  int32_t       lino = 0;
79,417,086✔
291
  int32_t       code = 0;
79,417,086✔
292
  STupleHandle* pTupleHandle = NULL;
79,417,086✔
293
  SSDataBlock*  p = NULL;
79,415,463✔
294

295
  code = tsortGetSortedDataBlock(pHandle, &p);
79,416,545✔
296
  if (p == NULL || (code != 0)) {
79,416,138✔
297
    return code;
5,874,285✔
298
  }
299

300
  code = blockDataEnsureCapacity(p, capacity);
73,541,853✔
301
  QUERY_CHECK_CODE(code, lino, _error);
73,542,398✔
302

303
  while (1) {
304
    if (pInfo->pGroupIdCalc) {
2,147,483,647✔
305
      code = nextTupleWithGroupId(pHandle, pInfo, p, &pTupleHandle);
2,147,483,647✔
306
    } else {
307
      code = tsortNextTuple(pHandle, &pTupleHandle);
2,147,483,647✔
308
    }
309

310
    TSDB_CHECK_CODE(code, lino, _error);
2,147,483,647✔
311
    if (pTupleHandle == NULL) {
2,147,483,647✔
312
      break;
54,663,101✔
313
    }
314

315
    code = appendOneRowToDataBlock(p, pTupleHandle);
2,147,483,647✔
316
    QUERY_CHECK_CODE(code, lino, _error);
2,147,483,647✔
317

318
    if (p->info.rows >= capacity) {
2,147,483,647✔
319
      break;
18,879,327✔
320
    }
321
  }
322

323
  QUERY_CHECK_CODE(code, lino, _error);
73,542,428✔
324

325
  if (p->info.rows > 0) {
73,542,428✔
326
    code = blockDataEnsureCapacity(pDataBlock, capacity);
47,706,012✔
327
    QUERY_CHECK_CODE(code, lino, _error);
47,707,129✔
328

329
    // todo extract function to handle this
330
    int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
47,707,129✔
331
    for (int32_t i = 0; i < numOfCols; ++i) {
255,835,509✔
332
      SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
208,127,531✔
333
      QUERY_CHECK_NULL(pmInfo, code, lino, _error, terrno);
208,128,077✔
334

335
      SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
208,128,077✔
336
      QUERY_CHECK_NULL(pSrc, code, lino, _error, terrno);
208,127,593✔
337

338
      SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
208,127,593✔
339
      QUERY_CHECK_NULL(pDst, code, lino, _error, terrno);
208,124,587✔
340

341
      code = colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
208,124,587✔
342
      QUERY_CHECK_CODE(code, lino, _error);
208,127,493✔
343
    }
344

345
    pDataBlock->info.dataLoad = 1;
47,707,978✔
346
    pDataBlock->info.rows = p->info.rows;
47,707,613✔
347
    pDataBlock->info.scanFlag = p->info.scanFlag;
47,706,723✔
348
    pDataBlock->info.id.groupId = p->info.id.groupId;
47,705,110✔
349
  }
350

351
  blockDataDestroy(p);
73,538,821✔
352
  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
73,540,748✔
353
  return code;
73,539,710✔
354

355
  _error:
×
356
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
357

358
  blockDataDestroy(p);
×
359
  return code;
×
360
}
361

362
/**
 * Fetch callback for the sort handle: pull the next block from the operator passed as `param` and validate it
 * with blockDataCheck().
 *
 * param    the SOperatorInfo to read from
 * ppBlock  [out] receives the fetched block
 *
 * Returns 0 on success, otherwise the error of the fetch or of the block check (both are logged).
 */
int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
  SOperatorInfo* pUpstream = (SOperatorInfo*)param;

  int32_t code = pUpstream->fpSet.getNextFn(pUpstream, ppBlock);
  if (code != 0) {
    qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
    return code;
  }

  code = blockDataCheck(*ppBlock);
  if (code != 0) {
    qError("failed to check block data, %s code:%s", __func__, tstrerror(code));
  }
  return code;
}
375

376
// todo refactor: merged with fetch fp
377
/**
 * Apply the operator's scalar expressions to `pBlock` in place (the block is both input and output of
 * projectApplyFunctions()). Nothing is done when the operator has no expressions.
 *
 * pBlock  block to transform
 * param   the owning SOperatorInfo
 *
 * Does not return an error: on failure it long-jumps to the task's env with the error code.
 */
void applyScalarFunction(SSDataBlock* pBlock, void* param) {
  SOperatorInfo* pOperator = param;
  if (pOperator->exprSupp.pExprInfo == NULL || pOperator->exprSupp.numOfExprs <= 0) {
    return;
  }

  int32_t code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx,
                                       pOperator->exprSupp.numOfExprs, NULL,
                                       GET_STM_RTINFO(pOperator->pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
}
2,147,483,647✔
389

390
/**
 * Open the sort operator: create the sort handle, register the downstream operator as its single source and run
 * tsortOpen(). A no-op when the operator is already opened.
 *
 * Returns TSDB_CODE_SUCCESS on success. On failure the error is logged, stored in the task and the function
 * long-jumps to the task's env.
 */
int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SSortSource*       pSrc = NULL;
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SSortOperatorInfo* pInfo = pOperator->info;

  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  // pInfo->binfo.pRes is not the same block as the input data block.
  pInfo->pSortHandle = NULL;
  code = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str,
                               pInfo->maxRows, pInfo->maxTupleLength, tsPQSortMemThreshold * 1024 * 1024,
                               &pInfo->pSortHandle);
  QUERY_CHECK_CODE(code, lino, _end);

  tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);

  pSrc = taosMemoryCalloc(1, sizeof(SSortSource));
  QUERY_CHECK_NULL(pSrc, code, lino, _end, terrno);

  pSrc->onlyRef = true;
  pSrc->param = pOperator->pDownstream[0];

  code = tsortAddSource(pInfo->pSortHandle, pSrc);
  QUERY_CHECK_CODE(code, lino, _end);
  // added successfully: the source must not be freed by the cleanup below
  pSrc = NULL;

  code = tsortOpen(pInfo->pSortHandle);
  QUERY_CHECK_CODE(code, lino, _end);

  pOperator->status = OP_RES_TO_RETURN;
  OPTR_SET_OPENED(pOperator);

_end:
  if (pSrc != NULL) {
    taosMemoryFree(pSrc);
  }

  if (TSDB_CODE_SUCCESS != code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
436

437
/**
 * Return the next non-empty sorted block of the operator.
 *
 * The operator is opened on demand. Sorted blocks are fetched, filtered and limited until one with rows remains.
 * When no further block is available the operator is marked completed and `*pResBlock` is left as set by
 * QRY_PARAM_CHECK.
 *
 * Returns TSDB_CODE_SUCCESS on success. On failure the error is logged, stored in the task and the function
 * long-jumps to the task's env.
 */
int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  if (OP_EXEC_DONE == pOperator->status) {
    return code;
  }

  SSortOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  // multi-group case not handle here
  SSDataBlock* pBlock = NULL;
  for (;;) {
    if (tsortIsClosed(pInfo->pSortHandle)) {
      code = TSDB_CODE_TSC_QUERY_CANCELLED;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    recordOpExecBeforeDownstream(pOperator);
    code = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
                              pInfo->matchInfo.pList, pInfo, &pBlock);
    recordOpExecAfterDownstream(pOperator, pBlock ? pBlock->info.rows : 0);
    QUERY_CHECK_CODE(code, lino, _end);

    if (NULL == pBlock) {
      // the sort handle has nothing more to hand out
      setOperatorCompleted(pOperator);
      return code;
    }

    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (blockDataGetNumOfRows(pBlock) != 0) {
      // there are bugs?
      bool limitReached = applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo);
      if (limitReached) {
        resetLimitInfoForNextGroup(&pInfo->limitInfo);
      }

      if (pBlock->info.rows > 0) {
        break;
      }
    }
    // every row was filtered out or skipped: fetch the next block
  }

  *pResBlock = blockDataGetNumOfRows(pBlock) > 0 ? pBlock : NULL;

_end:
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
496

497
void destroySortOperatorInfo(void* param) {
33,272,267✔
498
  SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param;
33,272,267✔
499
  blockDataDestroy(pInfo->binfo.pRes);
33,272,267✔
500
  pInfo->binfo.pRes = NULL;
33,275,590✔
501

502
  tsortDestroySortHandle(pInfo->pSortHandle);
33,275,590✔
503
  taosArrayDestroy(pInfo->pSortInfo);
33,280,157✔
504
  taosArrayDestroy(pInfo->matchInfo.pList);
33,276,323✔
505
  destroySortOpGroupIdCalc(pInfo->pGroupIdCalc);
33,277,558✔
506
  taosMemoryFreeClear(param);
33,271,546✔
507
}
33,279,608✔
508

509
/**
 * Explain callback of the sort operator: hand out a freshly allocated copy of the sort handle's execution info.
 *
 * pOptr         the sort operator
 * pOptrExplain  [out] receives the allocated SSortExecInfo
 * len           [out] receives sizeof(SSortExecInfo)
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails.
 */
int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SSortExecInfo* pExecInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
  if (NULL == pExecInfo) {
    return terrno;
  }

  SSortOperatorInfo* pSortOper = (SSortOperatorInfo*)pOptr->info;
  *pExecInfo = tsortGetSortExecInfo(pSortOper->pSortHandle);

  *len = sizeof(SSortExecInfo);
  *pOptrExplain = pExecInfo;
  return TSDB_CODE_SUCCESS;
}
522

523
/**
 * Add the byte widths of all matched output columns and of all sort key expressions to
 * pSortOperInfo->maxTupleLength.
 *
 * pSortOperInfo  operator info; matchInfo must already be filled, maxTupleLength is incremented
 * pSortKeys      list of SOrderByExprNode
 */
static void calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys) {
  SColMatchInfo* pColItem = &pSortOperInfo->matchInfo;
  size_t         size = taosArrayGetSize(pColItem->pList);
  for (size_t i = 0; i < size; ++i) {
    SColMatchItem* pInfo = taosArrayGet(pColItem->pList, i);
    if (pInfo == NULL) {
      continue;
    }

    pSortOperInfo->maxTupleLength += pInfo->dataType.bytes;
  }

  size = LIST_LENGTH(pSortKeys);
  for (size_t i = 0; i < size; ++i) {
    SOrderByExprNode* pOrderExprNode = (SOrderByExprNode*)nodesListGetNode(pSortKeys, i);
    // skip missing entries instead of dereferencing them, same policy as the loop above
    if (pOrderExprNode == NULL || pOrderExprNode->pExpr == NULL) {
      continue;
    }

    pSortOperInfo->maxTupleLength += ((SColumnNode*)pOrderExprNode->pExpr)->node.resType.bytes;
  }
}
33,254,340✔
541

542
/**
 * Free the sort column array, the key buffer and the structure itself. A NULL pointer is ignored.
 */
static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc) {
  if (NULL == pCalc) {
    return;
  }

  taosArrayDestroy(pCalc->pSortColsArr);
  taosMemoryFree(pCalc->keyBuf);
  taosMemoryFree(pCalc);
}
33,279,617✔
549

550
//=====================================================================================
551
// Group Sort Operator
552
// What the last fetch from the child operator of a group sort produced.
typedef enum EChildOperatorStatus {
  CHILD_OP_NEW_GROUP,   // a block whose group id differs from the current group
  CHILD_OP_SAME_GROUP,  // a block of the current group
  CHILD_OP_FINISHED     // no more blocks
} EChildOperatorStatus;
553

554
typedef struct SGroupSortOperatorInfo {
555
  SOptrBasicInfo       binfo;
556
  SArray*              pSortInfo;
557
  SColMatchInfo        matchInfo;
558
  int64_t              startTs;
559
  uint64_t             sortElapsed;
560
  bool                 hasGroupId;
561
  uint64_t             currGroupId;
562
  SSDataBlock*         prefetchedSortInput;
563
  SSortHandle*         pCurrSortHandle;
564
  EChildOperatorStatus childOpStatus;
565
  SSortExecInfo        sortExecInfo;
566
} SGroupSortOperatorInfo;
567

568
int32_t getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
2,170,852✔
569
                                SGroupSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
570
  QRY_PARAM_CHECK(pResBlock);
2,170,852✔
571

572
  blockDataCleanup(pDataBlock);
2,170,852✔
573
  int32_t code = blockDataEnsureCapacity(pDataBlock, capacity);
2,170,852✔
574
  if (code) {
2,170,852✔
575
    return code;
×
576
  }
577

578
  SSDataBlock* p = NULL;
2,170,852✔
579
  code = tsortGetSortedDataBlock(pHandle, &p);
2,170,852✔
580
  if (p == NULL || (code != 0)) {
2,170,852✔
581
    return code;
×
582
  }
583

584
  code = blockDataEnsureCapacity(p, capacity);
2,170,852✔
585
  if (code) {
2,170,852✔
586
    return code;
×
587
  }
588

589
  while (1) {
1,993,005,390✔
590
    STupleHandle* pTupleHandle = NULL;
1,995,176,242✔
591
    code = tsortNextTuple(pHandle, &pTupleHandle);
1,995,175,802✔
592
    if (pTupleHandle == NULL || code != 0) {
1,995,176,291✔
593
      break;
594
    }
595

596
    code = appendOneRowToDataBlock(p, pTupleHandle);
1,994,905,159✔
597
    if (code) {
1,994,905,110✔
598
      break;
×
599
    }
600

601
    if (p->info.rows >= capacity) {
1,994,905,110✔
602
      break;
1,899,720✔
603
    }
604
  }
605

606
  if (p->info.rows > 0) {
2,170,852✔
607
    int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
2,035,286✔
608
    for (int32_t i = 0; i < numOfCols; ++i) {
6,105,858✔
609
      SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
4,070,572✔
610
      if (pmInfo == NULL) {
4,070,572✔
611
        return terrno;
×
612
      }
613

614
      SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
4,070,572✔
615
      if (pSrc == NULL) {
4,070,572✔
616
        return terrno;
×
617
      }
618

619
      SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
4,070,572✔
620
      if (pDst == NULL) {
4,070,572✔
621
        return terrno;
×
622
      }
623

624
      code = colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
4,070,572✔
625
      if (code) {
4,070,572✔
626
        return code;
×
627
      }
628
    }
629

630
    pDataBlock->info.rows = p->info.rows;
2,035,286✔
631
    pDataBlock->info.capacity = p->info.rows;
2,035,286✔
632
    pDataBlock->info.scanFlag = p->info.scanFlag;
2,035,286✔
633
  }
634

635
  blockDataDestroy(p);
2,170,852✔
636
  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
2,170,852✔
637
  return code;
2,170,852✔
638
}
639

640
typedef struct SGroupSortSourceParam {
641
  SOperatorInfo*          childOpInfo;
642
  SGroupSortOperatorInfo* grpSortOpInfo;
643
} SGroupSortSourceParam;
644

645
/**
 * Fetch callback of the per-group sort handle: return the next input block that belongs to the current group.
 *
 * A prefetched block is handed out first. Otherwise the child operator is asked for a block: a block of the
 * current group is returned, a block of another group is stored as the prefetched input (nothing is returned),
 * and the absence of a block marks the child as finished. childOpStatus is updated accordingly.
 *
 * param    a SGroupSortSourceParam
 * ppBlock  [out] receives the block, when one is returned
 *
 * Returns 0 on success, otherwise the error of the fetch or of blockDataCheck() (logged).
 */
int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SGroupSortSourceParam*  pSrcParam = param;
  SGroupSortOperatorInfo* pGrpSort = pSrcParam->grpSortOpInfo;
  SSDataBlock*            pFetched = NULL;

  QRY_PARAM_CHECK(ppBlock);

  if (pGrpSort->prefetchedSortInput != NULL) {
    // consume the block that was put aside earlier
    pFetched = pGrpSort->prefetchedSortInput;
    pGrpSort->prefetchedSortInput = NULL;
    *ppBlock = pFetched;
    return code;
  }

  SOperatorInfo* pChild = pSrcParam->childOpInfo;
  code = pChild->fpSet.getNextFn(pChild, &pFetched);
  QUERY_CHECK_CODE(code, lino, _end);

  if (NULL == pFetched) {
    pGrpSort->childOpStatus = CHILD_OP_FINISHED;
    return code;
  }

  code = blockDataCheck(pFetched);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pGrpSort->currGroupId == pFetched->info.id.groupId) {
    pGrpSort->childOpStatus = CHILD_OP_SAME_GROUP;
    *ppBlock = pFetched;
  } else {
    // first block of another group: keep it for the sort of that group
    pGrpSort->childOpStatus = CHILD_OP_NEW_GROUP;
    pGrpSort->prefetchedSortInput = pFetched;
  }

  return code;

_end:
  if (code != 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
685

686
/**
 * Start sorting the current group: create a new sort handle, attach a source that fetches the blocks of the
 * group from the downstream operator, and open the handle.
 *
 * pOperator  the group sort operator
 *
 * Returns 0 on success, otherwise the error of the failing step. The sort source and its parameter are released
 * here when they could not be attached to the handle.
 */
int32_t beginSortGroup(SOperatorInfo* pOperator) {
  SGroupSortOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;

  // pInfo->binfo.pRes is not the same block as the input data block.
  pInfo->pCurrSortHandle = NULL;

  int32_t code = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, 0, 0,
                                       0, &pInfo->pCurrSortHandle);
  if (code) {
    return code;
  }

  tsortSetFetchRawDataFp(pInfo->pCurrSortHandle, fetchNextGroupSortDataBlock, applyScalarFunction, pOperator);

  SSortSource*           ps = taosMemoryCalloc(1, sizeof(SSortSource));
  SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam));
  if (ps == NULL || param == NULL) {
    // read terrno before any further library call is made
    code = terrno;
    taosMemoryFree(ps);
    taosMemoryFree(param);
    return code;
  }

  param->childOpInfo = pOperator->pDownstream[0];
  param->grpSortOpInfo = pInfo;

  ps->param = param;
  ps->onlyRef = false;
  code = tsortAddSource(pInfo->pCurrSortHandle, ps);
  if (code != 0) {
    // the source was not attached to the handle, so it is still owned here (doOpenSortOperator frees its source
    // in the same situation)
    taosMemoryFree(param);
    taosMemoryFree(ps);
    return code;
  }

  code = tsortOpen(pInfo->pCurrSortHandle);
  return code;
}
722

723
/**
 * Finish the sort of the current group: merge the handle's statistics into the operator's totals (method and
 * buffer are overwritten, loops and byte counters are accumulated) and destroy the handle.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t finishSortGroup(SOperatorInfo* pOperator) {
  SGroupSortOperatorInfo* pInfo = pOperator->info;
  SSortExecInfo*          pTotal = &pInfo->sortExecInfo;
  SSortExecInfo           groupStat = tsortGetSortExecInfo(pInfo->pCurrSortHandle);

  pTotal->sortMethod = groupStat.sortMethod;
  pTotal->sortBuffer = groupStat.sortBuffer;
  pTotal->loops += groupStat.loops;
  pTotal->readBytes += groupStat.readBytes;
  pTotal->writeBytes += groupStat.writeBytes;

  tsortDestroySortHandle(pInfo->pCurrSortHandle);
  pInfo->pCurrSortHandle = NULL;

  return TSDB_CODE_SUCCESS;
}
739

740
/**
 * Return the next sorted block of the group sort operator.
 *
 * On the first call the first input block is prefetched to learn the initial group id and the sort of that group
 * is started. Afterwards sorted blocks of the current group are returned one by one; when a group is exhausted
 * the next group is started, or the operator is completed when the child has no more input.
 *
 * Returns TSDB_CODE_SUCCESS on success. On failure the error is logged, stored in the task and the function
 * long-jumps to the task's env.
 */
int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  int32_t                 code = TSDB_CODE_SUCCESS;
  int32_t                 lino = 0;
  SGroupSortOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;

  if (OP_EXEC_DONE == pOperator->status) {
    return code;
  }

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  if (!pInfo->hasGroupId) {
    pInfo->hasGroupId = true;

    pInfo->prefetchedSortInput = getNextBlockFromDownstream(pOperator, 0);
    if (NULL == pInfo->prefetchedSortInput) {
      // no input at all
      setOperatorCompleted(pOperator);
      return code;
    }

    pInfo->currGroupId = pInfo->prefetchedSortInput->info.id.groupId;
    pInfo->childOpStatus = CHILD_OP_NEW_GROUP;
    code = beginSortGroup(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  SSDataBlock* pBlock = NULL;
  while (pInfo->pCurrSortHandle != NULL) {
    if (tsortIsClosed(pInfo->pCurrSortHandle)) {
      code = TSDB_CODE_TSC_QUERY_CANCELLED;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // beginSortGroup would fetch all child blocks of pInfo->currGroupId;
    if (CHILD_OP_SAME_GROUP == pInfo->childOpStatus) {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    recordOpExecBeforeDownstream(pOperator);
    code = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
                                   pInfo->matchInfo.pList, pInfo, &pBlock);
    recordOpExecAfterDownstream(pOperator, pBlock ? pBlock->info.rows : 0);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pBlock != NULL) {
      pBlock->info.id.groupId = pInfo->currGroupId;
      *pResBlock = pBlock;
      return code;
    }

    // the current group is exhausted: decide what to do from the state of the child operator
    switch (pInfo->childOpStatus) {
      case CHILD_OP_NEW_GROUP:
        (void) finishSortGroup(pOperator);
        pInfo->currGroupId = pInfo->prefetchedSortInput->info.id.groupId;
        code = beginSortGroup(pOperator);
        QUERY_CHECK_CODE(code, lino, _end);
        break;
      case CHILD_OP_FINISHED:
        (void) finishSortGroup(pOperator);
        setOperatorCompleted(pOperator);
        return code;
      default:
        break;
    }
  }

_end:
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
813

814
/**
 * Explain callback of the group sort operator: hand out a freshly allocated copy of the accumulated sort
 * statistics, the same way getExplainExecInfo() does for the plain sort operator.
 *
 * pOptr         the group sort operator
 * pOptrExplain  [out] receives the allocated SSortExecInfo
 * len           [out] receives sizeof(SSortExecInfo)
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails.
 *
 * NOTE(review): the previous version returned a pointer into the operator info itself. A caller that releases
 * the buffer produced by getExplainExecInfo() would have freed an interior pointer here - confirm against the
 * explain caller that the returned buffer is released by it.
 */
int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)pOptr->info;

  SSortExecInfo* pExecInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
  if (pExecInfo == NULL) {
    return terrno;
  }

  *pExecInfo = pInfo->sortExecInfo;
  *pOptrExplain = pExecInfo;
  *len = sizeof(SSortExecInfo);
  return TSDB_CODE_SUCCESS;
}
820

821
void destroyGroupSortOperatorInfo(void* param) {
104,350✔
822
  SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)param;
104,350✔
823
  blockDataDestroy(pInfo->binfo.pRes);
104,350✔
824
  pInfo->binfo.pRes = NULL;
104,350✔
825

826
  taosArrayDestroy(pInfo->pSortInfo);
104,350✔
827
  taosArrayDestroy(pInfo->matchInfo.pList);
104,350✔
828

829
  tsortDestroySortHandle(pInfo->pCurrSortHandle);
104,350✔
830
  pInfo->pCurrSortHandle = NULL;
104,350✔
831

832
  taosMemoryFreeClear(param);
104,350✔
833
}
104,350✔
834

835
/*
 * Reset a group sort operator so that it can be executed again.
 *
 * Puts the operator back into OP_NOT_OPENED, clears the per-run group
 * bookkeeping and sort statistics, rebuilds the SQL function contexts from the
 * existing expression list and drops the sort handle of the current group.
 *
 * pOper  operator whose `info` is a SGroupSortOperatorInfo
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the function contexts could not be
 * rebuilt. All other reset steps are still performed in that case, so the
 * operator holds no stale sort handle.
 */
static int32_t resetGroupSortOperState(SOperatorInfo* pOper) {
  SGroupSortOperatorInfo* pInfo = pOper->info;
  SExecTaskInfo*          pTaskInfo = pOper->pTaskInfo;
  SExprSupp*              pSup = &pOper->exprSupp;
  int32_t                 code = TSDB_CODE_SUCCESS;

  pOper->status = OP_NOT_OPENED;

  // back to the same values a freshly calloc'ed operator info starts with,
  // except childOpStatus which is explicitly set to CHILD_OP_NEW_GROUP
  pInfo->currGroupId = 0;
  pInfo->hasGroupId = false;
  pInfo->prefetchedSortInput = NULL;
  pInfo->childOpStatus = CHILD_OP_NEW_GROUP;
  pInfo->sortExecInfo = (SSortExecInfo){0};

  resetBasicOperatorState(&pInfo->binfo);

  // throw away the old function contexts and build new ones from the same expressions
  destroySqlFunctionCtx(pSup->pCtx, pSup->pExprInfo, pSup->numOfExprs);
  taosMemoryFreeClear(pSup->rowEntryInfoOffset);
  pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset,
                                    &pTaskInfo->storageAPI.functionStore);
  if (pSup->pCtx == NULL) {
    // createGroupSortOperatorInfo treats a NULL context as a failure reported
    // via terrno; do the same here instead of silently reporting success
    code = terrno;
  }

  tsortDestroySortHandle(pInfo->pCurrSortHandle);
  pInfo->pCurrSortHandle = NULL;

  return code;
}
857

858
int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
103,568✔
859
                                    SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
860
  QRY_PARAM_CHECK(pOptrInfo);
103,568✔
861
  int32_t code = 0;
103,959✔
862
  int32_t lino = 0;
103,959✔
863

864
  SGroupSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupSortOperatorInfo));
103,959✔
865
  SOperatorInfo*          pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
102,548✔
866
  if (pInfo == NULL || pOperator == NULL) {
103,959✔
867
    code = terrno;
445✔
868
    goto _error;
×
869
  }
870
  initOperatorCostInfo(pOperator);
103,514✔
871

872
  SExprSupp*          pSup = &pOperator->exprSupp;
103,012✔
873
  SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc;
103,403✔
874

875
  int32_t    numOfCols = 0;
103,012✔
876
  SExprInfo* pExprInfo = NULL;
103,403✔
877
  code = createExprInfo(pSortPhyNode->pExprs, NULL, &pExprInfo, &numOfCols);
103,403✔
878
  QUERY_CHECK_CODE(code, lino, _error);
103,074✔
879

880
  pSup->pExprInfo = pExprInfo;
103,074✔
881
  pSup->numOfExprs = numOfCols;
103,074✔
882

883
  initResultSizeInfo(&pOperator->resultInfo, 1024);
103,519✔
884
  pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset,
103,910✔
885
                                                  &pTaskInfo->storageAPI.functionStore);
886
  QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno);
103,074✔
887

888
  pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
102,548✔
889
  QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
104,350✔
890

891
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
103,910✔
892
  TSDB_CHECK_CODE(code, lino, _error);
103,824✔
893

894
  pInfo->binfo.inputTsOrder = pSortPhyNode->node.inputTsOrder;
103,824✔
895
  pInfo->binfo.outputTsOrder = pSortPhyNode->node.outputTsOrder;
103,824✔
896

897
  int32_t numOfOutputCols = 0;
103,824✔
898
  code = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID,
103,824✔
899
                             &pInfo->matchInfo);
900
  TSDB_CHECK_CODE(code, lino, _error);
103,824✔
901

902
  pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
103,824✔
903
  setOperatorInfo(pOperator, "GroupSortOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, false, OP_NOT_OPENED, pInfo,
104,350✔
904
                  pTaskInfo);
905
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doGroupSort, NULL, destroyGroupSortOperatorInfo,
104,350✔
906
                                         optrDefaultBufFn, getGroupSortExplainExecInfo, optrDefaultGetNextExtFn, NULL);
907

908
  setOperatorResetStateFn(pOperator, resetGroupSortOperState);
103,298✔
909
                                         
910
  code = appendDownstream(pOperator, &downstream, 1);
103,824✔
911
  if (code != TSDB_CODE_SUCCESS) {
104,350✔
912
    goto _error;
×
913
  }
914

915
  *pOptrInfo = pOperator;
104,350✔
916
  return TSDB_CODE_SUCCESS;
104,350✔
917

918
_error:
×
919
  pTaskInfo->code = code;
×
920
  if (pInfo != NULL) {
×
921
    destroyGroupSortOperatorInfo(pInfo);
×
922
  }
923
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
924
  return code;
×
925
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc