• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3524

08 Nov 2024 04:27AM UTC coverage: 60.898% (+5.0%) from 55.861%
#3524

push

travis-ci

web-flow
Merge pull request #28647 from taosdata/fix/3.0/TD-32519_drop_ctb

fix TD-32519 drop child table with tsma caused crash

118687 of 248552 branches covered (47.75%)

Branch coverage included in aggregate %.

286 of 337 new or added lines in 18 files covered. (84.87%)

9647 existing lines in 190 files now uncovered.

199106 of 273291 relevant lines covered (72.85%)

15236719.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.51
/source/libs/executor/src/sortoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "tdatablock.h"
21

22
typedef struct SSortOpGroupIdCalc {
23
  STupleHandle* pSavedTuple;
24
  SArray*       pSortColsArr;
25
  char*         keyBuf;
26
  int32_t       lastKeysLen; // default to be 0
27
  uint64_t      lastGroupId;
28
  bool          excludePKCol;
29
} SSortOpGroupIdCalc;
30

31
typedef struct SSortOperatorInfo {
32
  SOptrBasicInfo      binfo;
33
  uint32_t            sortBufSize;  // max buffer size for in-memory sort
34
  SArray*             pSortInfo;
35
  SSortHandle*        pSortHandle;
36
  SColMatchInfo       matchInfo;
37
  int32_t             bufPageSize;
38
  int64_t             startTs;      // sort start time
39
  uint64_t            sortElapsed;  // sort elapsed time, time to flush to disk not included.
40
  SLimitInfo          limitInfo;
41
  uint64_t            maxTupleLength;
42
  int64_t             maxRows;
43
  SSortOpGroupIdCalc* pGroupIdCalc;
44
} SSortOperatorInfo;
45

46
static int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
47
static int32_t doOpenSortOperator(SOperatorInfo* pOperator);
48
static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
49
static int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
50

51
static void destroySortOperatorInfo(void* param);
52
static void calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys);
53

54
static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc);
55

56
// todo add limit/offset impl
57
int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
980,054✔
58
  QRY_PARAM_CHECK(pOptrInfo);
980,054!
59

60
  int32_t code = 0;
980,054✔
61
  int32_t lino = 0;
980,054✔
62

63
  SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
980,054✔
64
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
980,166✔
65
  if (pInfo == NULL || pOperator == NULL) {
980,204!
66
    code = terrno;
11✔
UNCOV
67
    goto _error;
×
68
  }
69

70
  pOperator->pTaskInfo = pTaskInfo;
980,193✔
71
  SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc;
980,193✔
72

73
  int32_t numOfCols = 0;
980,193✔
74
  code = createExprInfo(pSortNode->pExprs, NULL, &pOperator->exprSupp.pExprInfo, &numOfCols);
980,193✔
75
  QUERY_CHECK_CODE(code, lino, _error);
980,258!
76

77
  pOperator->exprSupp.numOfExprs = numOfCols;
980,258✔
78
  int32_t numOfOutputCols = 0;
980,258✔
79
  code =
80
      extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
980,258✔
81
  if (code != TSDB_CODE_SUCCESS) {
980,232!
82
    goto _error;
×
83
  }
84
  
85
  calcSortOperMaxTupleLength(pInfo, pSortNode->pSortKeys);
980,232✔
86
  pInfo->maxRows = -1;
980,255✔
87
  if (pSortNode->node.pLimit) {
980,255✔
88
    SLimitNode* pLimit = (SLimitNode*)pSortNode->node.pLimit;
155,550✔
89
    if (pLimit->limit > 0) pInfo->maxRows = pLimit->limit + pLimit->offset;
155,550✔
90
  }
91

92
  pOperator->exprSupp.pCtx =
980,219✔
93
      createSqlFunctionCtx(pOperator->exprSupp.pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
980,255✔
94
  QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno);
980,219!
95
  initResultSizeInfo(&pOperator->resultInfo, 1024);
980,219✔
96
  code = filterInitFromNode((SNode*)pSortNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
980,178✔
97
  if (code != TSDB_CODE_SUCCESS) {
980,236!
98
    goto _error;
×
99
  }
100

101
  pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
980,236✔
102
  QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
980,263!
103

104
  pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
980,263✔
105
  TSDB_CHECK_NULL(pInfo->pSortInfo, code, lino, _error, terrno);
980,271✔
106

107
  if (pSortNode->calcGroupId) {
980,144✔
108
    int32_t keyLen;
109
    SSortOpGroupIdCalc* pGroupIdCalc = pInfo->pGroupIdCalc = taosMemoryCalloc(1, sizeof(SSortOpGroupIdCalc));
944✔
110
    if (!pGroupIdCalc) {
944!
111
      code = terrno;
×
112
      goto _error;
×
113
    }
114
    SNodeList* pSortColsNodeArr = makeColsNodeArrFromSortKeys(pSortNode->pSortKeys);
944✔
115
    if (!pSortColsNodeArr) code = TSDB_CODE_OUT_OF_MEMORY;
944!
116
    if (TSDB_CODE_SUCCESS == code) {
944!
117
      pGroupIdCalc->pSortColsArr = makeColumnArrayFromList(pSortColsNodeArr);
944✔
118
      if (!pGroupIdCalc->pSortColsArr) code = TSDB_CODE_OUT_OF_MEMORY;
944!
119
      nodesClearList(pSortColsNodeArr);
944✔
120
    }
121
    if (TSDB_CODE_SUCCESS == code) {
945✔
122
      // PK ts col should always at last, see partColOptCreateSort
123
      if (pSortNode->excludePkCol) taosArrayPop(pGroupIdCalc->pSortColsArr);
944✔
124
      code = extractKeysLen(pGroupIdCalc->pSortColsArr, &keyLen);
944✔
125
      QUERY_CHECK_CODE(code, lino, _error);
943!
126
    }
127
    if (TSDB_CODE_SUCCESS == code) {
944!
128
      pGroupIdCalc->lastKeysLen = 0;
944✔
129
      pGroupIdCalc->keyBuf = taosMemoryCalloc(1, keyLen);
944✔
130
      if (!pGroupIdCalc->keyBuf) {
944!
131
        code = terrno;
×
132
      }
133
    }
134
  }
135
  if (code != TSDB_CODE_SUCCESS) goto _error;
980,144!
136

137
  pInfo->binfo.inputTsOrder = pSortNode->node.inputTsOrder;
980,144✔
138
  pInfo->binfo.outputTsOrder = pSortNode->node.outputTsOrder;
980,144✔
139
  initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo);
980,144✔
140

141
  setOperatorInfo(pOperator, "SortOperator", QUERY_NODE_PHYSICAL_PLAN_SORT, true, OP_NOT_OPENED, pInfo, pTaskInfo);
980,260✔
142

143

144
  // lazy evaluation for the following parameter since the input datablock is not known till now.
145
  //  pInfo->bufPageSize  = rowSize < 1024 ? 1024 * 2 : rowSize * 2;
146
  //  there are headers, so pageSize = rowSize + header pInfo->sortBufSize  = pInfo->bufPageSize * 16;
147
  // TODO dynamic set the available sort buffer
148

149
  pOperator->fpSet =
150
      createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, optrDefaultBufFn, getExplainExecInfo, optrDefaultGetNextExtFn, NULL);
980,253✔
151

152
  code = appendDownstream(pOperator, &downstream, 1);
980,227✔
153
  if (code != TSDB_CODE_SUCCESS) {
980,281!
154
    goto _error;
×
155
  }
156

157
  *pOptrInfo = pOperator;
980,281✔
158
  return TSDB_CODE_SUCCESS;
980,281✔
159

UNCOV
160
_error:
×
UNCOV
161
  if (pInfo != NULL) {
×
UNCOV
162
    destroySortOperatorInfo(pInfo);
×
163
  }
UNCOV
164
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
UNCOV
165
  pTaskInfo->code = code;
×
UNCOV
166
  return code;
×
167
}
168

169
int32_t appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
1,966,355,166✔
170
  int32_t code = 0;
1,966,355,166✔
171
  for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
2,147,483,647✔
172
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
2,147,483,647✔
173
    if (pColInfo == NULL) {
2,147,483,647!
174
      return terrno;
×
175
    }
176

177
    bool isNull = tsortIsNullVal(pTupleHandle, i);
2,147,483,647✔
178
    if (isNull) {
2,147,483,647✔
179
      colDataSetNULL(pColInfo, pBlock->info.rows);
1,044,218,862✔
180
    } else {
181
      char* pData = NULL;
2,147,483,647✔
182
      tsortGetValue(pTupleHandle, i, (void**) &pData);
2,147,483,647✔
183

184
      if (pData != NULL) {
2,147,483,647✔
185
        code = colDataSetVal(pColInfo, pBlock->info.rows, pData, false);
2,147,483,647✔
186
        if (code) {
2,147,483,647!
UNCOV
187
          return code;
×
188
        }
189
      }
190
    }
191
  }
192

193
  pBlock->info.dataLoad = 1;
1,831,096,269✔
194

195
  SDataBlockInfo info = {0};
1,831,096,269✔
196
  tsortGetBlockInfo(pTupleHandle, &info);
1,831,096,269✔
197

198
  pBlock->info.scanFlag = info.scanFlag;
1,969,901,150✔
199
  pBlock->info.rows += 1;
1,969,901,150✔
200
  return code;
1,969,901,150✔
201
}
202

203
/**
204
 * @brief get next tuple with group id attached, here assume that all tuples are sorted by group keys
205
 * @param [in, out] pBlock the output block, the group id will be saved in it
206
 * @retval NULL if next group tuple arrived and this new group tuple will be saved in pInfo.pSavedTuple
207
 */
208
static int32_t nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock,
21,757,404✔
209
                                    STupleHandle** pTupleHandle) {
210
  QRY_PARAM_CHECK(pTupleHandle);
21,757,404!
211

212
  int32_t       code = 0;
21,757,404✔
213
  STupleHandle* retTuple = pInfo->pGroupIdCalc->pSavedTuple;
21,757,404✔
214
  if (!retTuple) {
21,757,404✔
215
    code = tsortNextTuple(pHandle, &retTuple);
21,285,889✔
216
    if (code) {
21,377,536✔
217
      qError("failed to get next tuple, code:%s", tstrerror(code));
18,689!
218
      return code;
×
219
    }
220
  }
221

222
  if (retTuple) {
21,830,362!
223
    int32_t newGroup;
224
    if (pInfo->pGroupIdCalc->pSavedTuple) {
21,841,333✔
225
      newGroup = true;
480,541✔
226
      pInfo->pGroupIdCalc->pSavedTuple = NULL;
480,541✔
227
    } else {
228
      newGroup = tsortCompAndBuildKeys(pInfo->pGroupIdCalc->pSortColsArr, pInfo->pGroupIdCalc->keyBuf,
21,360,792✔
229
                                       &pInfo->pGroupIdCalc->lastKeysLen, retTuple);
21,360,792✔
230
    }
231

232
    bool emptyBlock = (pBlock->info.rows == 0);
21,750,333✔
233
    if (newGroup) {
21,750,333✔
234
      if (!emptyBlock) {
956,245✔
235
        // new group arrived, and we have already copied some tuples for cur group, save the new group tuple, return
236
        // NULL. Note that the keyBuf and lastKeysLen has been updated to new value
237
        pInfo->pGroupIdCalc->pSavedTuple = retTuple;
481,479✔
238
        retTuple = NULL;
481,479✔
239
      } else {
240
        // new group with empty block
241
        pInfo->pGroupIdCalc->lastGroupId = pBlock->info.id.groupId =
483,857✔
242
            calcGroupId(pInfo->pGroupIdCalc->keyBuf, pInfo->pGroupIdCalc->lastKeysLen);
474,766✔
243
      }
244
    } else {
245
      if (emptyBlock) {
20,794,088✔
246
        // new block but not new group, assign last group id to it
247
        pBlock->info.id.groupId = pInfo->pGroupIdCalc->lastGroupId;
6,791✔
248
      } else {
249
        // not new group and not empty block and ret NOT NULL, just return the tuple
250
      }
251
    }
252
  }
253

254
  *pTupleHandle = retTuple;
21,748,453✔
255
  return code;
21,748,453✔
256
}
257

258
static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
3,609,324✔
259
                                SSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
260
  QRY_PARAM_CHECK(pResBlock);
3,609,324!
261
  blockDataCleanup(pDataBlock);
3,609,324✔
262

263
  int32_t       lino = 0;
3,604,103✔
264
  int32_t       code = 0;
3,604,103✔
265
  STupleHandle* pTupleHandle = NULL;
3,604,103✔
266
  SSDataBlock*  p = NULL;
3,604,103✔
267

268
  code = tsortGetSortedDataBlock(pHandle, &p);
3,604,103✔
269
  if (p == NULL || (code != 0)) {
3,606,197!
270
    return code;
117,984✔
271
  }
272

273
  code = blockDataEnsureCapacity(p, capacity);
3,488,213✔
274
  QUERY_CHECK_CODE(code, lino, _error);
3,490,730!
275

276
  while (1) {
277
    if (pInfo->pGroupIdCalc) {
1,498,175,177✔
278
      code = nextTupleWithGroupId(pHandle, pInfo, p, &pTupleHandle);
21,763,533✔
279
    } else {
280
      code = tsortNextTuple(pHandle, &pTupleHandle);
1,476,411,644✔
281
    }
282

283
    TSDB_CHECK_CODE(code, lino, _error);
1,498,373,258!
284
    if (pTupleHandle == NULL) {
1,498,373,258✔
285
      break;
2,166,716✔
286
    }
287

288
    code = appendOneRowToDataBlock(p, pTupleHandle);
1,496,206,542✔
289
    QUERY_CHECK_CODE(code, lino, _error);
1,495,864,527!
290

291
    if (p->info.rows >= capacity) {
1,495,864,527✔
292
      break;
1,180,080✔
293
    }
294
  }
295

296
  QUERY_CHECK_CODE(code, lino, _error);
3,346,796!
297

298
  if (p->info.rows > 0) {
3,346,796✔
299
    code = blockDataEnsureCapacity(pDataBlock, capacity);
2,666,790✔
300
    QUERY_CHECK_CODE(code, lino, _error);
2,666,323!
301

302
    // todo extract function to handle this
303
    int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
2,666,323✔
304
    for (int32_t i = 0; i < numOfCols; ++i) {
8,867,580✔
305
      SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
6,210,352✔
306
      QUERY_CHECK_NULL(pmInfo, code, lino, _error, terrno);
6,206,203!
307

308
      SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
6,206,203✔
309
      QUERY_CHECK_NULL(pSrc, code, lino, _error, terrno);
6,202,643!
310

311
      SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
6,202,643✔
312
      QUERY_CHECK_NULL(pDst, code, lino, _error, terrno);
6,196,960!
313

314
      code = colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
6,196,960✔
315
      QUERY_CHECK_CODE(code, lino, _error);
6,201,731!
316
    }
317

318
    pDataBlock->info.dataLoad = 1;
2,657,228✔
319
    pDataBlock->info.rows = p->info.rows;
2,657,228✔
320
    pDataBlock->info.scanFlag = p->info.scanFlag;
2,657,228✔
321
    pDataBlock->info.id.groupId = p->info.id.groupId;
2,657,228✔
322
  }
323

324
  blockDataDestroy(p);
3,337,234✔
325
  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
3,491,411✔
326
  return code;
3,491,411✔
327

UNCOV
328
  _error:
×
UNCOV
329
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
330

UNCOV
331
  blockDataDestroy(p);
×
UNCOV
332
  return code;
×
333
}
334

335
int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
2,480,086✔
336
  SOperatorInfo* pOperator = (SOperatorInfo*)param;
2,480,086✔
337
  int32_t        code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
2,480,086✔
338
  if (code) {
2,480,125!
339
    qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
×
340
  } else {
341
    code = blockDataCheck(*ppBlock);
2,480,125✔
342
    if (code) {
2,480,025!
343
      qError("failed to check block data, %s code:%s", __func__, tstrerror(code));
×
344
    }
345
  }
346
  return code;
2,479,979✔
347
}
348

349
// todo refactor: merged with fetch fp
350
void applyScalarFunction(SSDataBlock* pBlock, void* param) {
1,500,168✔
351
  SOperatorInfo*     pOperator = param;
1,500,168✔
352
  SSortOperatorInfo* pSort = pOperator->info;
1,500,168✔
353
  if (pOperator->exprSupp.pExprInfo != NULL && pOperator->exprSupp.numOfExprs > 0) {
1,500,168!
354
    int32_t code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx,
984✔
355
                                         pOperator->exprSupp.numOfExprs, NULL);
356
    if (code != TSDB_CODE_SUCCESS) {
984!
357
      T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
358
    }
359
  }
360
}
1,500,168✔
361

362
int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
3,606,050✔
363
  SSortOperatorInfo* pInfo = pOperator->info;
3,606,050✔
364
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
3,606,050✔
365
  int32_t            code = TSDB_CODE_SUCCESS;
3,606,050✔
366
  int32_t            lino = 0;
3,606,050✔
367
  SSortSource* pSource =NULL;
3,606,050✔
368

369
  if (OPTR_IS_OPENED(pOperator)) {
3,606,050✔
370
    return code;
2,626,237✔
371
  }
372

373
  pInfo->startTs = taosGetTimestampUs();
980,198✔
374
  //  pInfo->binfo.pRes is not equalled to the input datablock.
375
  pInfo->pSortHandle = NULL;
980,198✔
376
  code =
377
      tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, pInfo->maxRows,
980,198✔
378
                            pInfo->maxTupleLength, tsPQSortMemThreshold * 1024 * 1024, &pInfo->pSortHandle);
980,198✔
379
  QUERY_CHECK_CODE(code, lino, _end);
980,227!
380

381
  tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
980,227✔
382

383
  pSource = taosMemoryCalloc(1, sizeof(SSortSource));
980,107✔
384
  QUERY_CHECK_NULL(pSource, code, lino, _end, terrno);
980,282!
385

386
  pSource->param = pOperator->pDownstream[0];
980,282✔
387
  pSource->onlyRef = true;
980,282✔
388

389
  code = tsortAddSource(pInfo->pSortHandle, pSource);
980,282✔
390
  QUERY_CHECK_CODE(code, lino, _end);
980,212!
391
  pSource = NULL;
980,212✔
392

393
  code = tsortOpen(pInfo->pSortHandle);
980,212✔
394
  QUERY_CHECK_CODE(code, lino, _end);
980,264!
395
  pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0;
980,286✔
396
  pOperator->status = OP_RES_TO_RETURN;
980,286✔
397
  OPTR_SET_OPENED(pOperator);
980,286✔
398

399
_end:
980,286✔
400
  if (pSource) {
980,286!
401
    taosMemoryFree(pSource);
×
402
  }
403
  if (code != TSDB_CODE_SUCCESS) {
980,270!
UNCOV
404
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
405
    pTaskInfo->code = code;
×
UNCOV
406
    T_LONG_JMP(pTaskInfo->env, code);
×
407
  }
408
  return code;
980,270✔
409
}
410

411
int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
3,606,165✔
412
  QRY_PARAM_CHECK(pResBlock);
3,606,165!
413
  int32_t code = TSDB_CODE_SUCCESS;
3,606,165✔
414
  int32_t lino = 0;
3,606,165✔
415
  if (pOperator->status == OP_EXEC_DONE) {
3,606,165✔
416
    return code;
8✔
417
  }
418

419
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
3,606,157✔
420
  SSortOperatorInfo* pInfo = pOperator->info;
3,606,157✔
421

422
  code = pOperator->fpSet._openFn(pOperator);
3,606,157✔
423
  QUERY_CHECK_CODE(code, lino, _end);
3,605,998!
424

425
  // multi-group case not handle here
426
  SSDataBlock* pBlock = NULL;
3,605,998✔
427
  while (1) {
752✔
428
    if (tsortIsClosed(pInfo->pSortHandle)) {
3,606,750!
429
      code = TSDB_CODE_TSC_QUERY_CANCELLED;
×
430
      QUERY_CHECK_CODE(code, lino, _end);
×
431
    }
432

433
    code = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
3,609,542✔
434
                                pInfo->matchInfo.pList, pInfo, &pBlock);
435
    QUERY_CHECK_CODE(code, lino, _end);
3,609,120!
436
    if (pBlock == NULL) {
3,609,120✔
437
      setOperatorCompleted(pOperator);
942,047✔
438
      return code;
942,043✔
439
    }
440

441
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo);
2,667,073✔
442
    QUERY_CHECK_CODE(code, lino, _end);
2,666,378!
443

444
    if (blockDataGetNumOfRows(pBlock) == 0) {
2,666,378!
445
      continue;
×
446
    }
447

448
    // there are bugs?
449
    bool limitReached = applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo);
2,666,084✔
450
    if (limitReached) {
2,665,737✔
451
      resetLimitInfoForNextGroup(&pInfo->limitInfo);
57,165✔
452
    }
453

454
    pOperator->resultInfo.totalRows += pBlock->info.rows;
2,665,759✔
455
    if (pBlock->info.rows > 0) {
2,665,759✔
456
      break;
2,665,007✔
457
    }
458
  }
459

460
  *pResBlock = blockDataGetNumOfRows(pBlock) > 0 ? pBlock : NULL;
2,665,007✔
461
_end:
2,663,569✔
462
  if (code != TSDB_CODE_SUCCESS) {
2,663,569!
UNCOV
463
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
464
    pTaskInfo->code = code;
×
UNCOV
465
    T_LONG_JMP(pTaskInfo->env, code);
×
466
  }
467
  return code;
2,663,569✔
468
}
469

470
void destroySortOperatorInfo(void* param) {
980,310✔
471
  SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param;
980,310✔
472
  blockDataDestroy(pInfo->binfo.pRes);
980,310✔
473
  pInfo->binfo.pRes = NULL;
980,315✔
474

475
  tsortDestroySortHandle(pInfo->pSortHandle);
980,315✔
476
  taosArrayDestroy(pInfo->pSortInfo);
980,323✔
477
  taosArrayDestroy(pInfo->matchInfo.pList);
980,322✔
478
  destroySortOpGroupIdCalc(pInfo->pGroupIdCalc);
980,324✔
479
  taosMemoryFreeClear(param);
980,316!
480
}
980,322✔
481

482
int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
57,237✔
483
  SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
57,237✔
484
  if (pInfo == NULL) {
57,237!
485
    return terrno;
×
486
  }
487

488
  SSortOperatorInfo* pOperatorInfo = (SSortOperatorInfo*)pOptr->info;
57,237✔
489

490
  *pInfo = tsortGetSortExecInfo(pOperatorInfo->pSortHandle);
57,237✔
491
  *pOptrExplain = pInfo;
57,237✔
492
  *len = sizeof(SSortExecInfo);
57,237✔
493
  return TSDB_CODE_SUCCESS;
57,237✔
494
}
495

496
static void calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys) {
980,146✔
497
  SColMatchInfo* pColItem = &pSortOperInfo->matchInfo;
980,146✔
498
  size_t         size = taosArrayGetSize(pColItem->pList);
980,146✔
499
  for (size_t i = 0; i < size; ++i) {
4,527,467✔
500
    SColMatchItem* pInfo = taosArrayGet(pColItem->pList, i);
3,547,360✔
501
    if (pInfo == NULL) {
3,547,278!
502
      continue;
×
503
    }
504

505
    pSortOperInfo->maxTupleLength += pInfo->dataType.bytes;
3,547,278✔
506
  }
507

508
  size = LIST_LENGTH(pSortKeys);
980,107!
509
  for (size_t i = 0; i < size; ++i) {
2,116,579✔
510
    SOrderByExprNode* pOrderExprNode = (SOrderByExprNode*)nodesListGetNode(pSortKeys, i);
1,136,442✔
511
    pSortOperInfo->maxTupleLength += ((SColumnNode*)pOrderExprNode->pExpr)->node.resType.bytes;
1,136,472✔
512
  }
513
}
980,137✔
514

515
static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc) {
980,314✔
516
  if (pCalc) {
980,314✔
517
    taosArrayDestroy(pCalc->pSortColsArr);
944✔
518
    taosMemoryFree(pCalc->keyBuf);
944✔
519
    taosMemoryFree(pCalc);
944✔
520
  }
521
}
980,314✔
522

523
//=====================================================================================
524
// Group Sort Operator
525
typedef enum EChildOperatorStatus { CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED } EChildOperatorStatus;
526

527
typedef struct SGroupSortOperatorInfo {
528
  SOptrBasicInfo       binfo;
529
  SArray*              pSortInfo;
530
  SColMatchInfo        matchInfo;
531
  int64_t              startTs;
532
  uint64_t             sortElapsed;
533
  bool                 hasGroupId;
534
  uint64_t             currGroupId;
535
  SSDataBlock*         prefetchedSortInput;
536
  SSortHandle*         pCurrSortHandle;
537
  EChildOperatorStatus childOpStatus;
538
  SSortExecInfo        sortExecInfo;
539
} SGroupSortOperatorInfo;
540

541
int32_t getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
360✔
542
                                SGroupSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
543
  QRY_PARAM_CHECK(pResBlock);
360!
544

545
  blockDataCleanup(pDataBlock);
360✔
546
  int32_t code = blockDataEnsureCapacity(pDataBlock, capacity);
360✔
547
  if (code) {
360!
548
    return code;
×
549
  }
550

551
  SSDataBlock* p = NULL;
360✔
552
  code = tsortGetSortedDataBlock(pHandle, &p);
360✔
553
  if (p == NULL || (code != 0)) {
360!
554
    return code;
×
555
  }
556

557
  code = blockDataEnsureCapacity(p, capacity);
360✔
558
  if (code) {
360!
559
    return code;
×
560
  }
561

562
  while (1) {
18,000✔
563
    STupleHandle* pTupleHandle = NULL;
18,360✔
564
    code = tsortNextTuple(pHandle, &pTupleHandle);
18,360✔
565
    if (pTupleHandle == NULL || code != 0) {
18,360!
566
      break;
567
    }
568

569
    code = appendOneRowToDataBlock(p, pTupleHandle);
18,000✔
570
    if (code) {
18,000!
571
      break;
×
572
    }
573

574
    if (p->info.rows >= capacity) {
18,000!
575
      break;
×
576
    }
577
  }
578

579
  if (p->info.rows > 0) {
360✔
580
    int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
180✔
581
    for (int32_t i = 0; i < numOfCols; ++i) {
540✔
582
      SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
360✔
583
      if (pmInfo == NULL) {
360!
584
        return terrno;
×
585
      }
586

587
      SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
360✔
588
      if (pSrc == NULL) {
360!
589
        return terrno;
×
590
      }
591

592
      SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
360✔
593
      if (pDst == NULL) {
360!
594
        return terrno;
×
595
      }
596

597
      code = colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
360✔
598
      if (code) {
360!
599
        return code;
×
600
      }
601
    }
602

603
    pDataBlock->info.rows = p->info.rows;
180✔
604
    pDataBlock->info.capacity = p->info.rows;
180✔
605
    pDataBlock->info.scanFlag = p->info.scanFlag;
180✔
606
  }
607

608
  blockDataDestroy(p);
360✔
609
  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
360✔
610
  return code;
360✔
611
}
612

613
typedef struct SGroupSortSourceParam {
614
  SOperatorInfo*          childOpInfo;
615
  SGroupSortOperatorInfo* grpSortOpInfo;
616
} SGroupSortSourceParam;
617

618
int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) {
450✔
619
  int32_t                 code = 0;
450✔
620
  int32_t                 lino = 0;
450✔
621
  SGroupSortSourceParam*  source = param;
450✔
622
  SGroupSortOperatorInfo* grpSortOpInfo = source->grpSortOpInfo;
450✔
623
  SSDataBlock*            block = NULL;
450✔
624

625
  QRY_PARAM_CHECK(ppBlock);
450!
626

627
  if (grpSortOpInfo->prefetchedSortInput) {
450✔
628
    block = grpSortOpInfo->prefetchedSortInput;
180✔
629
    grpSortOpInfo->prefetchedSortInput = NULL;
180✔
630
    *ppBlock = block;
180✔
631
  } else {
632
    SOperatorInfo* childOp = source->childOpInfo;
270✔
633
    code = childOp->fpSet.getNextFn(childOp, &block);
270✔
634
    QUERY_CHECK_CODE(code, lino, _end);
270!
635

636
    if (block != NULL) {
270✔
637
      code = blockDataCheck(block);
90✔
638
      QUERY_CHECK_CODE(code, lino, _end);
90!
639
      if (block->info.id.groupId == grpSortOpInfo->currGroupId) {
90!
640
        grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP;
90✔
641
        *ppBlock = block;
90✔
642
      } else {
643
        grpSortOpInfo->childOpStatus = CHILD_OP_NEW_GROUP;
×
644
        grpSortOpInfo->prefetchedSortInput = block;
×
645
      }
646
    } else {
647
      grpSortOpInfo->childOpStatus = CHILD_OP_FINISHED;
180✔
648
    }
649
  }
650

651
  return code;
450✔
652
_end:
×
653
  if (code != 0) {
×
654
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
655
  }
656
  return code;
×
657
}
658

659
int32_t beginSortGroup(SOperatorInfo* pOperator) {
180✔
660
  SGroupSortOperatorInfo* pInfo = pOperator->info;
180✔
661
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
180✔
662

663
  //  pInfo->binfo.pRes is not equalled to the input datablock.
664
  pInfo->pCurrSortHandle = NULL;
180✔
665

666
  int32_t code = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, 0, 0,
180✔
667
                                       0, &pInfo->pCurrSortHandle);
668
  if (code) {
180!
669
    return code;
×
670
  }
671

672
  tsortSetFetchRawDataFp(pInfo->pCurrSortHandle, fetchNextGroupSortDataBlock, applyScalarFunction, pOperator);
180✔
673

674
  SSortSource*           ps = taosMemoryCalloc(1, sizeof(SSortSource));
180✔
675
  SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam));
180✔
676
  if (ps == NULL || param == NULL) {
180!
677
    taosMemoryFree(ps);
×
678
    taosMemoryFree(param);
×
679
    return terrno;
×
680
  }
681

682
  param->childOpInfo = pOperator->pDownstream[0];
180✔
683
  param->grpSortOpInfo = pInfo;
180✔
684

685
  ps->param = param;
180✔
686
  ps->onlyRef = false;
180✔
687
  code = tsortAddSource(pInfo->pCurrSortHandle, ps);
180✔
688
  if (code != 0) {
180!
689
    return code;
×
690
  }
691

692
  code = tsortOpen(pInfo->pCurrSortHandle);
180✔
693
  return code;
180✔
694
}
695

696
int32_t finishSortGroup(SOperatorInfo* pOperator) {
180✔
697
  SGroupSortOperatorInfo* pInfo = pOperator->info;
180✔
698

699
  SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pCurrSortHandle);
180✔
700

701
  pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
180✔
702
  pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
180✔
703
  pInfo->sortExecInfo.loops += sortExecInfo.loops;
180✔
704
  pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
180✔
705
  pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;
180✔
706

707
  tsortDestroySortHandle(pInfo->pCurrSortHandle);
180✔
708
  pInfo->pCurrSortHandle = NULL;
180✔
709

710
  return TSDB_CODE_SUCCESS;
180✔
711
}
712

713
int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
385✔
714
  QRY_PARAM_CHECK(pResBlock);
385!
715
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
385✔
716
  SGroupSortOperatorInfo* pInfo = pOperator->info;
385✔
717
  int32_t                 code = TSDB_CODE_SUCCESS;
385✔
718
  int32_t                 lino = 0;
385✔
719

720
  if (pOperator->status == OP_EXEC_DONE) {
385!
721
    return code;
×
722
  }
723

724
  code = pOperator->fpSet._openFn(pOperator);
385✔
725
  QUERY_CHECK_CODE(code, lino, _end);
385!
726

727
  if (!pInfo->hasGroupId) {
385✔
728
    pInfo->hasGroupId = true;
205✔
729

730
    pInfo->prefetchedSortInput = getNextBlockFromDownstream(pOperator, 0);
205✔
731
    if (pInfo->prefetchedSortInput == NULL) {
205✔
732
      setOperatorCompleted(pOperator);
25✔
733
      return code;
25✔
734
    }
735

736
    pInfo->currGroupId = pInfo->prefetchedSortInput->info.id.groupId;
180✔
737
    pInfo->childOpStatus = CHILD_OP_NEW_GROUP;
180✔
738
    code = beginSortGroup(pOperator);
180✔
739
    QUERY_CHECK_CODE(code, lino, _end);
180!
740
  }
741

742
  SSDataBlock* pBlock = NULL;
360✔
743
  while (pInfo->pCurrSortHandle != NULL) {
360!
744
    if (tsortIsClosed(pInfo->pCurrSortHandle)) {
360!
745
      code = TSDB_CODE_TSC_QUERY_CANCELLED;
×
746
      QUERY_CHECK_CODE(code, lino, _end);
×
747
    }
748

749
    // beginSortGroup would fetch all child blocks of pInfo->currGroupId;
750
    if (pInfo->childOpStatus == CHILD_OP_SAME_GROUP) {
360!
751
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
752
      QUERY_CHECK_CODE(code, lino, _end);
×
753
    }
754

755
    code = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
360✔
756
                                     pInfo->matchInfo.pList, pInfo, &pBlock);
757
    QUERY_CHECK_CODE(code, lino, _end);
360!
758
    if (pBlock != NULL) {
360✔
759
      pBlock->info.id.groupId = pInfo->currGroupId;
180✔
760
      pOperator->resultInfo.totalRows += pBlock->info.rows;
180✔
761
      *pResBlock = pBlock;
180✔
762
      return code;
180✔
763
    } else {
764
      if (pInfo->childOpStatus == CHILD_OP_NEW_GROUP) {
180!
765
        (void) finishSortGroup(pOperator);
×
766
        pInfo->currGroupId = pInfo->prefetchedSortInput->info.id.groupId;
×
767
        code = beginSortGroup(pOperator);
×
768
        QUERY_CHECK_CODE(code, lino, _end);
×
769
      } else if (pInfo->childOpStatus == CHILD_OP_FINISHED) {
180!
770
        (void) finishSortGroup(pOperator);
180✔
771
        setOperatorCompleted(pOperator);
180✔
772
        return code;
180✔
773
      }
774
    }
775
  }
776

777
_end:
×
778
  if (code != TSDB_CODE_SUCCESS) {
×
779
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
780
    pTaskInfo->code = code;
×
781
    T_LONG_JMP(pTaskInfo->env, code);
×
782
  }
783
  return code;
×
784
}
785

786
int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
×
787
  SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)pOptr->info;
×
788
  *pOptrExplain = &pInfo->sortExecInfo;
×
789
  *len = sizeof(SSortExecInfo);
×
790
  return TSDB_CODE_SUCCESS;
×
791
}
792

793
void destroyGroupSortOperatorInfo(void* param) {
205✔
794
  SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)param;
205✔
795
  blockDataDestroy(pInfo->binfo.pRes);
205✔
796
  pInfo->binfo.pRes = NULL;
205✔
797

798
  taosArrayDestroy(pInfo->pSortInfo);
205✔
799
  taosArrayDestroy(pInfo->matchInfo.pList);
205✔
800

801
  tsortDestroySortHandle(pInfo->pCurrSortHandle);
205✔
802
  pInfo->pCurrSortHandle = NULL;
205✔
803

804
  taosMemoryFreeClear(param);
205!
805
}
205✔
806

807
int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
205✔
808
                                    SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
809
  QRY_PARAM_CHECK(pOptrInfo);
205!
810
  int32_t code = 0;
205✔
811
  int32_t lino = 0;
205✔
812

813
  SGroupSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupSortOperatorInfo));
205✔
814
  SOperatorInfo*          pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
205✔
815
  if (pInfo == NULL || pOperator == NULL) {
205!
816
    code = terrno;
×
817
    goto _error;
×
818
  }
819

820
  SExprSupp*          pSup = &pOperator->exprSupp;
205✔
821
  SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc;
205✔
822

823
  int32_t    numOfCols = 0;
205✔
824
  SExprInfo* pExprInfo = NULL;
205✔
825
  code = createExprInfo(pSortPhyNode->pExprs, NULL, &pExprInfo, &numOfCols);
205✔
826
  QUERY_CHECK_CODE(code, lino, _error);
205!
827

828
  pSup->pExprInfo = pExprInfo;
205✔
829
  pSup->numOfExprs = numOfCols;
205✔
830

831
  initResultSizeInfo(&pOperator->resultInfo, 1024);
205✔
832
  pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset,
205✔
833
                                                  &pTaskInfo->storageAPI.functionStore);
834
  QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno);
205!
835

836
  pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
205✔
837
  QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
205!
838

839
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
205✔
840
  TSDB_CHECK_CODE(code, lino, _error);
205!
841

842
  pInfo->binfo.inputTsOrder = pSortPhyNode->node.inputTsOrder;
205✔
843
  pInfo->binfo.outputTsOrder = pSortPhyNode->node.outputTsOrder;
205✔
844

845
  int32_t numOfOutputCols = 0;
205✔
846
  code = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID,
205✔
847
                             &pInfo->matchInfo);
848
  TSDB_CHECK_CODE(code, lino, _error);
205!
849

850
  pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
205✔
851
  setOperatorInfo(pOperator, "GroupSortOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, false, OP_NOT_OPENED, pInfo,
205✔
852
                  pTaskInfo);
853
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doGroupSort, NULL, destroyGroupSortOperatorInfo,
205✔
854
                                         optrDefaultBufFn, getGroupSortExplainExecInfo, optrDefaultGetNextExtFn, NULL);
855

856
  code = appendDownstream(pOperator, &downstream, 1);
205✔
857
  if (code != TSDB_CODE_SUCCESS) {
205!
858
    goto _error;
×
859
  }
860

861
  *pOptrInfo = pOperator;
205✔
862
  return TSDB_CODE_SUCCESS;
205✔
863

864
_error:
×
865
  pTaskInfo->code = code;
×
866
  if (pInfo != NULL) {
×
867
    destroyGroupSortOperatorInfo(pInfo);
×
868
  }
869
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
870
  return code;
×
871
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc