• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5083

17 May 2026 01:15AM UTC coverage: 73.377% (+0.05%) from 73.328%
#5083

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281619 of 383795 relevant lines covered (73.38%)

134936217.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.22
/source/libs/executor/src/sortoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "tdatablock.h"
21

22
typedef struct SSortOpGroupIdCalc {
23
  STupleHandle* pSavedTuple;
24
  SArray*       pSortColsArr;
25
  char*         keyBuf;
26
  int32_t       lastKeysLen; // default to be 0
27
  uint64_t      lastGroupId;
28
  bool          excludePKCol;
29
} SSortOpGroupIdCalc;
30

31
typedef struct SSortOperatorInfo {
32
  SOptrBasicInfo      binfo;
33
  uint32_t            sortBufSize;  // max buffer size for in-memory sort
34
  SArray*             pSortInfo;
35
  SSortHandle*        pSortHandle;
36
  SColMatchInfo       matchInfo;
37
  int32_t             bufPageSize;
38
  int64_t             startTs;      // sort start time
39
  uint64_t            sortElapsed;  // sort elapsed time, time to flush to disk not included.
40
  SLimitInfo          limitInfo;
41
  uint64_t            maxTupleLength;
42
  int64_t             maxRows;
43
  SSortOpGroupIdCalc* pGroupIdCalc;
44
} SSortOperatorInfo;
45

46
static int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
47
static int32_t doOpenSortOperator(SOperatorInfo* pOperator);
48
static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
49
static int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
50

51
static void destroySortOperatorInfo(void* param);
52
static void calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys);
53

54
// Check whether the given slotId is the output of a Sort scalar pre-calculation expression.
55
// The planner's rewritePrecalcExprs pushes expression results (e.g. ts+1000) into extra slots
56
// in the child's output descriptor. This helper identifies such slots.
57
static bool sortIsExprResultSlot(const SOperatorInfo* pOperator, int32_t slotId) {
36,274,300✔
58
  if (pOperator == NULL || pOperator->exprSupp.pExprInfo == NULL || slotId < 0) {
36,274,300✔
59
    return false;
26,601,213✔
60
  }
61
  for (int32_t idx = 0; idx < pOperator->exprSupp.numOfExprs; ++idx) {
14,926,965✔
62
    if (pOperator->exprSupp.pExprInfo[idx].base.resSchema.slotId == slotId) {
9,694,905✔
63
      return true;
4,441,027✔
64
    }
65
  }
66
  return false;
5,232,060✔
67
}
68

69
// Find the slot index of the original (non-expression) primary timestamp column in the
70
// Sort's internal data block.  The internal block may contain both the original ts column
71
// and an expression-derived ts column (e.g. ts+1000).  We return the first TIMESTAMP
72
// column whose slot is NOT an expression result.
73
static int32_t sortFindOrigTsSlot(const SOperatorInfo* pOperator, const SSDataBlock* pBlock) {
4,441,027✔
74
  if (pBlock == NULL || pBlock->pDataBlock == NULL) {
4,441,027✔
75
    return -1;
×
76
  }
77
  int32_t colCount = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
4,441,027✔
78
  for (int32_t idx = 0; idx < colCount; ++idx) {
4,441,027✔
79
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx);
4,441,027✔
80
    if (pCol != NULL && pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP &&
4,441,027✔
81
        !sortIsExprResultSlot(pOperator, idx)) {
4,441,027✔
82
      return idx;
4,441,027✔
83
    }
84
  }
85
  return -1;
×
86
}
87

88
static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc);
89

90
/**
 * Bring the sort operator back to its not-opened state so it can be executed again.
 *
 * The function contexts are rebuilt, the current sort handle is destroyed and the group id
 * calculation state (if any) is cleared.
 *
 * @param pOper the sort operator
 * @return TSDB_CODE_SUCCESS, or terrno when the function contexts could not be re-created
 */
static int32_t resetSortOperState(SOperatorInfo* pOper) {
  SSortOperatorInfo* pInfo = pOper->info;
  SExecTaskInfo*     pTaskInfo = pOper->pTaskInfo;
  int32_t            code = TSDB_CODE_SUCCESS;

  pOper->status = OP_NOT_OPENED;

  resetBasicOperatorState(&pInfo->binfo);
  destroySqlFunctionCtx(pOper->exprSupp.pCtx, pOper->exprSupp.pExprInfo, pOper->exprSupp.numOfExprs);
  taosMemoryFreeClear(pOper->exprSupp.rowEntryInfoOffset);
  pOper->exprSupp.pCtx =
      createSqlFunctionCtx(pOper->exprSupp.pExprInfo, pOper->exprSupp.numOfExprs, &pOper->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
  if (pOper->exprSupp.pCtx == NULL) {
    // Same check as in createSortOperatorInfo; keep resetting the rest and report the error.
    code = terrno;
  }

  tsortDestroySortHandle(pInfo->pSortHandle);
  pInfo->pSortHandle = NULL;

  if (pInfo->pGroupIdCalc) {
    // The saved tuple was handed out by the sort handle destroyed just above; drop it so
    // nextTupleWithGroupId() does not consume a stale tuple after the reset.
    pInfo->pGroupIdCalc->pSavedTuple = NULL;
    pInfo->pGroupIdCalc->lastGroupId = 0;
    pInfo->pGroupIdCalc->lastKeysLen = 0;
  }

  return code;
}
111

112
// todo add limit/offset impl
113
/**
 * Build a sort operator on top of `downstream` from the physical plan node `pSortNode`.
 *
 * On success the new operator is stored in *pOptrInfo and TSDB_CODE_SUCCESS is returned.
 * On failure the `_error` path destroys the partially built sort info, passes the operator
 * and `downstream` to destroyOperatorAndDownstreams(), records the error in pTaskInfo->code
 * and returns it.
 */
int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t lino = 0;
  int32_t code = 0;

  SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    goto _error;
  }
  initOperatorCostInfo(pOperator);

  pOperator->pTaskInfo = pTaskInfo;
  SDataBlockDescNode* pOutputDesc = pSortNode->node.pOutputDataBlockDesc;

  // scalar pre-calculation expressions attached to the sort node
  int32_t numOfExprs = 0;
  code = createExprInfo(pSortNode->pExprs, NULL, &pOperator->exprSupp.pExprInfo, &numOfExprs);
  QUERY_CHECK_CODE(code, lino, _error);
  pOperator->exprSupp.numOfExprs = numOfExprs;

  // mapping between the sorted block's slots and the output block's slots
  int32_t numOfTargets = 0;
  code =
      extractColMatchInfo(pSortNode->pTargets, pOutputDesc, &numOfTargets, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  calcSortOperMaxTupleLength(pInfo, pSortNode->pSortKeys);

  // maxRows stays -1 unless the node carries a positive limit, then it is limit + offset
  pInfo->maxRows = -1;
  SLimitNode* pLimitNode = (SLimitNode*)pSortNode->node.pLimit;
  if (pLimitNode && pLimitNode->limit && pLimitNode->limit->datum.i > 0) {
    pInfo->maxRows = pLimitNode->limit->datum.i + (pLimitNode->offset ? pLimitNode->offset->datum.i : 0);
  }

  pOperator->exprSupp.pCtx =
      createSqlFunctionCtx(pOperator->exprSupp.pExprInfo, numOfExprs, &pOperator->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno);

  initResultSizeInfo(&pOperator->resultInfo, 1024);
  code = filterInitFromNode((SNode*)pSortNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                            pTaskInfo->pStreamRuntimeInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->binfo.pRes = createDataBlockFromDescNode(pOutputDesc);
  QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);

  pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
  TSDB_CHECK_NULL(pInfo->pSortInfo, code, lino, _error, terrno);

  if (pSortNode->calcGroupId) {
    // Each step below only runs while `code` is still TSDB_CODE_SUCCESS.
    int32_t             keysLen;
    SSortOpGroupIdCalc* pCalc = pInfo->pGroupIdCalc = taosMemoryCalloc(1, sizeof(SSortOpGroupIdCalc));
    if (!pCalc) {
      code = terrno;
      goto _error;
    }

    SNodeList* pKeyColNodes = makeColsNodeArrFromSortKeys(pSortNode->pSortKeys);
    if (!pKeyColNodes) {
      code = terrno;
    }

    if (TSDB_CODE_SUCCESS == code) {
      pCalc->pSortColsArr = makeColumnArrayFromList(pKeyColNodes);
      if (!pCalc->pSortColsArr) {
        code = terrno;
      }
      nodesClearList(pKeyColNodes);
    }

    if (TSDB_CODE_SUCCESS == code) {
      // PK ts col should always at last, see partColOptCreateSort
      if (pSortNode->excludePkCol) {
        taosArrayPop(pCalc->pSortColsArr);
      }
      code = extractKeysLen(pCalc->pSortColsArr, &keysLen);
      QUERY_CHECK_CODE(code, lino, _error);
    }

    if (TSDB_CODE_SUCCESS == code) {
      pCalc->lastKeysLen = 0;
      pCalc->keyBuf = taosMemoryCalloc(1, keysLen);
      if (!pCalc->keyBuf) {
        code = terrno;
      }
    }
  }
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->binfo.inputTsOrder = pSortNode->node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pSortNode->node.outputTsOrder;
  initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo);

  setOperatorInfo(pOperator, "SortOperator", QUERY_NODE_PHYSICAL_PLAN_SORT, true, OP_NOT_OPENED, pInfo, pTaskInfo);

  // lazy evaluation for the following parameter since the input datablock is not known till now.
  //  pInfo->bufPageSize  = rowSize < 1024 ? 1024 * 2 : rowSize * 2;
  //  there are headers, so pageSize = rowSize + header pInfo->sortBufSize  = pInfo->bufPageSize * 16;
  // TODO dynamic set the available sort buffer

  pOperator->fpSet =
      createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, optrDefaultBufFn, getExplainExecInfo, optrDefaultGetNextExtFn, NULL);

  setOperatorResetStateFn(pOperator, resetSortOperState);

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  if (pInfo != NULL) {
    destroySortOperatorInfo(pInfo);
  }
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
229

230
/**
 * Append the tuple referenced by `pTupleHandle` as one new row at the end of `pBlock`.
 *
 * For every column of the block the tuple value (or NULL) is written at row index
 * pBlock->info.rows; afterwards the block is marked as loaded, the scan flag of the
 * tuple's source block is copied and the row count is incremented.
 *
 * @param pBlock        destination block
 * @param pTupleHandle  tuple to copy from
 * @return 0 on success, terrno if a column cannot be fetched, or the error returned by
 *         colDataSetVal(); on error the row count is left unchanged
 */
int32_t appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
  int32_t code = 0;

  // The column count does not change while the row is copied: read it once instead of on
  // every iteration (this runs once per sorted row) and compare signed against signed.
  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
    if (pColInfo == NULL) {
      return terrno;
    }

    bool isNull = tsortIsNullVal(pTupleHandle, i);
    if (isNull) {
      colDataSetNULL(pColInfo, pBlock->info.rows);
    } else {
      char* pData = NULL;
      tsortGetValue(pTupleHandle, i, (void**)&pData);

      // a non-null column for which no value pointer is produced is left untouched
      if (pData != NULL) {
        code = colDataSetVal(pColInfo, pBlock->info.rows, pData, false);
        if (code) {
          return code;
        }
      }
    }
  }

  pBlock->info.dataLoad = 1;

  SDataBlockInfo info = {0};
  tsortGetBlockInfo(pTupleHandle, &info);

  pBlock->info.scanFlag = info.scanFlag;
  pBlock->info.rows += 1;
  return code;
}
263

264
/**
265
 * @brief get next tuple with group id attached, here assume that all tuples are sorted by group keys
266
 * @param [in, out] pBlock the output block, the group id will be saved in it
267
 * @retval NULL if next group tuple arrived and this new group tuple will be saved in pInfo.pSavedTuple
268
 */
269
static int32_t nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock,
2,147,483,647✔
270
                                    STupleHandle** pTupleHandle) {
271
  QRY_PARAM_CHECK(pTupleHandle);
2,147,483,647✔
272

273
  int32_t       code = 0;
2,147,483,647✔
274
  STupleHandle* retTuple = pInfo->pGroupIdCalc->pSavedTuple;
2,147,483,647✔
275
  if (!retTuple) {
2,147,483,647✔
276
    code = tsortNextTuple(pHandle, &retTuple);
2,147,483,647✔
277
    if (code) {
2,147,483,647✔
278
      qError("failed to get next tuple, code:%s", tstrerror(code));
×
279
      return code;
×
280
    }
281
  }
282

283
  if (retTuple) {
2,147,483,647✔
284
    int32_t newGroup;
285
    if (pInfo->pGroupIdCalc->pSavedTuple) {
2,147,483,647✔
286
      newGroup = true;
1,763,310✔
287
      pInfo->pGroupIdCalc->pSavedTuple = NULL;
1,763,310✔
288
    } else {
289
      newGroup = tsortCompAndBuildKeys(pInfo->pGroupIdCalc->pSortColsArr, pInfo->pGroupIdCalc->keyBuf,
2,147,483,647✔
290
                                       &pInfo->pGroupIdCalc->lastKeysLen, retTuple);
2,147,483,647✔
291
    }
292

293
    bool emptyBlock = (pBlock->info.rows == 0);
2,147,483,647✔
294
    if (newGroup) {
2,147,483,647✔
295
      if (!emptyBlock) {
3,652,760✔
296
        // new group arrived, and we have already copied some tuples for cur group, save the new group tuple, return
297
        // NULL. Note that the keyBuf and lastKeysLen has been updated to new value
298
        pInfo->pGroupIdCalc->pSavedTuple = retTuple;
1,764,370✔
299
        retTuple = NULL;
1,764,370✔
300
      } else {
301
        // new group with empty block
302
        pInfo->pGroupIdCalc->lastGroupId = pBlock->info.id.groupId =
1,888,390✔
303
            calcGroupId(pInfo->pGroupIdCalc->keyBuf, pInfo->pGroupIdCalc->lastKeysLen);
1,888,390✔
304
      }
305
    } else {
306
      if (emptyBlock) {
2,147,483,647✔
307
        // new block but not new group, assign last group id to it
308
        pBlock->info.id.groupId = pInfo->pGroupIdCalc->lastGroupId;
1,077,490✔
309
      } else {
310
        // not new group and not empty block and ret NOT NULL, just return the tuple
311
      }
312
    }
313
  }
314

315
  *pTupleHandle = retTuple;
2,147,483,647✔
316
  return code;
2,147,483,647✔
317
}
318

319
static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
90,774,943✔
320
                                  const SOperatorInfo* pOperator, SSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
321
  QRY_PARAM_CHECK(pResBlock);
90,774,943✔
322
  blockDataCleanup(pDataBlock);
90,775,404✔
323

324
  int32_t       lino = 0;
90,775,975✔
325
  int32_t       code = 0;
90,775,975✔
326
  STupleHandle* pTupleHandle = NULL;
90,775,975✔
327
  SSDataBlock*  p = NULL;
90,775,975✔
328

329
  code = tsortGetSortedDataBlock(pHandle, &p);
90,775,986✔
330
  if (p == NULL || (code != 0)) {
90,776,927✔
331
    return code;
6,877,757✔
332
  }
333

334
  code = blockDataEnsureCapacity(p, capacity);
83,899,170✔
335
  QUERY_CHECK_CODE(code, lino, _error);
83,899,165✔
336

337
  while (1) {
338
    if (pInfo->pGroupIdCalc) {
2,147,483,647✔
339
      code = nextTupleWithGroupId(pHandle, pInfo, p, &pTupleHandle);
2,147,483,647✔
340
    } else {
341
      code = tsortNextTuple(pHandle, &pTupleHandle);
2,147,483,647✔
342
    }
343

344
    TSDB_CHECK_CODE(code, lino, _error);
2,147,483,647✔
345
    if (pTupleHandle == NULL) {
2,147,483,647✔
346
      break;
59,746,765✔
347
    }
348

349
    code = appendOneRowToDataBlock(p, pTupleHandle);
2,147,483,647✔
350
    QUERY_CHECK_CODE(code, lino, _error);
2,147,483,647✔
351

352
    if (p->info.rows >= capacity) {
2,147,483,647✔
353
      break;
24,152,850✔
354
    }
355
  }
356

357
  QUERY_CHECK_CODE(code, lino, _error);
83,899,615✔
358

359
  if (p->info.rows > 0) {
83,899,615✔
360
    code = blockDataEnsureCapacity(pDataBlock, capacity);
55,667,689✔
361
    QUERY_CHECK_CODE(code, lino, _error);
55,667,688✔
362

363
    // todo extract function to handle this
364
    int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
55,667,688✔
365
    for (int32_t i = 0; i < numOfCols; ++i) {
310,626,558✔
366
      SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
254,959,782✔
367
      QUERY_CHECK_NULL(pmInfo, code, lino, _error, terrno);
254,960,907✔
368

369
      int32_t srcSlotId = pmInfo->srcSlotId;
254,960,907✔
370

371
      // Fix: when the planner's setListSlotId resolves Sort pTargets by name, it may
372
      // mistakenly bind the primary timestamp column to the scalar expression result slot
373
      // (e.g. slot for ts+1000) instead of the original ts slot, because pushdownDataBlockSlots
374
      // added the expression slot with the same column name.  This causes the downstream
375
      // Project operator to apply the expression again (ts+1000 becomes ts+2000).
376
      // Detect this case and fall back to the original timestamp slot.
377
      if (pmInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID &&
254,960,573✔
378
          pmInfo->dataType.type == TSDB_DATA_TYPE_TIMESTAMP &&
63,811,548✔
379
          sortIsExprResultSlot(pOperator, srcSlotId)) {
31,832,823✔
380
        int32_t origSlot = sortFindOrigTsSlot(pOperator, p);
4,441,027✔
381
        if (origSlot >= 0 && origSlot != srcSlotId) {
4,441,027✔
382
          srcSlotId = origSlot;
4,441,027✔
383
        }
384
      }
385

386
      SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, srcSlotId);
254,960,906✔
387
      QUERY_CHECK_NULL(pSrc, code, lino, _error, terrno);
254,959,226✔
388

389
      SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
254,959,226✔
390
      QUERY_CHECK_NULL(pDst, code, lino, _error, terrno);
254,960,007✔
391

392
      code = colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
254,960,007✔
393
      QUERY_CHECK_CODE(code, lino, _error);
254,958,870✔
394
    }
395

396
    pDataBlock->info.dataLoad = 1;
55,666,776✔
397
    pDataBlock->info.rows = p->info.rows;
55,665,749✔
398
    pDataBlock->info.scanFlag = p->info.scanFlag;
55,666,193✔
399
    // propagate both C-group id and baseGId from upstream
400
    pDataBlock->info.id.groupId = p->info.id.groupId;
55,666,632✔
401
    pDataBlock->info.id.baseGId = p->info.id.baseGId;
55,666,632✔
402
  }
403

404
  blockDataDestroy(p);
83,896,325✔
405
  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
83,896,743✔
406
  return code;
83,895,979✔
407

408
  _error:
×
409
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
410

411
  blockDataDestroy(p);
×
412
  return code;
×
413
}
414

415
/**
 * Fetch callback: `param` is the upstream operator; its getNextFn is invoked and the
 * returned block is validated with blockDataCheck().
 *
 * @return 0 on success, otherwise the error of the fetch or of the check (both are logged)
 */
int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
  SOperatorInfo* pUpstream = (SOperatorInfo*)param;

  int32_t code = pUpstream->fpSet.getNextFn(pUpstream, ppBlock);
  if (code) {
    qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
    return code;
  }

  code = blockDataCheck(*ppBlock);
  if (code) {
    qError("failed to check block data, %s code:%s", __func__, tstrerror(code));
  }
  return code;
}
428

429
// todo refactor: merged with fetch fp
430
/**
 * Evaluate the sort operator's scalar pre-calculation expressions in place on `pBlock`.
 *
 * Nothing is done when the operator has no expressions. On failure the error is raised
 * through T_LONG_JMP on the task's env.
 *
 * @param pBlock  block that serves as both input and output of the expressions
 * @param param   the sort operator (SOperatorInfo*)
 */
void applyScalarFunction(SSDataBlock* pBlock, void* param) {
  SOperatorInfo* pOperator = param;

  if (pOperator->exprSupp.pExprInfo == NULL || pOperator->exprSupp.numOfExprs <= 0) {
    return;
  }

  int32_t code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx,
                                       pOperator->exprSupp.numOfExprs, NULL,
                                       GET_STM_RTINFO(pOperator->pTaskInfo), pOperator->pTaskInfo);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
}
2,147,483,647✔
442

443
/**
 * Open the sort operator: create the sort handle, register the downstream operator as its
 * only source and run tsortOpen(). A second call on an already opened operator returns
 * immediately.
 *
 * On failure the error is logged, stored in pTaskInfo->code and raised through T_LONG_JMP.
 */
int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SSortOperatorInfo* pInfo = pOperator->info;
  SSortSource*       pSortSrc = NULL;
  int32_t            lino = 0;
  int32_t            code = TSDB_CODE_SUCCESS;

  if (pOperator->exprSupp.pFilterInfo != NULL) {
    filterSetExecContext(pOperator->exprSupp.pFilterInfo, pTaskInfo, isTaskKilled);
  }

  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  //  pInfo->binfo.pRes is not equalled to the input datablock.
  pInfo->pSortHandle = NULL;
  code =
      tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, pInfo->maxRows,
                            pInfo->maxTupleLength, tsPQSortMemThreshold * 1024 * 1024, &pInfo->pSortHandle);
  QUERY_CHECK_CODE(code, lino, _end);

  tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);

  pSortSrc = taosMemoryCalloc(1, sizeof(SSortSource));
  QUERY_CHECK_NULL(pSortSrc, code, lino, _end, terrno);

  pSortSrc->param = pOperator->pDownstream[0];
  pSortSrc->onlyRef = true;

  code = tsortAddSource(pInfo->pSortHandle, pSortSrc);
  QUERY_CHECK_CODE(code, lino, _end);
  // the source was added successfully, so it must not be freed at _end
  pSortSrc = NULL;

  code = tsortOpen(pInfo->pSortHandle);
  QUERY_CHECK_CODE(code, lino, _end);

  pOperator->status = OP_RES_TO_RETURN;
  OPTR_SET_OPENED(pOperator);

_end:
  if (pSortSrc) {
    taosMemoryFree(pSortSrc);
  }
  if (code == TSDB_CODE_SUCCESS) {
    return code;
  }

  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  pTaskInfo->code = code;
  T_LONG_JMP(pTaskInfo->env, code);
  return code;
}
493

494
/**
 * Produce the next block of sorted rows.
 *
 * The operator is opened on demand; sorted blocks are then fetched, filtered and limited
 * until a non-empty block is available. *pResBlock is left as set by QRY_PARAM_CHECK when
 * the operator is already done or the sorted input is exhausted (the operator is then
 * marked completed).
 *
 * On failure the error is logged, stored in pTaskInfo->code and raised through T_LONG_JMP.
 */
int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  if (pOperator->status == OP_EXEC_DONE) {
    return code;
  }

  SSortOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  // multi-group case not handle here
  SSDataBlock* pSortedBlock = NULL;
  for (;;) {
    if (tsortIsClosed(pInfo->pSortHandle)) {
      code = TSDB_CODE_TSC_QUERY_CANCELLED;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    recordOpExecBeforeDownstream(pOperator);
    code = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
                              pInfo->matchInfo.pList, pOperator, pInfo, &pSortedBlock);
    recordOpExecAfterDownstream(pOperator, pSortedBlock ? pSortedBlock->info.rows : 0);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pSortedBlock == NULL) {
      // no more sorted data
      setOperatorCompleted(pOperator);
      return code;
    }

    code = doFilter(pSortedBlock, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (blockDataGetNumOfRows(pSortedBlock) == 0) {
      // every row was filtered out, fetch the next block
      continue;
    }

    // there are bugs?
    if (applyLimitOffset(&pInfo->limitInfo, pSortedBlock, pTaskInfo)) {
      resetLimitInfoForNextGroup(&pInfo->limitInfo);
    }

    if (pSortedBlock->info.rows > 0) {
      break;
    }
  }

  if (blockDataGetNumOfRows(pSortedBlock) > 0) {
    *pResBlock = pSortedBlock;
  } else {
    *pResBlock = NULL;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
553

554
void destroySortOperatorInfo(void* param) {
36,703,383✔
555
  SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param;
36,703,383✔
556
  blockDataDestroy(pInfo->binfo.pRes);
36,703,383✔
557
  pInfo->binfo.pRes = NULL;
36,704,962✔
558

559
  tsortDestroySortHandle(pInfo->pSortHandle);
36,705,558✔
560
  taosArrayDestroy(pInfo->pSortInfo);
36,703,949✔
561
  taosArrayDestroy(pInfo->matchInfo.pList);
36,705,160✔
562
  destroySortOpGroupIdCalc(pInfo->pGroupIdCalc);
36,704,731✔
563
  taosMemoryFreeClear(param);
36,704,305✔
564
}
36,705,761✔
565

566
/**
 * Return a newly allocated copy of the sort execution info for EXPLAIN.
 *
 * @param pOptr         the sort operator
 * @param pOptrExplain  receives the allocated SSortExecInfo
 * @param len           receives sizeof(SSortExecInfo)
 * @return TSDB_CODE_SUCCESS, or terrno when the allocation fails
 */
int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SSortExecInfo* pExecInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
  if (!pExecInfo) {
    return terrno;
  }

  SSortOperatorInfo* pSortOper = (SSortOperatorInfo*)pOptr->info;
  *pExecInfo = tsortGetSortExecInfo(pSortOper->pSortHandle);

  *pOptrExplain = pExecInfo;
  *len = sizeof(SSortExecInfo);
  return TSDB_CODE_SUCCESS;
}
579

580
// Add to pSortOperInfo->maxTupleLength the byte width of every matched output column and
// of the result type of every sort key expression.
static void calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys) {
  SColMatchInfo* pMatch = &pSortOperInfo->matchInfo;

  size_t numOfMatched = taosArrayGetSize(pMatch->pList);
  for (size_t col = 0; col < numOfMatched; ++col) {
    SColMatchItem* pItem = taosArrayGet(pMatch->pList, col);
    if (pItem != NULL) {
      pSortOperInfo->maxTupleLength += pItem->dataType.bytes;
    }
  }

  size_t numOfKeys = LIST_LENGTH(pSortKeys);
  for (size_t key = 0; key < numOfKeys; ++key) {
    SOrderByExprNode* pKeyNode = (SOrderByExprNode*)nodesListGetNode(pSortKeys, key);
    SColumnNode*      pKeyExpr = (SColumnNode*)pKeyNode->pExpr;
    pSortOperInfo->maxTupleLength += pKeyExpr->node.resType.bytes;
  }
}
36,697,288✔
598

599
// Free the group id calculation state and the buffers it owns; a NULL pointer is ignored.
static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc) {
  if (!pCalc) {
    return;
  }

  taosArrayDestroy(pCalc->pSortColsArr);
  taosMemoryFree(pCalc->keyBuf);
  taosMemoryFree(pCalc);
}
36,704,285✔
606

607
//=====================================================================================
608
// Group Sort Operator
609
/* Result of the most recent fetch from the child operator, as recorded by fetchNextGroupSortDataBlock. */
typedef enum EChildOperatorStatus {
  CHILD_OP_NEW_GROUP,   // child returned a block whose group id differs from the current group
  CHILD_OP_SAME_GROUP,  // child returned a block that belongs to the current group
  CHILD_OP_FINISHED     // child returned no block
} EChildOperatorStatus;
610

611
typedef struct SGroupSortOperatorInfo {
612
  SOptrBasicInfo       binfo;
613
  SArray*              pSortInfo;
614
  SColMatchInfo        matchInfo;
615
  int64_t              startTs;
616
  uint64_t             sortElapsed;
617
  bool                 hasGroupId;
618
  uint64_t             currGroupId;
619
  uint64_t             currBaseGId;
620
  SSDataBlock*         prefetchedSortInput;
621
  SSortHandle*         pCurrSortHandle;
622
  EChildOperatorStatus childOpStatus;
623
  SSortExecInfo        sortExecInfo;
624
} SGroupSortOperatorInfo;
625

626
int32_t getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
161,836✔
627
                                SGroupSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
628
  QRY_PARAM_CHECK(pResBlock);
161,836✔
629

630
  blockDataCleanup(pDataBlock);
161,836✔
631
  int32_t code = blockDataEnsureCapacity(pDataBlock, capacity);
161,836✔
632
  if (code) {
161,836✔
633
    return code;
×
634
  }
635

636
  SSDataBlock* p = NULL;
161,836✔
637
  code = tsortGetSortedDataBlock(pHandle, &p);
161,836✔
638
  if (p == NULL || (code != 0)) {
161,836✔
639
    return code;
×
640
  }
641

642
  code = blockDataEnsureCapacity(p, capacity);
161,836✔
643
  if (code) {
161,836✔
644
    return code;
×
645
  }
646

647
  while (1) {
8,544,400✔
648
    STupleHandle* pTupleHandle = NULL;
8,706,236✔
649
    code = tsortNextTuple(pHandle, &pTupleHandle);
8,706,236✔
650
    if (pTupleHandle == NULL || code != 0) {
8,706,236✔
651
      break;
652
    }
653

654
    code = appendOneRowToDataBlock(p, pTupleHandle);
8,544,400✔
655
    if (code) {
8,544,400✔
656
      break;
×
657
    }
658

659
    if (p->info.rows >= capacity) {
8,544,400✔
660
      break;
×
661
    }
662
  }
663

664
  if (p->info.rows > 0) {
161,836✔
665
    int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
80,918✔
666
    for (int32_t i = 0; i < numOfCols; ++i) {
242,754✔
667
      SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
161,836✔
668
      if (pmInfo == NULL) {
161,836✔
669
        return terrno;
×
670
      }
671

672
      SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
161,836✔
673
      if (pSrc == NULL) {
161,836✔
674
        return terrno;
×
675
      }
676

677
      SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
161,836✔
678
      if (pDst == NULL) {
161,836✔
679
        return terrno;
×
680
      }
681

682
      code = colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
161,836✔
683
      if (code) {
161,836✔
684
        return code;
×
685
      }
686
    }
687

688
    pDataBlock->info.rows = p->info.rows;
80,918✔
689
    pDataBlock->info.capacity = p->info.rows;
80,918✔
690
    pDataBlock->info.scanFlag = p->info.scanFlag;
80,918✔
691
    // propagate ids for the current group
692
    pDataBlock->info.id.groupId = pInfo->currGroupId;
80,918✔
693
    pDataBlock->info.id.baseGId = pInfo->currBaseGId;
80,918✔
694
  }
695

696
  blockDataDestroy(p);
161,836✔
697
  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
161,836✔
698
  return code;
161,836✔
699
}
700

701
typedef struct SGroupSortSourceParam {
702
  SOperatorInfo*          childOpInfo;
703
  SGroupSortOperatorInfo* grpSortOpInfo;
704
} SGroupSortSourceParam;
705

706
/**
 * Input callback of the per-group sort: supply the next block of the current group.
 *
 * A prefetched block, if present, is handed out first. Otherwise the child
 * operator is asked for a block and childOpStatus records the outcome:
 *   - same group id as currGroupId: the block is returned through ppBlock;
 *   - different group id: the block is parked in prefetchedSortInput and nothing is returned;
 *   - no block: the child is finished and nothing is returned.
 *
 * @param param    SGroupSortSourceParam of the sort source
 * @param ppBlock  [out] next block of the current group, if any
 * @return 0 on success, otherwise the error reported by the child or by blockDataCheck
 */
int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SGroupSortSourceParam*  pSrcParam = param;
  SGroupSortOperatorInfo* pGrpSort = pSrcParam->grpSortOpInfo;
  SSDataBlock*            pNext = NULL;

  QRY_PARAM_CHECK(ppBlock);

  // a block fetched ahead of time takes precedence over asking the child again
  if (pGrpSort->prefetchedSortInput != NULL) {
    pNext = pGrpSort->prefetchedSortInput;
    pGrpSort->prefetchedSortInput = NULL;
    *ppBlock = pNext;
    return code;
  }

  SOperatorInfo* pChild = pSrcParam->childOpInfo;
  code = pChild->fpSet.getNextFn(pChild, &pNext);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pNext == NULL) {
    pGrpSort->childOpStatus = CHILD_OP_FINISHED;
    return code;
  }

  code = blockDataCheck(pNext);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pNext->info.id.groupId != pGrpSort->currGroupId) {
    // first block of another group: keep it for the next sort round
    pGrpSort->childOpStatus = CHILD_OP_NEW_GROUP;
    pGrpSort->prefetchedSortInput = pNext;
  } else {
    pGrpSort->childOpStatus = CHILD_OP_SAME_GROUP;
    *ppBlock = pNext;
  }

  return code;

_end:
  if (code != 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
746

747
/**
 * Create and open a sort handle for the group identified by pInfo->currGroupId.
 *
 * A new single-source sort handle is stored in pInfo->pCurrSortHandle, wired to
 * fetchNextGroupSortDataBlock as its input callback, given one source that
 * refers to the first downstream operator, and then opened.
 *
 * @param pOperator  group sort operator
 * @return 0 on success; otherwise the error of the failing step (terrno for
 *         allocation failures)
 */
int32_t beginSortGroup(SOperatorInfo* pOperator) {
  SGroupSortOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;

  //  pInfo->binfo.pRes is not equalled to the input datablock.
  pInfo->pCurrSortHandle = NULL;

  int32_t code = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, 0, 0,
                                       0, &pInfo->pCurrSortHandle);
  if (code) {
    return code;
  }

  tsortSetFetchRawDataFp(pInfo->pCurrSortHandle, fetchNextGroupSortDataBlock, applyScalarFunction, pOperator);

  SSortSource*           ps = taosMemoryCalloc(1, sizeof(SSortSource));
  SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam));
  if (ps == NULL || param == NULL) {
    taosMemoryFree(ps);
    taosMemoryFree(param);
    return terrno;
  }

  param->childOpInfo = pOperator->pDownstream[0];
  param->grpSortOpInfo = pInfo;

  ps->param = param;
  ps->onlyRef = false;
  code = tsortAddSource(pInfo->pCurrSortHandle, ps);
  if (code != 0) {
    // NOTE(review): assumes the handle takes ownership of the source only when
    // tsortAddSource succeeds, so a rejected source must be released here.
    taosMemoryFree(param);
    taosMemoryFree(ps);
    return code;
  }

  code = tsortOpen(pInfo->pCurrSortHandle);
  return code;
}
783

784
/**
 * Close the sort of the current group: merge the handle's statistics into the
 * operator's running totals, then destroy the handle.
 *
 * sortMethod and sortBuffer are overwritten with the values of this group;
 * loops, readBytes and writeBytes are summed over all groups.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t finishSortGroup(SOperatorInfo* pOperator) {
  SGroupSortOperatorInfo* pGrpSort = pOperator->info;
  SSortExecInfo*          pTotal = &pGrpSort->sortExecInfo;
  SSortExecInfo           groupStat = tsortGetSortExecInfo(pGrpSort->pCurrSortHandle);

  pTotal->sortMethod = groupStat.sortMethod;
  pTotal->sortBuffer = groupStat.sortBuffer;
  pTotal->loops += groupStat.loops;
  pTotal->readBytes += groupStat.readBytes;
  pTotal->writeBytes += groupStat.writeBytes;

  tsortDestroySortHandle(pGrpSort->pCurrSortHandle);
  pGrpSort->pCurrSortHandle = NULL;

  return TSDB_CODE_SUCCESS;
}
800

801
/**
 * Get-next function of the group sort operator: return the next sorted block.
 *
 * On the first call one block is prefetched from downstream to learn the first
 * group id, and the sort of that group is started. Afterwards each call drains
 * the current group's sort handle; when a group is exhausted the next group is
 * started, or the operator is marked completed once the child has no more data.
 *
 * @param pOperator  group sort operator
 * @param pResBlock  [out] next result block; not written by this function body
 *                   when the operator is (or becomes) completed
 * @return TSDB_CODE_SUCCESS on the normal paths. On failure the code is stored
 *         in pTaskInfo->code and T_LONG_JMP is invoked with the task's env.
 */
int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  SGroupSortOperatorInfo* pInfo = pOperator->info;
  int32_t                 code = TSDB_CODE_SUCCESS;
  int32_t                 lino = 0;

  if (OP_EXEC_DONE == pOperator->status) {
    return code;
  }

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  // first invocation: look at the first input block to learn the first group
  if (!pInfo->hasGroupId) {
    pInfo->hasGroupId = true;

    SSDataBlock* pFirst = getNextBlockFromDownstream(pOperator, 0);
    pInfo->prefetchedSortInput = pFirst;
    if (NULL == pFirst) {
      setOperatorCompleted(pOperator);
      return code;
    }

    pInfo->currGroupId = pFirst->info.id.groupId;
    pInfo->currBaseGId = pFirst->info.id.baseGId;
    pInfo->childOpStatus = CHILD_OP_NEW_GROUP;
    code = beginSortGroup(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  SSDataBlock* pBlock = NULL;
  while (pInfo->pCurrSortHandle != NULL) {
    if (tsortIsClosed(pInfo->pCurrSortHandle)) {
      code = TSDB_CODE_TSC_QUERY_CANCELLED;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // beginSortGroup would fetch all child blocks of pInfo->currGroupId;
    if (CHILD_OP_SAME_GROUP == pInfo->childOpStatus) {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    recordOpExecBeforeDownstream(pOperator);
    code = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
                                   pInfo->matchInfo.pList, pInfo, &pBlock);
    recordOpExecAfterDownstream(pOperator, pBlock ? pBlock->info.rows : 0);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pBlock != NULL) {
      // keep both ids aligned with current group
      pBlock->info.id.groupId = pInfo->currGroupId;
      pBlock->info.id.baseGId = pInfo->currBaseGId;
      pOperator->resultInfo.totalRows += pBlock->info.rows;
      *pResBlock = pBlock;
      return code;
    }

    // the current group is drained: move on to the next group or finish
    if (CHILD_OP_FINISHED == pInfo->childOpStatus) {
      (void)finishSortGroup(pOperator);
      setOperatorCompleted(pOperator);
      return code;
    }

    if (CHILD_OP_NEW_GROUP == pInfo->childOpStatus) {
      (void)finishSortGroup(pOperator);
      pInfo->currGroupId = pInfo->prefetchedSortInput->info.id.groupId;
      pInfo->currBaseGId = pInfo->prefetchedSortInput->info.id.baseGId;
      code = beginSortGroup(pOperator);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
881

882
/**
 * Export the statistics accumulated over the finished groups for the explain path.
 *
 * The result is a heap copy, matching getExplainExecInfo which is plugged into
 * the same callback slot, so the consumer can treat both results alike.
 * Handing out the address of the field embedded in the operator state would
 * point into the operator's own allocation.
 * NOTE(review): assumes the consumer releases *pOptrExplain, as the sort
 * operator's implementation implies - confirm against the explain caller.
 *
 * @param pOptr         group sort operator
 * @param pOptrExplain  [out] receives a newly allocated SSortExecInfo
 * @param len           [out] receives sizeof(SSortExecInfo)
 * @return TSDB_CODE_SUCCESS, or terrno when the allocation fails
 */
int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)pOptr->info;

  SSortExecInfo* pExecInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
  if (pExecInfo == NULL) {
    return terrno;
  }

  *pExecInfo = pInfo->sortExecInfo;
  *pOptrExplain = pExecInfo;
  *len = sizeof(SSortExecInfo);
  return TSDB_CODE_SUCCESS;
}
888

889
void destroyGroupSortOperatorInfo(void* param) {
86,969✔
890
  SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)param;
86,969✔
891
  blockDataDestroy(pInfo->binfo.pRes);
86,969✔
892
  pInfo->binfo.pRes = NULL;
86,969✔
893

894
  taosArrayDestroy(pInfo->pSortInfo);
86,969✔
895
  taosArrayDestroy(pInfo->matchInfo.pList);
86,969✔
896

897
  tsortDestroySortHandle(pInfo->pCurrSortHandle);
86,969✔
898
  pInfo->pCurrSortHandle = NULL;
86,969✔
899

900
  taosMemoryFreeClear(param);
86,969✔
901
}
86,969✔
902

903
/**
 * Reset callback: bring the group sort operator back to its not-yet-opened
 * state so that it can be executed again.
 *
 * Group tracking, the prefetched block pointer and the accumulated statistics
 * are cleared, a live sort handle is destroyed, and the function contexts are
 * rebuilt from the operator's expression info.
 *
 * @param pOper  group sort operator
 * @return TSDB_CODE_SUCCESS, or terrno when the function contexts cannot be
 *         recreated (the same check the create path applies)
 */
static int32_t resetGroupSortOperState(SOperatorInfo* pOper) {
  SGroupSortOperatorInfo* pInfo = pOper->info;
  SExecTaskInfo*          pTaskInfo = pOper->pTaskInfo;
  SExprSupp*              pSup = &pOper->exprSupp;

  pOper->status = OP_NOT_OPENED;

  // both ids are assigned together wherever a group starts, so clear them together
  pInfo->currGroupId = 0;
  pInfo->currBaseGId = 0;
  pInfo->hasGroupId = false;
  pInfo->prefetchedSortInput = NULL;
  pInfo->childOpStatus = CHILD_OP_NEW_GROUP;
  pInfo->sortExecInfo = (SSortExecInfo){0};

  // drop the handle before the fallible step below so it is released on every path
  tsortDestroySortHandle(pInfo->pCurrSortHandle);
  pInfo->pCurrSortHandle = NULL;

  resetBasicOperatorState(&pInfo->binfo);
  destroySqlFunctionCtx(pSup->pCtx, pSup->pExprInfo, pSup->numOfExprs);
  taosMemoryFreeClear(pSup->rowEntryInfoOffset);
  pSup->pCtx =
      createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
  if (pSup->pCtx == NULL) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
925

926
int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
86,969✔
927
                                    SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
928
  QRY_PARAM_CHECK(pOptrInfo);
86,969✔
929
  int32_t code = 0;
86,969✔
930
  int32_t lino = 0;
86,969✔
931

932
  SGroupSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupSortOperatorInfo));
86,969✔
933
  SOperatorInfo*          pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
86,969✔
934
  if (pInfo == NULL || pOperator == NULL) {
86,969✔
935
    code = terrno;
×
936
    goto _error;
×
937
  }
938
  initOperatorCostInfo(pOperator);
86,969✔
939

940
  SExprSupp*          pSup = &pOperator->exprSupp;
86,969✔
941
  SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc;
86,969✔
942

943
  int32_t    numOfCols = 0;
86,969✔
944
  SExprInfo* pExprInfo = NULL;
86,969✔
945
  code = createExprInfo(pSortPhyNode->pExprs, NULL, &pExprInfo, &numOfCols);
86,969✔
946
  QUERY_CHECK_CODE(code, lino, _error);
86,969✔
947

948
  pSup->pExprInfo = pExprInfo;
86,969✔
949
  pSup->numOfExprs = numOfCols;
86,969✔
950

951
  initResultSizeInfo(&pOperator->resultInfo, 1024);
86,969✔
952
  pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset,
86,969✔
953
                                                  &pTaskInfo->storageAPI.functionStore);
954
  QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno);
86,969✔
955

956
  pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
86,969✔
957
  QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
86,969✔
958

959
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
86,969✔
960
  TSDB_CHECK_CODE(code, lino, _error);
86,969✔
961

962
  pInfo->binfo.inputTsOrder = pSortPhyNode->node.inputTsOrder;
86,969✔
963
  pInfo->binfo.outputTsOrder = pSortPhyNode->node.outputTsOrder;
86,969✔
964

965
  int32_t numOfOutputCols = 0;
86,969✔
966
  code = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID,
86,969✔
967
                             &pInfo->matchInfo);
968
  TSDB_CHECK_CODE(code, lino, _error);
86,969✔
969

970
  pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
86,969✔
971
  setOperatorInfo(pOperator, "GroupSortOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, false, OP_NOT_OPENED, pInfo,
86,969✔
972
                  pTaskInfo);
973
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doGroupSort, NULL, destroyGroupSortOperatorInfo,
86,969✔
974
                                         optrDefaultBufFn, getGroupSortExplainExecInfo, optrDefaultGetNextExtFn, NULL);
975

976
  setOperatorResetStateFn(pOperator, resetGroupSortOperState);
86,969✔
977
                                         
978
  code = appendDownstream(pOperator, &downstream, 1);
86,969✔
979
  if (code != TSDB_CODE_SUCCESS) {
86,969✔
980
    goto _error;
×
981
  }
982

983
  *pOptrInfo = pOperator;
86,969✔
984
  return TSDB_CODE_SUCCESS;
86,969✔
985

986
_error:
×
987
  pTaskInfo->code = code;
×
988
  if (pInfo != NULL) {
×
989
    destroyGroupSortOperatorInfo(pInfo);
×
990
  }
991
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
992
  return code;
×
993
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc