• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4931

16 Jan 2026 02:32AM UTC coverage: 66.749% (+0.03%) from 66.716%
#4931

push

travis-ci

web-flow
enh: interp supports using non-null prev/next values to fill (#34236)

281 of 327 new or added lines in 11 files covered. (85.93%)

1890 existing lines in 121 files now uncovered.

203303 of 304580 relevant lines covered (66.75%)

129941648.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.5
/source/libs/executor/src/sortoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "tdatablock.h"
21

22
typedef struct SSortOpGroupIdCalc {
23
  STupleHandle* pSavedTuple;
24
  SArray*       pSortColsArr;
25
  char*         keyBuf;
26
  int32_t       lastKeysLen; // default to be 0
27
  uint64_t      lastGroupId;
28
  bool          excludePKCol;
29
} SSortOpGroupIdCalc;
30

31
typedef struct SSortOperatorInfo {
32
  SOptrBasicInfo      binfo;
33
  uint32_t            sortBufSize;  // max buffer size for in-memory sort
34
  SArray*             pSortInfo;
35
  SSortHandle*        pSortHandle;
36
  SColMatchInfo       matchInfo;
37
  int32_t             bufPageSize;
38
  int64_t             startTs;      // sort start time
39
  uint64_t            sortElapsed;  // sort elapsed time, time to flush to disk not included.
40
  SLimitInfo          limitInfo;
41
  uint64_t            maxTupleLength;
42
  int64_t             maxRows;
43
  SSortOpGroupIdCalc* pGroupIdCalc;
44
} SSortOperatorInfo;
45

46
static int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
47
static int32_t doOpenSortOperator(SOperatorInfo* pOperator);
48
static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
49
static int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
50

51
static void destroySortOperatorInfo(void* param);
52
static void calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys);
53

54
static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc);
55

56
static int32_t resetSortOperState(SOperatorInfo* pOper) {
144,052✔
57
  SSortOperatorInfo* pInfo = pOper->info;
144,052✔
58
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
144,052✔
59
  pOper->status = OP_NOT_OPENED;
144,052✔
60

61
  resetBasicOperatorState(&pInfo->binfo);
144,052✔
62
  destroySqlFunctionCtx(pOper->exprSupp.pCtx, pOper->exprSupp.pExprInfo, pOper->exprSupp.numOfExprs);
144,052✔
63
  taosMemoryFreeClear(pOper->exprSupp.rowEntryInfoOffset);
144,052✔
64
  pOper->exprSupp.pCtx =
144,052✔
65
      createSqlFunctionCtx(pOper->exprSupp.pExprInfo, pOper->exprSupp.numOfExprs, &pOper->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
144,052✔
66

67
  tsortDestroySortHandle(pInfo->pSortHandle);
144,052✔
68
  pInfo->pSortHandle = NULL;
144,052✔
69

70
  if (pInfo->pGroupIdCalc) {
144,052✔
71
    pInfo->pGroupIdCalc->lastGroupId = 0;
×
72
    pInfo->pGroupIdCalc->lastKeysLen = 0;
×
73
  }
74

75
  return 0;
144,052✔
76
}
77

78
// todo add limit/offset impl
79
int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
50,906,908✔
80
  QRY_PARAM_CHECK(pOptrInfo);
50,906,908✔
81

82
  int32_t code = 0;
50,910,499✔
83
  int32_t lino = 0;
50,910,499✔
84

85
  SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
50,910,499✔
86
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
50,886,102✔
87
  if (pInfo == NULL || pOperator == NULL) {
50,883,250✔
88
    code = terrno;
943✔
89
    goto _error;
×
90
  }
91

92
  pOperator->pTaskInfo = pTaskInfo;
50,882,307✔
93
  SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc;
50,888,369✔
94

95
  int32_t numOfCols = 0;
50,889,684✔
96
  code = createExprInfo(pSortNode->pExprs, NULL, &pOperator->exprSupp.pExprInfo, &numOfCols);
50,905,127✔
97
  QUERY_CHECK_CODE(code, lino, _error);
50,897,518✔
98

99
  pOperator->exprSupp.numOfExprs = numOfCols;
50,897,518✔
100
  int32_t numOfOutputCols = 0;
50,901,777✔
101
  code =
102
      extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
50,892,307✔
103
  if (code != TSDB_CODE_SUCCESS) {
50,914,351✔
104
    goto _error;
×
105
  }
106
  
107
  calcSortOperMaxTupleLength(pInfo, pSortNode->pSortKeys);
50,914,351✔
108
  pInfo->maxRows = -1;
50,910,491✔
109
  if (pSortNode->node.pLimit && ((SLimitNode*)pSortNode->node.pLimit)->limit) {
50,908,728✔
110
    SLimitNode* pLimit = (SLimitNode*)pSortNode->node.pLimit;
8,243,254✔
111
    if (pLimit->limit->datum.i > 0) {
8,242,382✔
112
      pInfo->maxRows = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
8,241,498✔
113
    }
114
  }
115

116
  pOperator->exprSupp.pCtx =
50,900,788✔
117
      createSqlFunctionCtx(pOperator->exprSupp.pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
50,900,422✔
118
  QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno);
50,902,489✔
119
  initResultSizeInfo(&pOperator->resultInfo, 1024);
50,905,745✔
120
  code = filterInitFromNode((SNode*)pSortNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
50,913,669✔
121
                            pTaskInfo->pStreamRuntimeInfo);
50,900,846✔
122
  if (code != TSDB_CODE_SUCCESS) {
50,888,415✔
123
    goto _error;
×
124
  }
125

126
  pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
50,888,415✔
127
  QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
50,913,997✔
128

129
  pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
50,907,605✔
130
  TSDB_CHECK_NULL(pInfo->pSortInfo, code, lino, _error, terrno);
50,913,736✔
131

132
  if (pSortNode->calcGroupId) {
50,891,050✔
133
    int32_t keyLen;
107,852✔
134
    SSortOpGroupIdCalc* pGroupIdCalc = pInfo->pGroupIdCalc = taosMemoryCalloc(1, sizeof(SSortOpGroupIdCalc));
107,852✔
135
    if (!pGroupIdCalc) {
107,852✔
136
      code = terrno;
×
137
      goto _error;
×
138
    }
139
    SNodeList* pSortColsNodeArr = makeColsNodeArrFromSortKeys(pSortNode->pSortKeys);
107,852✔
140
    if (!pSortColsNodeArr) code = terrno;
107,852✔
141
    if (TSDB_CODE_SUCCESS == code) {
107,852✔
142
      pGroupIdCalc->pSortColsArr = makeColumnArrayFromList(pSortColsNodeArr);
107,852✔
143
      if (!pGroupIdCalc->pSortColsArr) code = terrno;
107,852✔
144
      nodesClearList(pSortColsNodeArr);
107,852✔
145
    }
146
    if (TSDB_CODE_SUCCESS == code) {
107,852✔
147
      // PK ts col should always at last, see partColOptCreateSort
148
      if (pSortNode->excludePkCol) taosArrayPop(pGroupIdCalc->pSortColsArr);
107,852✔
149
      code = extractKeysLen(pGroupIdCalc->pSortColsArr, &keyLen);
107,852✔
150
      QUERY_CHECK_CODE(code, lino, _error);
107,852✔
151
    }
152
    if (TSDB_CODE_SUCCESS == code) {
107,852✔
153
      pGroupIdCalc->lastKeysLen = 0;
107,852✔
154
      pGroupIdCalc->keyBuf = taosMemoryCalloc(1, keyLen);
107,852✔
155
      if (!pGroupIdCalc->keyBuf) {
107,852✔
156
        code = terrno;
×
157
      }
158
    }
159
  }
160
  if (code != TSDB_CODE_SUCCESS) goto _error;
50,905,581✔
161

162
  pInfo->binfo.inputTsOrder = pSortNode->node.inputTsOrder;
50,905,581✔
163
  pInfo->binfo.outputTsOrder = pSortNode->node.outputTsOrder;
50,907,496✔
164
  initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo);
50,886,661✔
165

166
  setOperatorInfo(pOperator, "SortOperator", QUERY_NODE_PHYSICAL_PLAN_SORT, true, OP_NOT_OPENED, pInfo, pTaskInfo);
50,900,628✔
167

168

169
  // lazy evaluation for the following parameter since the input datablock is not known till now.
170
  //  pInfo->bufPageSize  = rowSize < 1024 ? 1024 * 2 : rowSize * 2;
171
  //  there are headers, so pageSize = rowSize + header pInfo->sortBufSize  = pInfo->bufPageSize * 16;
172
  // TODO dynamic set the available sort buffer
173

174
  pOperator->fpSet =
175
      createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, optrDefaultBufFn, getExplainExecInfo, optrDefaultGetNextExtFn, NULL);
50,911,737✔
176

177
  setOperatorResetStateFn(pOperator, resetSortOperState);
50,904,077✔
178
  code = appendDownstream(pOperator, &downstream, 1);
50,903,917✔
179
  if (code != TSDB_CODE_SUCCESS) {
50,907,080✔
180
    goto _error;
×
181
  }
182

183
  *pOptrInfo = pOperator;
50,907,080✔
184
  return TSDB_CODE_SUCCESS;
50,908,934✔
185

186
_error:
×
187
  if (pInfo != NULL) {
×
188
    destroySortOperatorInfo(pInfo);
×
189
  }
190
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
191
  pTaskInfo->code = code;
×
192
  return code;
×
193
}
194

195
int32_t appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
2,147,483,647✔
196
  int32_t code = 0;
2,147,483,647✔
197
  for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
2,147,483,647✔
198
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
2,147,483,647✔
199
    if (pColInfo == NULL) {
2,147,483,647✔
200
      return terrno;
×
201
    }
202

203
    bool isNull = tsortIsNullVal(pTupleHandle, i);
2,147,483,647✔
204
    if (isNull) {
2,147,483,647✔
205
      colDataSetNULL(pColInfo, pBlock->info.rows);
2,147,483,647✔
206
    } else {
207
      char* pData = NULL;
2,147,483,647✔
208
      tsortGetValue(pTupleHandle, i, (void**) &pData);
2,147,483,647✔
209

210
      if (pData != NULL) {
2,147,483,647✔
211
        code = colDataSetVal(pColInfo, pBlock->info.rows, pData, false);
2,147,483,647✔
212
        if (code) {
2,147,483,647✔
213
          return code;
×
214
        }
215
      }
216
    }
217
  }
218

219
  pBlock->info.dataLoad = 1;
2,147,483,647✔
220

221
  SDataBlockInfo info = {0};
2,147,483,647✔
222
  tsortGetBlockInfo(pTupleHandle, &info);
2,147,483,647✔
223

224
  pBlock->info.scanFlag = info.scanFlag;
2,147,483,647✔
225
  pBlock->info.rows += 1;
2,147,483,647✔
226
  return code;
2,147,483,647✔
227
}
228

229
/**
230
 * @brief get next tuple with group id attached, here assume that all tuples are sorted by group keys
231
 * @param [in, out] pBlock the output block, the group id will be saved in it
232
 * @retval NULL if next group tuple arrived and this new group tuple will be saved in pInfo.pSavedTuple
233
 */
234
static int32_t nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock,
2,147,483,647✔
235
                                    STupleHandle** pTupleHandle) {
236
  QRY_PARAM_CHECK(pTupleHandle);
2,147,483,647✔
237

238
  int32_t       code = 0;
2,147,483,647✔
239
  STupleHandle* retTuple = pInfo->pGroupIdCalc->pSavedTuple;
2,147,483,647✔
240
  if (!retTuple) {
2,147,483,647✔
241
    code = tsortNextTuple(pHandle, &retTuple);
2,147,483,647✔
242
    if (code) {
2,147,483,647✔
243
      qError("failed to get next tuple, code:%s", tstrerror(code));
×
244
      return code;
×
245
    }
246
  }
247

248
  if (retTuple) {
2,147,483,647✔
249
    int32_t newGroup;
250
    if (pInfo->pGroupIdCalc->pSavedTuple) {
2,147,483,647✔
251
      newGroup = true;
1,523,181✔
252
      pInfo->pGroupIdCalc->pSavedTuple = NULL;
1,523,181✔
253
    } else {
254
      newGroup = tsortCompAndBuildKeys(pInfo->pGroupIdCalc->pSortColsArr, pInfo->pGroupIdCalc->keyBuf,
2,147,483,647✔
255
                                       &pInfo->pGroupIdCalc->lastKeysLen, retTuple);
2,147,483,647✔
256
    }
257

258
    bool emptyBlock = (pBlock->info.rows == 0);
2,147,483,647✔
259
    if (newGroup) {
2,147,483,647✔
260
      if (!emptyBlock) {
3,155,128✔
261
        // new group arrived, and we have already copied some tuples for cur group, save the new group tuple, return
262
        // NULL. Note that the keyBuf and lastKeysLen has been updated to new value
263
        pInfo->pGroupIdCalc->pSavedTuple = retTuple;
1,524,095✔
264
        retTuple = NULL;
1,524,095✔
265
      } else {
266
        // new group with empty block
267
        pInfo->pGroupIdCalc->lastGroupId = pBlock->info.id.groupId =
1,631,033✔
268
            calcGroupId(pInfo->pGroupIdCalc->keyBuf, pInfo->pGroupIdCalc->lastKeysLen);
1,631,033✔
269
      }
270
    } else {
271
      if (emptyBlock) {
2,147,483,647✔
272
        // new block but not new group, assign last group id to it
273
        pBlock->info.id.groupId = pInfo->pGroupIdCalc->lastGroupId;
930,452✔
274
      } else {
275
        // not new group and not empty block and ret NOT NULL, just return the tuple
276
      }
277
    }
278
  }
279

280
  *pTupleHandle = retTuple;
2,147,483,647✔
281
  return code;
2,147,483,647✔
282
}
283

284
static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
106,669,777✔
285
                                SSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
286
  QRY_PARAM_CHECK(pResBlock);
106,669,777✔
287
  blockDataCleanup(pDataBlock);
106,671,308✔
288

289
  int32_t       lino = 0;
106,672,207✔
290
  int32_t       code = 0;
106,672,207✔
291
  STupleHandle* pTupleHandle = NULL;
106,672,207✔
292
  SSDataBlock*  p = NULL;
106,672,207✔
293

294
  code = tsortGetSortedDataBlock(pHandle, &p);
106,673,354✔
295
  if (p == NULL || (code != 0)) {
106,673,414✔
296
    return code;
16,087,537✔
297
  }
298

299
  code = blockDataEnsureCapacity(p, capacity);
90,585,877✔
300
  QUERY_CHECK_CODE(code, lino, _error);
90,585,768✔
301

302
  while (1) {
303
    if (pInfo->pGroupIdCalc) {
2,147,483,647✔
304
      code = nextTupleWithGroupId(pHandle, pInfo, p, &pTupleHandle);
2,147,483,647✔
305
    } else {
306
      code = tsortNextTuple(pHandle, &pTupleHandle);
2,147,483,647✔
307
    }
308

309
    TSDB_CHECK_CODE(code, lino, _error);
2,147,483,647✔
310
    if (pTupleHandle == NULL) {
2,147,483,647✔
311
      break;
63,023,133✔
312
    }
313

314
    code = appendOneRowToDataBlock(p, pTupleHandle);
2,147,483,647✔
315
    QUERY_CHECK_CODE(code, lino, _error);
2,147,483,647✔
316

317
    if (p->info.rows >= capacity) {
2,147,483,647✔
318
      break;
27,563,164✔
319
    }
320
  }
321

322
  QUERY_CHECK_CODE(code, lino, _error);
90,586,297✔
323

324
  if (p->info.rows > 0) {
90,586,297✔
325
    code = blockDataEnsureCapacity(pDataBlock, capacity);
61,839,992✔
326
    QUERY_CHECK_CODE(code, lino, _error);
61,839,784✔
327

328
    // todo extract function to handle this
329
    int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
61,839,784✔
330
    for (int32_t i = 0; i < numOfCols; ++i) {
315,017,526✔
331
      SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
253,177,114✔
332
      QUERY_CHECK_NULL(pmInfo, code, lino, _error, terrno);
253,177,493✔
333

334
      SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
253,177,493✔
335
      QUERY_CHECK_NULL(pSrc, code, lino, _error, terrno);
253,175,838✔
336

337
      SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
253,175,838✔
338
      QUERY_CHECK_NULL(pDst, code, lino, _error, terrno);
253,175,368✔
339

340
      code = colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
253,175,368✔
341
      QUERY_CHECK_CODE(code, lino, _error);
253,178,141✔
342
    }
343

344
    pDataBlock->info.dataLoad = 1;
61,840,412✔
345
    pDataBlock->info.rows = p->info.rows;
61,840,412✔
346
    pDataBlock->info.scanFlag = p->info.scanFlag;
61,839,942✔
347
    pDataBlock->info.id.groupId = p->info.id.groupId;
61,838,944✔
348
  }
349

350
  blockDataDestroy(p);
90,586,247✔
351
  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
90,583,663✔
352
  return code;
90,583,350✔
353

354
  _error:
×
355
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
356

357
  blockDataDestroy(p);
×
358
  return code;
×
359
}
360

361
int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
2,018,956,746✔
362
  SOperatorInfo* pOperator = (SOperatorInfo*)param;
2,018,956,746✔
363
  int32_t        code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
2,018,956,746✔
364
  if (code) {
2,017,972,789✔
365
    qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
×
366
  } else {
367
    code = blockDataCheck(*ppBlock);
2,017,972,789✔
368
    if (code) {
2,018,010,092✔
369
      qError("failed to check block data, %s code:%s", __func__, tstrerror(code));
×
370
    }
371
  }
372
  return code;
2,017,976,471✔
373
}
374

375
// todo refactor: merged with fetch fp
376
void applyScalarFunction(SSDataBlock* pBlock, void* param) {
2,070,586,568✔
377
  SOperatorInfo*     pOperator = param;
2,070,586,568✔
378
  SSortOperatorInfo* pSort = pOperator->info;
2,070,586,568✔
379
  if (pOperator->exprSupp.pExprInfo != NULL && pOperator->exprSupp.numOfExprs > 0) {
2,070,592,610✔
380
    int32_t code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx,
2,107,860✔
381
                                         pOperator->exprSupp.numOfExprs, NULL,
382
                                         GET_STM_RTINFO(pOperator->pTaskInfo));
2,107,860✔
383
    if (code != TSDB_CODE_SUCCESS) {
2,107,860✔
384
      T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
385
    }
386
  }
387
}
2,070,589,616✔
388

389
int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
107,591,567✔
390
  SSortOperatorInfo* pInfo = pOperator->info;
107,591,567✔
391
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
107,596,039✔
392
  int32_t            code = TSDB_CODE_SUCCESS;
107,591,139✔
393
  int32_t            lino = 0;
107,591,139✔
394
  SSortSource* pSource =NULL;
107,591,139✔
395

396
  if (OPTR_IS_OPENED(pOperator)) {
107,591,139✔
397
    return code;
57,775,377✔
398
  }
399

400
  pInfo->startTs = taosGetTimestampUs();
49,816,900✔
401
  //  pInfo->binfo.pRes is not equalled to the input datablock.
402
  pInfo->pSortHandle = NULL;
49,813,932✔
403
  code =
404
      tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, pInfo->maxRows,
99,625,620✔
405
                            pInfo->maxTupleLength, tsPQSortMemThreshold * 1024 * 1024, &pInfo->pSortHandle);
49,813,646✔
406
  QUERY_CHECK_CODE(code, lino, _end);
49,801,899✔
407

408
  tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
49,801,899✔
409

410
  pSource = taosMemoryCalloc(1, sizeof(SSortSource));
49,793,782✔
411
  QUERY_CHECK_NULL(pSource, code, lino, _end, terrno);
49,790,271✔
412

413
  pSource->param = pOperator->pDownstream[0];
49,790,271✔
414
  pSource->onlyRef = true;
49,809,070✔
415

416
  code = tsortAddSource(pInfo->pSortHandle, pSource);
49,802,993✔
417
  QUERY_CHECK_CODE(code, lino, _end);
49,799,215✔
418
  pSource = NULL;
49,799,215✔
419

420
  code = tsortOpen(pInfo->pSortHandle);
49,799,215✔
421
  QUERY_CHECK_CODE(code, lino, _end);
48,860,304✔
422
  pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0;
48,859,847✔
423
  pOperator->status = OP_RES_TO_RETURN;
48,860,386✔
424
  OPTR_SET_OPENED(pOperator);
48,859,847✔
425

426
_end:
48,860,843✔
427
  if (pSource) {
48,860,843✔
428
    taosMemoryFree(pSource);
×
429
  }
430
  if (code != TSDB_CODE_SUCCESS) {
48,860,304✔
431
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
457✔
432
    pTaskInfo->code = code;
457✔
433
    T_LONG_JMP(pTaskInfo->env, code);
457✔
434
  }
435
  return code;
48,859,847✔
436
}
437

438
int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
107,591,673✔
439
  QRY_PARAM_CHECK(pResBlock);
107,591,673✔
440
  int32_t code = TSDB_CODE_SUCCESS;
107,592,584✔
441
  int32_t lino = 0;
107,592,584✔
442
  if (pOperator->status == OP_EXEC_DONE) {
107,592,584✔
443
    return code;
×
444
  }
445

446
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
107,588,902✔
447
  SSortOperatorInfo* pInfo = pOperator->info;
107,587,973✔
448

449
  code = pOperator->fpSet._openFn(pOperator);
107,593,347✔
450
  QUERY_CHECK_CODE(code, lino, _end);
106,634,506✔
451

452
  // multi-group case not handle here
453
  SSDataBlock* pBlock = NULL;
106,634,506✔
454
  while (1) {
33,600✔
455
    if (tsortIsClosed(pInfo->pSortHandle)) {
106,668,599✔
456
      code = TSDB_CODE_TSC_QUERY_CANCELLED;
×
457
      QUERY_CHECK_CODE(code, lino, _end);
×
458
    }
459

460
    code = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
106,673,335✔
461
                                pInfo->matchInfo.pList, pInfo, &pBlock);
462
    QUERY_CHECK_CODE(code, lino, _end);
106,669,495✔
463
    if (pBlock == NULL) {
106,669,495✔
464
      setOperatorCompleted(pOperator);
44,831,444✔
465
      return code;
44,832,422✔
466
    }
467

468
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo, NULL);
61,838,051✔
469
    QUERY_CHECK_CODE(code, lino, _end);
61,837,239✔
470

471
    if (blockDataGetNumOfRows(pBlock) == 0) {
61,837,239✔
472
      continue;
×
473
    }
474

475
    // there are bugs?
476
    bool limitReached = applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo);
61,838,593✔
477
    if (limitReached) {
61,837,386✔
478
      resetLimitInfoForNextGroup(&pInfo->limitInfo);
5,512,652✔
479
    }
480

481
    pOperator->resultInfo.totalRows += pBlock->info.rows;
61,837,921✔
482
    if (pBlock->info.rows > 0) {
61,839,384✔
483
      break;
61,804,915✔
484
    }
485
  }
486

487
  *pResBlock = blockDataGetNumOfRows(pBlock) > 0 ? pBlock : NULL;
61,804,915✔
488
_end:
61,805,284✔
489
  if (code != TSDB_CODE_SUCCESS) {
61,801,918✔
UNCOV
490
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
491
    pTaskInfo->code = code;
×
492
    T_LONG_JMP(pTaskInfo->env, code);
×
493
  }
494
  return code;
61,804,951✔
495
}
496

497
void destroySortOperatorInfo(void* param) {
50,914,387✔
498
  SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param;
50,914,387✔
499
  blockDataDestroy(pInfo->binfo.pRes);
50,914,387✔
500
  pInfo->binfo.pRes = NULL;
50,915,245✔
501

502
  tsortDestroySortHandle(pInfo->pSortHandle);
50,914,507✔
503
  taosArrayDestroy(pInfo->pSortInfo);
50,915,422✔
504
  taosArrayDestroy(pInfo->matchInfo.pList);
50,912,581✔
505
  destroySortOpGroupIdCalc(pInfo->pGroupIdCalc);
50,915,226✔
506
  taosMemoryFreeClear(param);
50,915,263✔
507
}
50,914,450✔
508

509
int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
1,601,232✔
510
  SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
1,601,232✔
511
  if (pInfo == NULL) {
1,600,186✔
512
    return terrno;
×
513
  }
514

515
  SSortOperatorInfo* pOperatorInfo = (SSortOperatorInfo*)pOptr->info;
1,600,186✔
516

517
  *pInfo = tsortGetSortExecInfo(pOperatorInfo->pSortHandle);
1,600,186✔
518
  *pOptrExplain = pInfo;
1,601,232✔
519
  *len = sizeof(SSortExecInfo);
1,601,232✔
520
  return TSDB_CODE_SUCCESS;
1,601,232✔
521
}
522

523
static void calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys) {
50,906,071✔
524
  SColMatchInfo* pColItem = &pSortOperInfo->matchInfo;
50,906,071✔
525
  size_t         size = taosArrayGetSize(pColItem->pList);
50,915,249✔
526
  for (size_t i = 0; i < size; ++i) {
168,556,140✔
527
    SColMatchItem* pInfo = taosArrayGet(pColItem->pList, i);
117,656,468✔
528
    if (pInfo == NULL) {
117,663,026✔
529
      continue;
×
530
    }
531

532
    pSortOperInfo->maxTupleLength += pInfo->dataType.bytes;
117,663,026✔
533
  }
534

535
  size = LIST_LENGTH(pSortKeys);
50,899,672✔
536
  for (size_t i = 0; i < size; ++i) {
120,204,230✔
537
    SOrderByExprNode* pOrderExprNode = (SOrderByExprNode*)nodesListGetNode(pSortKeys, i);
69,293,062✔
538
    pSortOperInfo->maxTupleLength += ((SColumnNode*)pOrderExprNode->pExpr)->node.resType.bytes;
69,302,287✔
539
  }
540
}
50,911,168✔
541

542
static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc) {
50,916,801✔
543
  if (pCalc) {
50,916,801✔
544
    taosArrayDestroy(pCalc->pSortColsArr);
107,852✔
545
    taosMemoryFree(pCalc->keyBuf);
107,852✔
546
    taosMemoryFree(pCalc);
107,852✔
547
  }
548
}
50,916,801✔
549

550
//=====================================================================================
551
// Group Sort Operator
552
typedef enum EChildOperatorStatus { CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED } EChildOperatorStatus;
553

554
typedef struct SGroupSortOperatorInfo {
555
  SOptrBasicInfo       binfo;
556
  SArray*              pSortInfo;
557
  SColMatchInfo        matchInfo;
558
  int64_t              startTs;
559
  uint64_t             sortElapsed;
560
  bool                 hasGroupId;
561
  uint64_t             currGroupId;
562
  SSDataBlock*         prefetchedSortInput;
563
  SSortHandle*         pCurrSortHandle;
564
  EChildOperatorStatus childOpStatus;
565
  SSortExecInfo        sortExecInfo;
566
} SGroupSortOperatorInfo;
567

568
int32_t getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
1,912,200✔
569
                                SGroupSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
570
  QRY_PARAM_CHECK(pResBlock);
1,912,200✔
571

572
  blockDataCleanup(pDataBlock);
1,912,200✔
573
  int32_t code = blockDataEnsureCapacity(pDataBlock, capacity);
1,912,200✔
574
  if (code) {
1,912,200✔
575
    return code;
×
576
  }
577

578
  SSDataBlock* p = NULL;
1,912,200✔
579
  code = tsortGetSortedDataBlock(pHandle, &p);
1,912,200✔
580
  if (p == NULL || (code != 0)) {
1,912,200✔
581
    return code;
×
582
  }
583

584
  code = blockDataEnsureCapacity(p, capacity);
1,912,200✔
585
  if (code) {
1,912,200✔
586
    return code;
×
587
  }
588

589
  while (1) {
1,847,058,884✔
590
    STupleHandle* pTupleHandle = NULL;
1,848,971,084✔
591
    code = tsortNextTuple(pHandle, &pTupleHandle);
1,848,971,084✔
592
    if (pTupleHandle == NULL || code != 0) {
1,848,971,084✔
593
      break;
594
    }
595

596
    code = appendOneRowToDataBlock(p, pTupleHandle);
1,848,824,920✔
597
    if (code) {
1,848,824,920✔
598
      break;
×
599
    }
600

601
    if (p->info.rows >= capacity) {
1,848,824,920✔
602
      break;
1,766,036✔
603
    }
604
  }
605

606
  if (p->info.rows > 0) {
1,912,200✔
607
    int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
1,839,118✔
608
    for (int32_t i = 0; i < numOfCols; ++i) {
5,517,354✔
609
      SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
3,678,236✔
610
      if (pmInfo == NULL) {
3,678,236✔
611
        return terrno;
×
612
      }
613

614
      SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
3,678,236✔
615
      if (pSrc == NULL) {
3,678,236✔
616
        return terrno;
×
617
      }
618

619
      SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
3,678,236✔
620
      if (pDst == NULL) {
3,678,236✔
621
        return terrno;
×
622
      }
623

624
      code = colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
3,678,236✔
625
      if (code) {
3,678,236✔
626
        return code;
×
627
      }
628
    }
629

630
    pDataBlock->info.rows = p->info.rows;
1,839,118✔
631
    pDataBlock->info.capacity = p->info.rows;
1,839,118✔
632
    pDataBlock->info.scanFlag = p->info.scanFlag;
1,839,118✔
633
  }
634

635
  blockDataDestroy(p);
1,912,200✔
636
  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
1,912,200✔
637
  return code;
1,912,200✔
638
}
639

640
typedef struct SGroupSortSourceParam {
641
  SOperatorInfo*          childOpInfo;
642
  SGroupSortOperatorInfo* grpSortOpInfo;
643
} SGroupSortSourceParam;
644

645
int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) {
101,513,016✔
646
  int32_t                 code = 0;
101,513,016✔
647
  int32_t                 lino = 0;
101,513,016✔
648
  SGroupSortSourceParam*  source = param;
101,513,016✔
649
  SGroupSortOperatorInfo* grpSortOpInfo = source->grpSortOpInfo;
101,513,016✔
650
  SSDataBlock*            block = NULL;
101,513,016✔
651

652
  QRY_PARAM_CHECK(ppBlock);
101,513,016✔
653

654
  if (grpSortOpInfo->prefetchedSortInput) {
101,513,016✔
655
    block = grpSortOpInfo->prefetchedSortInput;
73,082✔
656
    grpSortOpInfo->prefetchedSortInput = NULL;
73,082✔
657
    *ppBlock = block;
73,082✔
658
  } else {
659
    SOperatorInfo* childOp = source->childOpInfo;
101,439,934✔
660
    code = childOp->fpSet.getNextFn(childOp, &block);
101,439,934✔
661
    QUERY_CHECK_CODE(code, lino, _end);
101,439,934✔
662

663
    if (block != NULL) {
101,439,934✔
664
      code = blockDataCheck(block);
101,400,988✔
665
      QUERY_CHECK_CODE(code, lino, _end);
101,400,988✔
666
      if (block->info.id.groupId == grpSortOpInfo->currGroupId) {
101,400,988✔
667
        grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP;
101,366,852✔
668
        *ppBlock = block;
101,366,852✔
669
      } else {
670
        grpSortOpInfo->childOpStatus = CHILD_OP_NEW_GROUP;
34,136✔
671
        grpSortOpInfo->prefetchedSortInput = block;
34,136✔
672
      }
673
    } else {
674
      grpSortOpInfo->childOpStatus = CHILD_OP_FINISHED;
38,946✔
675
    }
676
  }
677

678
  return code;
101,513,016✔
679
_end:
×
680
  if (code != 0) {
×
681
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
682
  }
683
  return code;
×
684
}
685

686
int32_t beginSortGroup(SOperatorInfo* pOperator) {
73,082✔
687
  SGroupSortOperatorInfo* pInfo = pOperator->info;
73,082✔
688
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
73,082✔
689

690
  //  pInfo->binfo.pRes is not equalled to the input datablock.
691
  pInfo->pCurrSortHandle = NULL;
73,082✔
692

693
  int32_t code = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, 0, 0,
73,082✔
694
                                       0, &pInfo->pCurrSortHandle);
695
  if (code) {
73,082✔
696
    return code;
×
697
  }
698

699
  tsortSetFetchRawDataFp(pInfo->pCurrSortHandle, fetchNextGroupSortDataBlock, applyScalarFunction, pOperator);
73,082✔
700

701
  SSortSource*           ps = taosMemoryCalloc(1, sizeof(SSortSource));
73,082✔
702
  SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam));
73,082✔
703
  if (ps == NULL || param == NULL) {
73,082✔
704
    taosMemoryFree(ps);
×
705
    taosMemoryFree(param);
×
706
    return terrno;
×
707
  }
708

709
  param->childOpInfo = pOperator->pDownstream[0];
73,082✔
710
  param->grpSortOpInfo = pInfo;
73,082✔
711

712
  ps->param = param;
73,082✔
713
  ps->onlyRef = false;
73,082✔
714
  code = tsortAddSource(pInfo->pCurrSortHandle, ps);
73,082✔
715
  if (code != 0) {
73,082✔
716
    return code;
×
717
  }
718

719
  code = tsortOpen(pInfo->pCurrSortHandle);
73,082✔
720
  return code;
73,082✔
721
}
722

723
int32_t finishSortGroup(SOperatorInfo* pOperator) {
73,082✔
724
  SGroupSortOperatorInfo* pInfo = pOperator->info;
73,082✔
725

726
  SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pCurrSortHandle);
73,082✔
727

728
  pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
73,082✔
729
  pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
73,082✔
730
  pInfo->sortExecInfo.loops += sortExecInfo.loops;
73,082✔
731
  pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
73,082✔
732
  pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;
73,082✔
733

734
  tsortDestroySortHandle(pInfo->pCurrSortHandle);
73,082✔
735
  pInfo->pCurrSortHandle = NULL;
73,082✔
736

737
  return TSDB_CODE_SUCCESS;
73,082✔
738
}
739

740
int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
1,882,269✔
741
  QRY_PARAM_CHECK(pResBlock);
1,882,269✔
742
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
1,882,269✔
743
  SGroupSortOperatorInfo* pInfo = pOperator->info;
1,882,269✔
744
  int32_t                 code = TSDB_CODE_SUCCESS;
1,882,269✔
745
  int32_t                 lino = 0;
1,882,269✔
746

747
  if (pOperator->status == OP_EXEC_DONE) {
1,882,269✔
748
    return code;
×
749
  }
750

751
  code = pOperator->fpSet._openFn(pOperator);
1,882,269✔
752
  QUERY_CHECK_CODE(code, lino, _end);
1,882,269✔
753

754
  if (!pInfo->hasGroupId) {
1,882,269✔
755
    pInfo->hasGroupId = true;
43,151✔
756

757
    pInfo->prefetchedSortInput = getNextBlockFromDownstream(pOperator, 0);
43,151✔
758
    if (pInfo->prefetchedSortInput == NULL) {
43,151✔
759
      setOperatorCompleted(pOperator);
4,205✔
760
      return code;
4,205✔
761
    }
762

763
    pInfo->currGroupId = pInfo->prefetchedSortInput->info.id.groupId;
38,946✔
764
    pInfo->childOpStatus = CHILD_OP_NEW_GROUP;
38,946✔
765
    code = beginSortGroup(pOperator);
38,946✔
766
    QUERY_CHECK_CODE(code, lino, _end);
38,946✔
767
  }
768

769
  SSDataBlock* pBlock = NULL;
1,878,064✔
770
  while (pInfo->pCurrSortHandle != NULL) {
1,912,200✔
771
    if (tsortIsClosed(pInfo->pCurrSortHandle)) {
1,912,200✔
772
      code = TSDB_CODE_TSC_QUERY_CANCELLED;
×
773
      QUERY_CHECK_CODE(code, lino, _end);
×
774
    }
775

776
    // beginSortGroup would fetch all child blocks of pInfo->currGroupId;
777
    if (pInfo->childOpStatus == CHILD_OP_SAME_GROUP) {
1,912,200✔
778
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
779
      QUERY_CHECK_CODE(code, lino, _end);
×
780
    }
781

782
    code = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
1,912,200✔
783
                                     pInfo->matchInfo.pList, pInfo, &pBlock);
784
    QUERY_CHECK_CODE(code, lino, _end);
1,912,200✔
785
    if (pBlock != NULL) {
1,912,200✔
786
      pBlock->info.id.groupId = pInfo->currGroupId;
1,839,118✔
787
      pOperator->resultInfo.totalRows += pBlock->info.rows;
1,839,118✔
788
      *pResBlock = pBlock;
1,839,118✔
789
      return code;
1,839,118✔
790
    } else {
791
      if (pInfo->childOpStatus == CHILD_OP_NEW_GROUP) {
73,082✔
792
        (void) finishSortGroup(pOperator);
34,136✔
793
        pInfo->currGroupId = pInfo->prefetchedSortInput->info.id.groupId;
34,136✔
794
        code = beginSortGroup(pOperator);
34,136✔
795
        QUERY_CHECK_CODE(code, lino, _end);
34,136✔
796
      } else if (pInfo->childOpStatus == CHILD_OP_FINISHED) {
38,946✔
797
        (void) finishSortGroup(pOperator);
38,946✔
798
        setOperatorCompleted(pOperator);
38,946✔
799
        return code;
38,946✔
800
      }
801
    }
802
  }
803

804
_end:
×
805
  if (code != TSDB_CODE_SUCCESS) {
×
806
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
807
    pTaskInfo->code = code;
×
808
    T_LONG_JMP(pTaskInfo->env, code);
×
809
  }
810
  return code;
×
811
}
812

813
int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
×
814
  SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)pOptr->info;
×
815
  *pOptrExplain = &pInfo->sortExecInfo;
×
816
  *len = sizeof(SSortExecInfo);
×
817
  return TSDB_CODE_SUCCESS;
×
818
}
819

820
void destroyGroupSortOperatorInfo(void* param) {
43,151✔
821
  SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)param;
43,151✔
822
  blockDataDestroy(pInfo->binfo.pRes);
43,151✔
823
  pInfo->binfo.pRes = NULL;
43,151✔
824

825
  taosArrayDestroy(pInfo->pSortInfo);
43,151✔
826
  taosArrayDestroy(pInfo->matchInfo.pList);
43,151✔
827

828
  tsortDestroySortHandle(pInfo->pCurrSortHandle);
43,151✔
829
  pInfo->pCurrSortHandle = NULL;
43,151✔
830

831
  taosMemoryFreeClear(param);
43,151✔
832
}
43,151✔
833

834
static int32_t resetGroupSortOperState(SOperatorInfo* pOper) {
×
835
  SGroupSortOperatorInfo* pInfo = pOper->info;
×
836
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
×
837
  pOper->status = OP_NOT_OPENED;
×
838

839
  pInfo->currGroupId = 0;
×
840
  pInfo->hasGroupId = false;
×
841
  pInfo->prefetchedSortInput = NULL;
×
842
  pInfo->childOpStatus = CHILD_OP_NEW_GROUP;
×
843
  pInfo->sortExecInfo = (SSortExecInfo){0};
×
844
  
845
  resetBasicOperatorState(&pInfo->binfo);
×
846
  destroySqlFunctionCtx(pOper->exprSupp.pCtx, pOper->exprSupp.pExprInfo, pOper->exprSupp.numOfExprs);
×
847
  taosMemoryFreeClear(pOper->exprSupp.rowEntryInfoOffset);
×
848
  pOper->exprSupp.pCtx =
×
849
      createSqlFunctionCtx(pOper->exprSupp.pExprInfo, pOper->exprSupp.numOfExprs, &pOper->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
×
850

851
  tsortDestroySortHandle(pInfo->pCurrSortHandle);
×
852
  pInfo->pCurrSortHandle = NULL;
×
853

854
  return 0;
×
855
}
856

857
int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
43,151✔
858
                                    SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
859
  QRY_PARAM_CHECK(pOptrInfo);
43,151✔
860
  int32_t code = 0;
43,151✔
861
  int32_t lino = 0;
43,151✔
862

863
  SGroupSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupSortOperatorInfo));
43,151✔
864
  SOperatorInfo*          pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
43,151✔
865
  if (pInfo == NULL || pOperator == NULL) {
43,151✔
866
    code = terrno;
×
867
    goto _error;
×
868
  }
869

870
  SExprSupp*          pSup = &pOperator->exprSupp;
43,151✔
871
  SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc;
43,151✔
872

873
  int32_t    numOfCols = 0;
43,151✔
874
  SExprInfo* pExprInfo = NULL;
43,151✔
875
  code = createExprInfo(pSortPhyNode->pExprs, NULL, &pExprInfo, &numOfCols);
43,151✔
876
  QUERY_CHECK_CODE(code, lino, _error);
43,151✔
877

878
  pSup->pExprInfo = pExprInfo;
43,151✔
879
  pSup->numOfExprs = numOfCols;
43,151✔
880

881
  initResultSizeInfo(&pOperator->resultInfo, 1024);
43,151✔
882
  pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset,
43,151✔
883
                                                  &pTaskInfo->storageAPI.functionStore);
884
  QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno);
43,151✔
885

886
  pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
43,151✔
887
  QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
43,151✔
888

889
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
43,151✔
890
  TSDB_CHECK_CODE(code, lino, _error);
43,151✔
891

892
  pInfo->binfo.inputTsOrder = pSortPhyNode->node.inputTsOrder;
43,151✔
893
  pInfo->binfo.outputTsOrder = pSortPhyNode->node.outputTsOrder;
43,151✔
894

895
  int32_t numOfOutputCols = 0;
43,151✔
896
  code = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID,
43,151✔
897
                             &pInfo->matchInfo);
898
  TSDB_CHECK_CODE(code, lino, _error);
43,151✔
899

900
  pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
43,151✔
901
  setOperatorInfo(pOperator, "GroupSortOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, false, OP_NOT_OPENED, pInfo,
43,151✔
902
                  pTaskInfo);
903
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doGroupSort, NULL, destroyGroupSortOperatorInfo,
43,151✔
904
                                         optrDefaultBufFn, getGroupSortExplainExecInfo, optrDefaultGetNextExtFn, NULL);
905

906
  setOperatorResetStateFn(pOperator, resetGroupSortOperState);
43,151✔
907
                                         
908
  code = appendDownstream(pOperator, &downstream, 1);
43,151✔
909
  if (code != TSDB_CODE_SUCCESS) {
43,151✔
910
    goto _error;
×
911
  }
912

913
  *pOptrInfo = pOperator;
43,151✔
914
  return TSDB_CODE_SUCCESS;
43,151✔
915

916
_error:
×
917
  pTaskInfo->code = code;
×
918
  if (pInfo != NULL) {
×
919
    destroyGroupSortOperatorInfo(pInfo);
×
920
  }
921
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
922
  return code;
×
923
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc