• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5016

03 Apr 2026 03:59PM UTC coverage: 72.299% (+0.01%) from 72.289%
#5016

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4055 of 5985 new or added lines in 68 files covered. (67.75%)

13126 existing lines in 156 files now uncovered.

257424 of 356056 relevant lines covered (72.3%)

133108577.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.33
/source/libs/executor/src/sortoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "tdatablock.h"
21

22
typedef struct SSortOpGroupIdCalc {
23
  STupleHandle* pSavedTuple;
24
  SArray*       pSortColsArr;
25
  char*         keyBuf;
26
  int32_t       lastKeysLen; // default to be 0
27
  uint64_t      lastGroupId;
28
  bool          excludePKCol;
29
} SSortOpGroupIdCalc;
30

31
typedef struct SSortOperatorInfo {
32
  SOptrBasicInfo      binfo;
33
  uint32_t            sortBufSize;  // max buffer size for in-memory sort
34
  SArray*             pSortInfo;
35
  SSortHandle*        pSortHandle;
36
  SColMatchInfo       matchInfo;
37
  int32_t             bufPageSize;
38
  int64_t             startTs;      // sort start time
39
  uint64_t            sortElapsed;  // sort elapsed time, time to flush to disk not included.
40
  SLimitInfo          limitInfo;
41
  uint64_t            maxTupleLength;
42
  int64_t             maxRows;
43
  SSortOpGroupIdCalc* pGroupIdCalc;
44
} SSortOperatorInfo;
45

46
static int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
47
static int32_t doOpenSortOperator(SOperatorInfo* pOperator);
48
static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
49
static int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
50

51
static void destroySortOperatorInfo(void* param);
52
static void calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys);
53

54
// Check whether the given slotId is the output of a Sort scalar pre-calculation expression.
55
// The planner's rewritePrecalcExprs pushes expression results (e.g. ts+1000) into extra slots
56
// in the child's output descriptor. This helper identifies such slots.
57
static bool sortIsExprResultSlot(const SOperatorInfo* pOperator, int32_t slotId) {
31,903,349✔
58
  if (pOperator == NULL || pOperator->exprSupp.pExprInfo == NULL || slotId < 0) {
31,903,349✔
59
    return false;
22,422,791✔
60
  }
61
  for (int32_t idx = 0; idx < pOperator->exprSupp.numOfExprs; ++idx) {
14,351,792✔
62
    if (pOperator->exprSupp.pExprInfo[idx].base.resSchema.slotId == slotId) {
9,499,887✔
63
      return true;
4,628,247✔
64
    }
65
  }
66
  return false;
4,851,905✔
67
}
68

69
// Find the slot index of the original (non-expression) primary timestamp column in the
70
// Sort's internal data block.  The internal block may contain both the original ts column
71
// and an expression-derived ts column (e.g. ts+1000).  We return the first TIMESTAMP
72
// column whose slot is NOT an expression result.
73
static int32_t sortFindOrigTsSlot(const SOperatorInfo* pOperator, const SSDataBlock* pBlock) {
4,628,247✔
74
  if (pBlock == NULL || pBlock->pDataBlock == NULL) {
4,628,247✔
NEW
75
    return -1;
×
76
  }
77
  int32_t colCount = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
4,628,247✔
78
  for (int32_t idx = 0; idx < colCount; ++idx) {
4,628,247✔
79
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx);
4,628,247✔
80
    if (pCol != NULL && pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP &&
4,628,247✔
81
        !sortIsExprResultSlot(pOperator, idx)) {
4,628,247✔
82
      return idx;
4,628,247✔
83
    }
84
  }
NEW
85
  return -1;
×
86
}
87

88
static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc);
89

90
static int32_t resetSortOperState(SOperatorInfo* pOper) {
222,774✔
91
  SSortOperatorInfo* pInfo = pOper->info;
222,774✔
92
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
222,774✔
93
  pOper->status = OP_NOT_OPENED;
222,774✔
94

95
  resetBasicOperatorState(&pInfo->binfo);
222,774✔
96
  destroySqlFunctionCtx(pOper->exprSupp.pCtx, pOper->exprSupp.pExprInfo, pOper->exprSupp.numOfExprs);
222,774✔
97
  taosMemoryFreeClear(pOper->exprSupp.rowEntryInfoOffset);
222,774✔
98
  pOper->exprSupp.pCtx =
222,774✔
99
      createSqlFunctionCtx(pOper->exprSupp.pExprInfo, pOper->exprSupp.numOfExprs, &pOper->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
222,774✔
100

101
  tsortDestroySortHandle(pInfo->pSortHandle);
222,774✔
102
  pInfo->pSortHandle = NULL;
222,774✔
103

104
  if (pInfo->pGroupIdCalc) {
222,774✔
UNCOV
105
    pInfo->pGroupIdCalc->lastGroupId = 0;
×
UNCOV
106
    pInfo->pGroupIdCalc->lastKeysLen = 0;
×
107
  }
108

109
  return 0;
222,774✔
110
}
111

112
// todo add limit/offset impl
113
int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
34,346,601✔
114
  QRY_PARAM_CHECK(pOptrInfo);
34,346,601✔
115

116
  int32_t code = 0;
34,353,560✔
117
  int32_t lino = 0;
34,353,560✔
118

119
  SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
34,353,560✔
120
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
34,271,047✔
121
  if (pInfo == NULL || pOperator == NULL) {
34,285,156✔
122
    code = terrno;
3,397✔
UNCOV
123
    goto _error;
×
124
  }
125
  initOperatorCostInfo(pOperator);
34,281,759✔
126

127
  pOperator->pTaskInfo = pTaskInfo;
34,336,141✔
128
  SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc;
34,349,313✔
129

130
  int32_t numOfCols = 0;
34,342,291✔
131
  code = createExprInfo(pSortNode->pExprs, NULL, &pOperator->exprSupp.pExprInfo, &numOfCols);
34,356,016✔
132
  QUERY_CHECK_CODE(code, lino, _error);
34,289,320✔
133

134
  pOperator->exprSupp.numOfExprs = numOfCols;
34,289,320✔
135
  int32_t numOfOutputCols = 0;
34,298,950✔
136
  code =
137
      extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
34,300,736✔
138
  if (code != TSDB_CODE_SUCCESS) {
34,342,959✔
139
    goto _error;
×
140
  }
141
  
142
  calcSortOperMaxTupleLength(pInfo, pSortNode->pSortKeys);
34,342,959✔
143
  pInfo->maxRows = -1;
34,337,579✔
144
  if (pSortNode->node.pLimit && ((SLimitNode*)pSortNode->node.pLimit)->limit) {
34,340,542✔
145
    SLimitNode* pLimit = (SLimitNode*)pSortNode->node.pLimit;
4,884,981✔
146
    if (pLimit->limit->datum.i > 0) {
4,889,664✔
147
      pInfo->maxRows = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
4,884,339✔
148
    }
149
  }
150

151
  pOperator->exprSupp.pCtx =
34,299,093✔
152
      createSqlFunctionCtx(pOperator->exprSupp.pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
34,294,279✔
153
  QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno);
34,305,229✔
154
  initResultSizeInfo(&pOperator->resultInfo, 1024);
34,275,840✔
155
  code = filterInitFromNode((SNode*)pSortNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
34,312,427✔
156
                            pTaskInfo->pStreamRuntimeInfo);
34,303,904✔
157
  if (code != TSDB_CODE_SUCCESS) {
34,278,577✔
UNCOV
158
    goto _error;
×
159
  }
160

161
  pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
34,278,577✔
162
  QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
34,369,688✔
163

164
  pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
34,327,603✔
165
  TSDB_CHECK_NULL(pInfo->pSortInfo, code, lino, _error, terrno);
34,351,694✔
166

167
  if (pSortNode->calcGroupId) {
34,292,019✔
168
    int32_t keyLen;
115,385✔
169
    SSortOpGroupIdCalc* pGroupIdCalc = pInfo->pGroupIdCalc = taosMemoryCalloc(1, sizeof(SSortOpGroupIdCalc));
115,385✔
170
    if (!pGroupIdCalc) {
114,894✔
UNCOV
171
      code = terrno;
×
UNCOV
172
      goto _error;
×
173
    }
174
    SNodeList* pSortColsNodeArr = makeColsNodeArrFromSortKeys(pSortNode->pSortKeys);
114,894✔
175
    if (!pSortColsNodeArr) code = terrno;
115,876✔
176
    if (TSDB_CODE_SUCCESS == code) {
115,876✔
177
      pGroupIdCalc->pSortColsArr = makeColumnArrayFromList(pSortColsNodeArr);
115,876✔
178
      if (!pGroupIdCalc->pSortColsArr) code = terrno;
115,876✔
179
      nodesClearList(pSortColsNodeArr);
115,385✔
180
    }
181
    if (TSDB_CODE_SUCCESS == code) {
114,403✔
182
      // PK ts col should always at last, see partColOptCreateSort
183
      if (pSortNode->excludePkCol) taosArrayPop(pGroupIdCalc->pSortColsArr);
115,385✔
184
      code = extractKeysLen(pGroupIdCalc->pSortColsArr, &keyLen);
115,385✔
185
      QUERY_CHECK_CODE(code, lino, _error);
114,403✔
186
    }
187
    if (TSDB_CODE_SUCCESS == code) {
113,421✔
188
      pGroupIdCalc->lastKeysLen = 0;
114,403✔
189
      pGroupIdCalc->keyBuf = taosMemoryCalloc(1, keyLen);
115,385✔
190
      if (!pGroupIdCalc->keyBuf) {
113,912✔
UNCOV
191
        code = terrno;
×
192
      }
193
    }
194
  }
195
  if (code != TSDB_CODE_SUCCESS) goto _error;
34,301,887✔
196

197
  pInfo->binfo.inputTsOrder = pSortNode->node.inputTsOrder;
34,301,887✔
198
  pInfo->binfo.outputTsOrder = pSortNode->node.outputTsOrder;
34,320,451✔
199
  initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo);
34,281,423✔
200

201
  setOperatorInfo(pOperator, "SortOperator", QUERY_NODE_PHYSICAL_PLAN_SORT, true, OP_NOT_OPENED, pInfo, pTaskInfo);
34,329,015✔
202

203

204
  // lazy evaluation for the following parameter since the input datablock is not known till now.
205
  //  pInfo->bufPageSize  = rowSize < 1024 ? 1024 * 2 : rowSize * 2;
206
  //  there are headers, so pageSize = rowSize + header pInfo->sortBufSize  = pInfo->bufPageSize * 16;
207
  // TODO dynamic set the available sort buffer
208

209
  pOperator->fpSet =
210
      createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, optrDefaultBufFn, getExplainExecInfo, optrDefaultGetNextExtFn, NULL);
34,347,544✔
211

212
  setOperatorResetStateFn(pOperator, resetSortOperState);
34,286,106✔
213
  code = appendDownstream(pOperator, &downstream, 1);
34,310,613✔
214
  if (code != TSDB_CODE_SUCCESS) {
34,282,627✔
UNCOV
215
    goto _error;
×
216
  }
217

218
  *pOptrInfo = pOperator;
34,282,627✔
219
  return TSDB_CODE_SUCCESS;
34,283,118✔
220

UNCOV
221
_error:
×
UNCOV
222
  if (pInfo != NULL) {
×
UNCOV
223
    destroySortOperatorInfo(pInfo);
×
224
  }
225
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
UNCOV
226
  pTaskInfo->code = code;
×
UNCOV
227
  return code;
×
228
}
229

230
int32_t appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
2,147,483,647✔
231
  int32_t code = 0;
2,147,483,647✔
232
  for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
2,147,483,647✔
233
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
2,147,483,647✔
234
    if (pColInfo == NULL) {
2,147,483,647✔
UNCOV
235
      return terrno;
×
236
    }
237

238
    bool isNull = tsortIsNullVal(pTupleHandle, i);
2,147,483,647✔
239
    if (isNull) {
2,147,483,647✔
240
      colDataSetNULL(pColInfo, pBlock->info.rows);
2,147,483,647✔
241
    } else {
242
      char* pData = NULL;
2,147,483,647✔
243
      tsortGetValue(pTupleHandle, i, (void**) &pData);
2,147,483,647✔
244

245
      if (pData != NULL) {
2,147,483,647✔
246
        code = colDataSetVal(pColInfo, pBlock->info.rows, pData, false);
2,147,483,647✔
247
        if (code) {
2,147,483,647✔
UNCOV
248
          return code;
×
249
        }
250
      }
251
    }
252
  }
253

254
  pBlock->info.dataLoad = 1;
2,147,483,647✔
255

256
  SDataBlockInfo info = {0};
2,147,483,647✔
257
  tsortGetBlockInfo(pTupleHandle, &info);
2,147,483,647✔
258

259
  pBlock->info.scanFlag = info.scanFlag;
2,147,483,647✔
260
  pBlock->info.rows += 1;
2,147,483,647✔
261
  return code;
2,147,483,647✔
262
}
263

264
/**
265
 * @brief get next tuple with group id attached, here assume that all tuples are sorted by group keys
266
 * @param [in, out] pBlock the output block, the group id will be saved in it
267
 * @retval NULL if next group tuple arrived and this new group tuple will be saved in pInfo.pSavedTuple
268
 */
269
static int32_t nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock,
2,147,483,647✔
270
                                    STupleHandle** pTupleHandle) {
271
  QRY_PARAM_CHECK(pTupleHandle);
2,147,483,647✔
272

273
  int32_t       code = 0;
2,147,483,647✔
274
  STupleHandle* retTuple = pInfo->pGroupIdCalc->pSavedTuple;
2,147,483,647✔
275
  if (!retTuple) {
2,147,483,647✔
276
    code = tsortNextTuple(pHandle, &retTuple);
2,147,483,647✔
277
    if (code) {
2,147,483,647✔
UNCOV
278
      qError("failed to get next tuple, code:%s", tstrerror(code));
×
UNCOV
279
      return code;
×
280
    }
281
  }
282

283
  if (retTuple) {
2,147,483,647✔
284
    int32_t newGroup;
285
    if (pInfo->pGroupIdCalc->pSavedTuple) {
2,147,483,647✔
286
      newGroup = true;
1,632,084✔
287
      pInfo->pGroupIdCalc->pSavedTuple = NULL;
1,632,084✔
288
    } else {
289
      newGroup = tsortCompAndBuildKeys(pInfo->pGroupIdCalc->pSortColsArr, pInfo->pGroupIdCalc->keyBuf,
2,147,483,647✔
290
                                       &pInfo->pGroupIdCalc->lastKeysLen, retTuple);
2,147,483,647✔
291
    }
292

293
    bool emptyBlock = (pBlock->info.rows == 0);
2,147,483,647✔
294
    if (newGroup) {
2,147,483,647✔
295
      if (!emptyBlock) {
3,381,026✔
296
        // new group arrived, and we have already copied some tuples for cur group, save the new group tuple, return
297
        // NULL. Note that the keyBuf and lastKeysLen has been updated to new value
298
        pInfo->pGroupIdCalc->pSavedTuple = retTuple;
1,633,066✔
299
        retTuple = NULL;
1,633,066✔
300
      } else {
301
        // new group with empty block
302
        pInfo->pGroupIdCalc->lastGroupId = pBlock->info.id.groupId =
1,747,960✔
303
            calcGroupId(pInfo->pGroupIdCalc->keyBuf, pInfo->pGroupIdCalc->lastKeysLen);
1,747,960✔
304
      }
305
    } else {
306
      if (emptyBlock) {
2,147,483,647✔
307
        // new block but not new group, assign last group id to it
308
        pBlock->info.id.groupId = pInfo->pGroupIdCalc->lastGroupId;
994,766✔
309
      } else {
310
        // not new group and not empty block and ret NOT NULL, just return the tuple
311
      }
312
    }
313
  }
314

315
  *pTupleHandle = retTuple;
2,147,483,647✔
316
  return code;
2,147,483,647✔
317
}
318

319
static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
83,080,803✔
320
                                  const SOperatorInfo* pOperator, SSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
321
  QRY_PARAM_CHECK(pResBlock);
83,080,803✔
322
  blockDataCleanup(pDataBlock);
83,082,803✔
323

324
  int32_t       lino = 0;
83,085,307✔
325
  int32_t       code = 0;
83,085,307✔
326
  STupleHandle* pTupleHandle = NULL;
83,085,307✔
327
  SSDataBlock*  p = NULL;
83,085,861✔
328

329
  code = tsortGetSortedDataBlock(pHandle, &p);
83,085,861✔
330
  if (p == NULL || (code != 0)) {
83,085,880✔
331
    return code;
6,179,185✔
332
  }
333

334
  code = blockDataEnsureCapacity(p, capacity);
76,906,695✔
335
  QUERY_CHECK_CODE(code, lino, _error);
76,905,069✔
336

337
  while (1) {
338
    if (pInfo->pGroupIdCalc) {
2,147,483,647✔
339
      code = nextTupleWithGroupId(pHandle, pInfo, p, &pTupleHandle);
2,147,483,647✔
340
    } else {
341
      code = tsortNextTuple(pHandle, &pTupleHandle);
2,147,483,647✔
342
    }
343

344
    TSDB_CHECK_CODE(code, lino, _error);
2,147,483,647✔
345
    if (pTupleHandle == NULL) {
2,147,483,647✔
346
      break;
56,335,676✔
347
    }
348

349
    code = appendOneRowToDataBlock(p, pTupleHandle);
2,147,483,647✔
350
    QUERY_CHECK_CODE(code, lino, _error);
2,147,483,647✔
351

352
    if (p->info.rows >= capacity) {
2,147,483,647✔
353
      break;
20,570,472✔
354
    }
355
  }
356

357
  QUERY_CHECK_CODE(code, lino, _error);
76,906,148✔
358

359
  if (p->info.rows > 0) {
76,906,148✔
360
    code = blockDataEnsureCapacity(pDataBlock, capacity);
50,251,391✔
361
    QUERY_CHECK_CODE(code, lino, _error);
50,250,863✔
362

363
    // todo extract function to handle this
364
    int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
50,250,863✔
365
    for (int32_t i = 0; i < numOfCols; ++i) {
291,367,502✔
366
      SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
241,115,191✔
367
      QUERY_CHECK_NULL(pmInfo, code, lino, _error, terrno);
241,116,892✔
368

369
      int32_t srcSlotId = pmInfo->srcSlotId;
241,116,892✔
370

371
      // Fix: when the planner's setListSlotId resolves Sort pTargets by name, it may
372
      // mistakenly bind the primary timestamp column to the scalar expression result slot
373
      // (e.g. slot for ts+1000) instead of the original ts slot, because pushdownDataBlockSlots
374
      // added the expression slot with the same column name.  This causes the downstream
375
      // Project operator to apply the expression again (ts+1000 becomes ts+2000).
376
      // Detect this case and fall back to the original timestamp slot.
377
      if (pmInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID &&
241,116,892✔
378
          pmInfo->dataType.type == TSDB_DATA_TYPE_TIMESTAMP &&
54,659,009✔
379
          sortIsExprResultSlot(pOperator, srcSlotId)) {
27,275,102✔
380
        int32_t origSlot = sortFindOrigTsSlot(pOperator, p);
4,628,247✔
381
        if (origSlot >= 0 && origSlot != srcSlotId) {
4,628,247✔
382
          srcSlotId = origSlot;
4,628,247✔
383
        }
384
      }
385

386
      SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, srcSlotId);
241,115,654✔
387
      QUERY_CHECK_NULL(pSrc, code, lino, _error, terrno);
241,114,606✔
388

389
      SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
241,114,606✔
390
      QUERY_CHECK_NULL(pDst, code, lino, _error, terrno);
241,111,397✔
391

392
      code = colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
241,111,397✔
393
      QUERY_CHECK_CODE(code, lino, _error);
241,117,186✔
394
    }
395

396
    pDataBlock->info.dataLoad = 1;
50,252,311✔
397
    pDataBlock->info.rows = p->info.rows;
50,251,945✔
398
    pDataBlock->info.scanFlag = p->info.scanFlag;
50,248,666✔
399
    // propagate both C-group id and baseGId from upstream
400
    pDataBlock->info.id.groupId = p->info.id.groupId;
50,246,321✔
401
    pDataBlock->info.id.baseGId = p->info.id.baseGId;
50,244,192✔
402
  }
403

404
  blockDataDestroy(p);
76,898,392✔
405
  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
76,898,289✔
406
  return code;
76,897,337✔
407

UNCOV
408
  _error:
×
UNCOV
409
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
410

UNCOV
411
  blockDataDestroy(p);
×
UNCOV
412
  return code;
×
413
}
414

415
int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
2,146,623,707✔
416
  SOperatorInfo* pOperator = (SOperatorInfo*)param;
2,146,623,707✔
417
  int32_t        code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
2,146,623,707✔
418
  if (code) {
2,146,498,332✔
UNCOV
419
    qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
×
420
  } else {
421
    code = blockDataCheck(*ppBlock);
2,146,498,332✔
422
    if (code) {
2,146,517,262✔
UNCOV
423
      qError("failed to check block data, %s code:%s", __func__, tstrerror(code));
×
424
    }
425
  }
426
  return code;
2,146,514,704✔
427
}
428

429
// todo refactor: merged with fetch fp
430
void applyScalarFunction(SSDataBlock* pBlock, void* param) {
2,147,483,647✔
431
  SOperatorInfo*     pOperator = param;
2,147,483,647✔
432
  SSortOperatorInfo* pSort = pOperator->info;
2,147,483,647✔
433
  if (pOperator->exprSupp.pExprInfo != NULL && pOperator->exprSupp.numOfExprs > 0) {
2,147,483,647✔
434
    int32_t code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx,
5,875,168✔
435
                                         pOperator->exprSupp.numOfExprs, NULL,
436
                                         GET_STM_RTINFO(pOperator->pTaskInfo));
5,875,713✔
437
    if (code != TSDB_CODE_SUCCESS) {
5,876,803✔
UNCOV
438
      T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
439
    }
440
  }
441
}
2,147,483,647✔
442

443
int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
83,146,810✔
444
  SSortOperatorInfo* pInfo = pOperator->info;
83,146,810✔
445
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
83,147,796✔
446
  int32_t            code = TSDB_CODE_SUCCESS;
83,135,990✔
447
  int32_t            lino = 0;
83,135,990✔
448
  SSortSource* pSource =NULL;
83,135,990✔
449

450
  if (OPTR_IS_OPENED(pOperator)) {
83,135,990✔
451
    return code;
48,779,337✔
452
  }
453

454
  //  pInfo->binfo.pRes is not equalled to the input datablock.
455
  pInfo->pSortHandle = NULL;
34,349,773✔
456
  code =
457
      tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, pInfo->maxRows,
68,712,046✔
458
                            pInfo->maxTupleLength, tsPQSortMemThreshold * 1024 * 1024, &pInfo->pSortHandle);
34,351,015✔
459
  QUERY_CHECK_CODE(code, lino, _end);
34,327,491✔
460

461
  tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
34,327,491✔
462

463
  pSource = taosMemoryCalloc(1, sizeof(SSortSource));
34,306,651✔
464
  QUERY_CHECK_NULL(pSource, code, lino, _end, terrno);
34,292,153✔
465

466
  pSource->param = pOperator->pDownstream[0];
34,292,153✔
467
  pSource->onlyRef = true;
34,347,727✔
468

469
  code = tsortAddSource(pInfo->pSortHandle, pSource);
34,334,894✔
470
  QUERY_CHECK_CODE(code, lino, _end);
34,326,275✔
471
  pSource = NULL;
34,326,275✔
472

473
  code = tsortOpen(pInfo->pSortHandle);
34,326,275✔
474
  QUERY_CHECK_CODE(code, lino, _end);
34,260,400✔
475
  pOperator->status = OP_RES_TO_RETURN;
34,259,916✔
476
  OPTR_SET_OPENED(pOperator);
34,260,470✔
477

478
_end:
34,261,508✔
479
  if (pSource) {
34,261,508✔
UNCOV
480
    taosMemoryFree(pSource);
×
481
  }
482
  if (code != TSDB_CODE_SUCCESS) {
34,260,409✔
483
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
484✔
484
    pTaskInfo->code = code;
484✔
485
    T_LONG_JMP(pTaskInfo->env, code);
484✔
486
  }
487
  return code;
34,259,925✔
488
}
489

490
int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
83,190,318✔
491
  QRY_PARAM_CHECK(pResBlock);
83,190,318✔
492
  int32_t code = TSDB_CODE_SUCCESS;
83,192,545✔
493
  int32_t lino = 0;
83,192,545✔
494
  if (pOperator->status == OP_EXEC_DONE) {
83,192,545✔
495
    return code;
34,938✔
496
  }
497

498
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
83,140,702✔
499
  SSortOperatorInfo* pInfo = pOperator->info;
83,136,448✔
500

501
  code = pOperator->fpSet._openFn(pOperator);
83,145,998✔
502
  QUERY_CHECK_CODE(code, lino, _end);
83,038,954✔
503

504
  // multi-group case not handle here
505
  SSDataBlock* pBlock = NULL;
83,038,954✔
506
  while (1) {
39,140✔
507
    if (tsortIsClosed(pInfo->pSortHandle)) {
83,079,629✔
UNCOV
508
      code = TSDB_CODE_TSC_QUERY_CANCELLED;
×
UNCOV
509
      QUERY_CHECK_CODE(code, lino, _end);
×
510
    }
511

512
    recordOpExecBeforeDownstream(pOperator);
83,082,033✔
513
    code = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
83,081,633✔
514
                                pInfo->matchInfo.pList, pOperator, pInfo, &pBlock);
515
    recordOpExecAfterDownstream(pOperator, pBlock ? pBlock->info.rows : 0);
83,078,897✔
516
    QUERY_CHECK_CODE(code, lino, _end);
83,080,535✔
517
    if (pBlock == NULL) {
83,080,535✔
518
      setOperatorCompleted(pOperator);
32,833,257✔
519
      return code;
32,832,208✔
520
    }
521

522
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo, NULL);
50,247,278✔
523
    QUERY_CHECK_CODE(code, lino, _end);
50,247,311✔
524

525
    if (blockDataGetNumOfRows(pBlock) == 0) {
50,247,311✔
UNCOV
526
      continue;
×
527
    }
528

529
    // there are bugs?
530
    bool limitReached = applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo);
50,242,656✔
531
    if (limitReached) {
50,247,211✔
532
      resetLimitInfoForNextGroup(&pInfo->limitInfo);
3,028,686✔
533
    }
534

535
    if (pBlock->info.rows > 0) {
50,246,819✔
536
      break;
50,208,069✔
537
    }
538
  }
539

540
  *pResBlock = blockDataGetNumOfRows(pBlock) > 0 ? pBlock : NULL;
50,208,069✔
541
_end:
50,206,752✔
542
  if (code != TSDB_CODE_SUCCESS) {
50,201,420✔
UNCOV
543
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
544
    pTaskInfo->code = code;
×
UNCOV
545
    T_LONG_JMP(pTaskInfo->env, code);
×
546
  }
547
  return code;
50,203,543✔
548
}
549

550
void destroySortOperatorInfo(void* param) {
34,352,844✔
551
  SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param;
34,352,844✔
552
  blockDataDestroy(pInfo->binfo.pRes);
34,352,844✔
553
  pInfo->binfo.pRes = NULL;
34,356,622✔
554

555
  tsortDestroySortHandle(pInfo->pSortHandle);
34,358,908✔
556
  taosArrayDestroy(pInfo->pSortInfo);
34,372,716✔
557
  taosArrayDestroy(pInfo->matchInfo.pList);
34,371,617✔
558
  destroySortOpGroupIdCalc(pInfo->pGroupIdCalc);
34,372,716✔
559
  taosMemoryFreeClear(param);
34,366,676✔
560
}
34,371,108✔
561

562
int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
98,059✔
563
  SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
98,059✔
564
  if (pInfo == NULL) {
98,059✔
UNCOV
565
    return terrno;
×
566
  }
567

568
  SSortOperatorInfo* pOperatorInfo = (SSortOperatorInfo*)pOptr->info;
98,059✔
569

570
  *pInfo = tsortGetSortExecInfo(pOperatorInfo->pSortHandle);
98,059✔
571
  *pOptrExplain = pInfo;
97,527✔
572
  *len = sizeof(SSortExecInfo);
97,527✔
573
  return TSDB_CODE_SUCCESS;
97,527✔
574
}
575

576
static void calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys) {
34,344,453✔
577
  SColMatchInfo* pColItem = &pSortOperInfo->matchInfo;
34,344,453✔
578
  size_t         size = taosArrayGetSize(pColItem->pList);
34,364,862✔
579
  for (size_t i = 0; i < size; ++i) {
143,938,233✔
580
    SColMatchItem* pInfo = taosArrayGet(pColItem->pList, i);
109,629,736✔
581
    if (pInfo == NULL) {
109,638,366✔
UNCOV
582
      continue;
×
583
    }
584

585
    pSortOperInfo->maxTupleLength += pInfo->dataType.bytes;
109,638,366✔
586
  }
587

588
  size = LIST_LENGTH(pSortKeys);
34,308,497✔
589
  for (size_t i = 0; i < size; ++i) {
84,373,028✔
590
    SOrderByExprNode* pOrderExprNode = (SOrderByExprNode*)nodesListGetNode(pSortKeys, i);
50,033,174✔
591
    pSortOperInfo->maxTupleLength += ((SColumnNode*)pOrderExprNode->pExpr)->node.resType.bytes;
50,031,397✔
592
  }
593
}
34,339,854✔
594

595
static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc) {
34,372,770✔
596
  if (pCalc) {
34,372,770✔
597
    taosArrayDestroy(pCalc->pSortColsArr);
115,876✔
598
    taosMemoryFree(pCalc->keyBuf);
115,876✔
599
    taosMemoryFree(pCalc);
115,876✔
600
  }
601
}
34,372,770✔
602

603
//=====================================================================================
604
// Group Sort Operator
605
typedef enum EChildOperatorStatus { CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED } EChildOperatorStatus;
606

607
typedef struct SGroupSortOperatorInfo {
608
  SOptrBasicInfo       binfo;
609
  SArray*              pSortInfo;
610
  SColMatchInfo        matchInfo;
611
  int64_t              startTs;
612
  uint64_t             sortElapsed;
613
  bool                 hasGroupId;
614
  uint64_t             currGroupId;
615
  uint64_t             currBaseGId;
616
  SSDataBlock*         prefetchedSortInput;
617
  SSortHandle*         pCurrSortHandle;
618
  EChildOperatorStatus childOpStatus;
619
  SSortExecInfo        sortExecInfo;
620
} SGroupSortOperatorInfo;
621

622
int32_t getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
2,213,554✔
623
                                SGroupSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
624
  QRY_PARAM_CHECK(pResBlock);
2,213,554✔
625

626
  blockDataCleanup(pDataBlock);
2,213,554✔
627
  int32_t code = blockDataEnsureCapacity(pDataBlock, capacity);
2,213,554✔
628
  if (code) {
2,213,554✔
UNCOV
629
    return code;
×
630
  }
631

632
  SSDataBlock* p = NULL;
2,213,554✔
633
  code = tsortGetSortedDataBlock(pHandle, &p);
2,213,554✔
634
  if (p == NULL || (code != 0)) {
2,213,554✔
UNCOV
635
    return code;
×
636
  }
637

638
  code = blockDataEnsureCapacity(p, capacity);
2,213,554✔
639
  if (code) {
2,213,554✔
UNCOV
640
    return code;
×
641
  }
642

643
  while (1) {
2,033,538,700✔
644
    STupleHandle* pTupleHandle = NULL;
2,035,752,254✔
645
    code = tsortNextTuple(pHandle, &pTupleHandle);
2,035,751,808✔
646
    if (pTupleHandle == NULL || code != 0) {
2,035,752,696✔
647
      break;
648
    }
649

650
    code = appendOneRowToDataBlock(p, pTupleHandle);
2,035,477,560✔
651
    if (code) {
2,035,477,118✔
UNCOV
652
      break;
×
653
    }
654

655
    if (p->info.rows >= capacity) {
2,035,477,118✔
656
      break;
1,938,418✔
657
    }
658
  }
659

660
  if (p->info.rows > 0) {
2,213,554✔
661
    int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
2,075,986✔
662
    for (int32_t i = 0; i < numOfCols; ++i) {
6,227,958✔
663
      SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
4,151,972✔
664
      if (pmInfo == NULL) {
4,151,972✔
UNCOV
665
        return terrno;
×
666
      }
667

668
      SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
4,151,972✔
669
      if (pSrc == NULL) {
4,151,972✔
UNCOV
670
        return terrno;
×
671
      }
672

673
      SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
4,151,972✔
674
      if (pDst == NULL) {
4,151,972✔
UNCOV
675
        return terrno;
×
676
      }
677

678
      code = colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
4,151,972✔
679
      if (code) {
4,151,972✔
UNCOV
680
        return code;
×
681
      }
682
    }
683

684
    pDataBlock->info.rows = p->info.rows;
2,075,986✔
685
    pDataBlock->info.capacity = p->info.rows;
2,075,986✔
686
    pDataBlock->info.scanFlag = p->info.scanFlag;
2,075,986✔
687
    // propagate ids for the current group
688
    pDataBlock->info.id.groupId = pInfo->currGroupId;
2,075,544✔
689
    pDataBlock->info.id.baseGId = pInfo->currBaseGId;
2,075,986✔
690
  }
691

692
  blockDataDestroy(p);
2,213,554✔
693
  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
2,213,554✔
694
  return code;
2,213,554✔
695
}
696

697
typedef struct SGroupSortSourceParam {
698
  SOperatorInfo*          childOpInfo;
699
  SGroupSortOperatorInfo* grpSortOpInfo;
700
} SGroupSortSourceParam;
701

702
int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) {
111,536,362✔
703
  int32_t                 code = 0;
111,536,362✔
704
  int32_t                 lino = 0;
111,536,362✔
705
  SGroupSortSourceParam*  source = param;
111,536,362✔
706
  SGroupSortOperatorInfo* grpSortOpInfo = source->grpSortOpInfo;
111,536,362✔
707
  SSDataBlock*            block = NULL;
111,536,362✔
708

709
  QRY_PARAM_CHECK(ppBlock);
111,536,362✔
710

711
  if (grpSortOpInfo->prefetchedSortInput) {
111,536,362✔
712
    block = grpSortOpInfo->prefetchedSortInput;
137,568✔
713
    grpSortOpInfo->prefetchedSortInput = NULL;
137,568✔
714
    *ppBlock = block;
137,568✔
715
  } else {
716
    SOperatorInfo* childOp = source->childOpInfo;
111,398,794✔
717
    code = childOp->fpSet.getNextFn(childOp, &block);
111,398,794✔
718
    QUERY_CHECK_CODE(code, lino, _end);
111,398,794✔
719

720
    if (block != NULL) {
111,398,794✔
721
      code = blockDataCheck(block);
111,314,594✔
722
      QUERY_CHECK_CODE(code, lino, _end);
111,314,594✔
723
      if (block->info.id.groupId == grpSortOpInfo->currGroupId) {
111,314,594✔
724
        grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP;
111,261,226✔
725
        *ppBlock = block;
111,261,226✔
726
      } else {
727
        grpSortOpInfo->childOpStatus = CHILD_OP_NEW_GROUP;
53,368✔
728
        grpSortOpInfo->prefetchedSortInput = block;
53,368✔
729
      }
730
    } else {
731
      grpSortOpInfo->childOpStatus = CHILD_OP_FINISHED;
84,200✔
732
    }
733
  }
734

735
  return code;
111,536,362✔
UNCOV
736
_end:
×
737
  if (code != 0) {
×
UNCOV
738
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
739
  }
UNCOV
740
  return code;
×
741
}
742

743
int32_t beginSortGroup(SOperatorInfo* pOperator) {
137,568✔
744
  SGroupSortOperatorInfo* pInfo = pOperator->info;
137,568✔
745
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
137,568✔
746

747
  //  pInfo->binfo.pRes is not equalled to the input datablock.
748
  pInfo->pCurrSortHandle = NULL;
137,568✔
749

750
  int32_t code = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, 0, 0,
137,568✔
751
                                       0, &pInfo->pCurrSortHandle);
752
  if (code) {
137,568✔
UNCOV
753
    return code;
×
754
  }
755

756
  tsortSetFetchRawDataFp(pInfo->pCurrSortHandle, fetchNextGroupSortDataBlock, applyScalarFunction, pOperator);
137,568✔
757

758
  SSortSource*           ps = taosMemoryCalloc(1, sizeof(SSortSource));
137,568✔
759
  SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam));
137,568✔
760
  if (ps == NULL || param == NULL) {
137,568✔
UNCOV
761
    taosMemoryFree(ps);
×
UNCOV
762
    taosMemoryFree(param);
×
UNCOV
763
    return terrno;
×
764
  }
765

766
  param->childOpInfo = pOperator->pDownstream[0];
137,568✔
767
  param->grpSortOpInfo = pInfo;
137,568✔
768

769
  ps->param = param;
137,568✔
770
  ps->onlyRef = false;
137,568✔
771
  code = tsortAddSource(pInfo->pCurrSortHandle, ps);
137,568✔
772
  if (code != 0) {
137,568✔
UNCOV
773
    return code;
×
774
  }
775

776
  code = tsortOpen(pInfo->pCurrSortHandle);
137,568✔
777
  return code;
137,568✔
778
}
779

780
int32_t finishSortGroup(SOperatorInfo* pOperator) {
137,568✔
781
  SGroupSortOperatorInfo* pInfo = pOperator->info;
137,568✔
782

783
  SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pCurrSortHandle);
137,568✔
784

785
  pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
137,568✔
786
  pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
137,568✔
787
  pInfo->sortExecInfo.loops += sortExecInfo.loops;
137,568✔
788
  pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
137,568✔
789
  pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;
137,568✔
790

791
  tsortDestroySortHandle(pInfo->pCurrSortHandle);
137,568✔
792
  pInfo->pCurrSortHandle = NULL;
137,568✔
793

794
  return TSDB_CODE_SUCCESS;
137,568✔
795
}
796

797
int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
2,180,552✔
798
  QRY_PARAM_CHECK(pResBlock);
2,180,552✔
799
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
2,180,944✔
800
  SGroupSortOperatorInfo* pInfo = pOperator->info;
2,180,552✔
801
  int32_t                 code = TSDB_CODE_SUCCESS;
2,180,552✔
802
  int32_t                 lino = 0;
2,180,552✔
803

804
  if (pOperator->status == OP_EXEC_DONE) {
2,180,552✔
UNCOV
805
    return code;
×
806
  }
807

808
  code = pOperator->fpSet._openFn(pOperator);
2,180,998✔
809
  QUERY_CHECK_CODE(code, lino, _end);
2,180,998✔
810

811
  if (!pInfo->hasGroupId) {
2,180,998✔
812
    pInfo->hasGroupId = true;
105,012✔
813

814
    pInfo->prefetchedSortInput = getNextBlockFromDownstream(pOperator, 0);
105,012✔
815
    if (pInfo->prefetchedSortInput == NULL) {
105,404✔
816
      setOperatorCompleted(pOperator);
21,204✔
817
      return code;
21,204✔
818
    }
819

820
    pInfo->currGroupId = pInfo->prefetchedSortInput->info.id.groupId;
84,200✔
821
    pInfo->currBaseGId = pInfo->prefetchedSortInput->info.id.baseGId;
84,200✔
822
    pInfo->childOpStatus = CHILD_OP_NEW_GROUP;
84,200✔
823
    code = beginSortGroup(pOperator);
84,200✔
824
    QUERY_CHECK_CODE(code, lino, _end);
84,200✔
825
  }
826

827
  SSDataBlock* pBlock = NULL;
2,160,186✔
828
  while (pInfo->pCurrSortHandle != NULL) {
2,213,554✔
829
    if (tsortIsClosed(pInfo->pCurrSortHandle)) {
2,213,554✔
UNCOV
830
      code = TSDB_CODE_TSC_QUERY_CANCELLED;
×
831
      QUERY_CHECK_CODE(code, lino, _end);
×
832
    }
833

834
    // beginSortGroup would fetch all child blocks of pInfo->currGroupId;
835
    if (pInfo->childOpStatus == CHILD_OP_SAME_GROUP) {
2,213,554✔
UNCOV
836
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
UNCOV
837
      QUERY_CHECK_CODE(code, lino, _end);
×
838
    }
839

840
    recordOpExecBeforeDownstream(pOperator);
2,213,554✔
841
    code = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
2,213,554✔
842
                                     pInfo->matchInfo.pList, pInfo, &pBlock);
843
    recordOpExecAfterDownstream(pOperator, pBlock ? pBlock->info.rows : 0);
2,213,554✔
844
    QUERY_CHECK_CODE(code, lino, _end);
2,213,112✔
845
    if (pBlock != NULL) {
2,213,112✔
846
      // keep both ids aligned with current group
847
      pBlock->info.id.groupId = pInfo->currGroupId;
2,075,986✔
848
      pBlock->info.id.baseGId = pInfo->currBaseGId;
2,075,986✔
849
      // baseGId follows upstream; if upstream is empty here, preserve current
850
      // (no-op if not set). We cannot reconstruct baseGId here; rely on upstream propagation.
851
      pOperator->resultInfo.totalRows += pBlock->info.rows;
2,075,986✔
852
      *pResBlock = pBlock;
2,075,102✔
853
      return code;
2,075,986✔
854
    } else {
855
      if (pInfo->childOpStatus == CHILD_OP_NEW_GROUP) {
137,126✔
856
        (void) finishSortGroup(pOperator);
53,368✔
857
        pInfo->currGroupId = pInfo->prefetchedSortInput->info.id.groupId;
53,368✔
858
        pInfo->currBaseGId = pInfo->prefetchedSortInput->info.id.baseGId;
53,368✔
859
        code = beginSortGroup(pOperator);
53,368✔
860
        QUERY_CHECK_CODE(code, lino, _end);
53,368✔
861
      } else if (pInfo->childOpStatus == CHILD_OP_FINISHED) {
84,200✔
862
        (void) finishSortGroup(pOperator);
84,200✔
863
        setOperatorCompleted(pOperator);
84,200✔
864
        return code;
84,200✔
865
      }
866
    }
867
  }
868

869
_end:
×
UNCOV
870
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
871
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
872
    pTaskInfo->code = code;
×
UNCOV
873
    T_LONG_JMP(pTaskInfo->env, code);
×
874
  }
UNCOV
875
  return code;
×
876
}
877

UNCOV
878
int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
×
UNCOV
879
  SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)pOptr->info;
×
UNCOV
880
  *pOptrExplain = &pInfo->sortExecInfo;
×
UNCOV
881
  *len = sizeof(SSortExecInfo);
×
UNCOV
882
  return TSDB_CODE_SUCCESS;
×
883
}
884

885
void destroyGroupSortOperatorInfo(void* param) {
105,404✔
886
  SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)param;
105,404✔
887
  blockDataDestroy(pInfo->binfo.pRes);
105,404✔
888
  pInfo->binfo.pRes = NULL;
105,404✔
889

890
  taosArrayDestroy(pInfo->pSortInfo);
105,404✔
891
  taosArrayDestroy(pInfo->matchInfo.pList);
105,404✔
892

893
  tsortDestroySortHandle(pInfo->pCurrSortHandle);
105,404✔
894
  pInfo->pCurrSortHandle = NULL;
104,962✔
895

896
  taosMemoryFreeClear(param);
104,962✔
897
}
105,404✔
898

UNCOV
899
static int32_t resetGroupSortOperState(SOperatorInfo* pOper) {
×
900
  SGroupSortOperatorInfo* pInfo = pOper->info;
×
901
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
×
UNCOV
902
  pOper->status = OP_NOT_OPENED;
×
903

UNCOV
904
  pInfo->currGroupId = 0;
×
UNCOV
905
  pInfo->hasGroupId = false;
×
UNCOV
906
  pInfo->prefetchedSortInput = NULL;
×
UNCOV
907
  pInfo->childOpStatus = CHILD_OP_NEW_GROUP;
×
UNCOV
908
  pInfo->sortExecInfo = (SSortExecInfo){0};
×
909
  
UNCOV
910
  resetBasicOperatorState(&pInfo->binfo);
×
UNCOV
911
  destroySqlFunctionCtx(pOper->exprSupp.pCtx, pOper->exprSupp.pExprInfo, pOper->exprSupp.numOfExprs);
×
UNCOV
912
  taosMemoryFreeClear(pOper->exprSupp.rowEntryInfoOffset);
×
UNCOV
913
  pOper->exprSupp.pCtx =
×
UNCOV
914
      createSqlFunctionCtx(pOper->exprSupp.pExprInfo, pOper->exprSupp.numOfExprs, &pOper->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
×
915

UNCOV
916
  tsortDestroySortHandle(pInfo->pCurrSortHandle);
×
UNCOV
917
  pInfo->pCurrSortHandle = NULL;
×
918

UNCOV
919
  return 0;
×
920
}
921

922
int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
104,566✔
923
                                    SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
924
  QRY_PARAM_CHECK(pOptrInfo);
104,566✔
925
  int32_t code = 0;
105,404✔
926
  int32_t lino = 0;
105,404✔
927

928
  SGroupSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupSortOperatorInfo));
105,404✔
929
  SOperatorInfo*          pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
103,152✔
930
  if (pInfo == NULL || pOperator == NULL) {
104,128✔
UNCOV
931
    code = terrno;
×
UNCOV
932
    goto _error;
×
933
  }
934
  initOperatorCostInfo(pOperator);
104,128✔
935

936
  SExprSupp*          pSup = &pOperator->exprSupp;
103,990✔
937
  SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc;
103,990✔
938

939
  int32_t    numOfCols = 0;
104,040✔
940
  SExprInfo* pExprInfo = NULL;
104,432✔
941
  code = createExprInfo(pSortPhyNode->pExprs, NULL, &pExprInfo, &numOfCols);
104,874✔
942
  QUERY_CHECK_CODE(code, lino, _error);
103,240✔
943

944
  pSup->pExprInfo = pExprInfo;
103,240✔
945
  pSup->numOfExprs = numOfCols;
103,102✔
946

947
  initResultSizeInfo(&pOperator->resultInfo, 1024);
103,686✔
948
  pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset,
102,656✔
949
                                                  &pTaskInfo->storageAPI.functionStore);
950
  QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno);
103,152✔
951

952
  pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
102,714✔
953
  QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
105,404✔
954

955
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
104,570✔
956
  TSDB_CHECK_CODE(code, lino, _error);
105,404✔
957

958
  pInfo->binfo.inputTsOrder = pSortPhyNode->node.inputTsOrder;
105,404✔
959
  pInfo->binfo.outputTsOrder = pSortPhyNode->node.outputTsOrder;
105,404✔
960

961
  int32_t numOfOutputCols = 0;
105,404✔
962
  code = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID,
105,404✔
963
                             &pInfo->matchInfo);
964
  TSDB_CHECK_CODE(code, lino, _error);
104,962✔
965

966
  pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
104,962✔
967
  setOperatorInfo(pOperator, "GroupSortOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, false, OP_NOT_OPENED, pInfo,
104,432✔
968
                  pTaskInfo);
969
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doGroupSort, NULL, destroyGroupSortOperatorInfo,
104,432✔
970
                                         optrDefaultBufFn, getGroupSortExplainExecInfo, optrDefaultGetNextExtFn, NULL);
971

972
  setOperatorResetStateFn(pOperator, resetGroupSortOperState);
105,404✔
973
                                         
974
  code = appendDownstream(pOperator, &downstream, 1);
103,990✔
975
  if (code != TSDB_CODE_SUCCESS) {
104,520✔
976
    goto _error;
×
977
  }
978

979
  *pOptrInfo = pOperator;
104,520✔
980
  return TSDB_CODE_SUCCESS;
104,520✔
981

UNCOV
982
_error:
×
983
  pTaskInfo->code = code;
×
UNCOV
984
  if (pInfo != NULL) {
×
UNCOV
985
    destroyGroupSortOperatorInfo(pInfo);
×
986
  }
UNCOV
987
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
UNCOV
988
  return code;
×
989
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc