• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4933

20 Jan 2026 10:44AM UTC coverage: 66.671% (+0.03%) from 66.646%
#4933

push

travis-ci

web-flow
merge: from main to 3.0 #34340

73 of 178 new or added lines in 9 files covered. (41.01%)

1199 existing lines in 124 files now uncovered.

203121 of 304663 relevant lines covered (66.67%)

132228377.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.49
/source/libs/executor/src/tsort.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "query.h"
17
#include "tcommon.h"
18

19
#include "executil.h"
20
#include "tcompare.h"
21
#include "tdatablock.h"
22
#include "tdef.h"
23
#include "theap.h"
24
#include "tlosertree.h"
25
#include "tpagedbuf.h"
26
#include "tsimplehash.h"
27
#include "tsort.h"
28
#include "tutil.h"
29

30
// Values stored in TupleDesc.type.
#define AllocatedTupleType  0  // tuple data is a separately allocated row buffer
#define ReferencedTupleType 1  // tuple references to one row in pDataBlock
32

33
struct STupleHandle {
34
  SSDataBlock* pBlock;
35
  int32_t      rowIndex;
36
};
37

38
// One region record of the sort memory file.
// NOTE(review): field meanings below are inferred from the names only; the
// code that reads/writes them is outside this chunk - confirm before relying on them.
typedef struct SSortMemFileRegion {
  int64_t fileOffset;  // presumably where the region starts in the backing file
  int32_t regionSize;  // presumably the region length in bytes

  int32_t bufRegOffset;  // presumably the region offset that buf[0] corresponds to
  int32_t bufLen;        // presumably the number of valid bytes in buf
  char*   buf;           // in-memory buffer attached to this region
} SSortMemFileRegion;
46

47
typedef struct SSortMemFile {
48
  char*   writeBuf;
49
  int32_t writeBufSize;
50
  int64_t writeFileOffset;
51

52
  int32_t currRegionId;
53
  int32_t currRegionOffset;
54
  bool    bRegionDirty;
55

56
  SArray* aFileRegions;
57
  int32_t cacheSize;
58
  int32_t blockSize;
59

60
  FILE* pTdFile;
61
  char  memFilePath[PATH_MAX];
62
} SSortMemFile;
63

64
struct SSortHandle {
65
  int32_t        type;
66
  int32_t        pageSize;
67
  int32_t        numOfPages;
68
  SDiskbasedBuf* pBuf;
69
  SArray*        pSortInfo;
70
  SArray*        pOrderedSource;
71
  int32_t        loops;
72
  uint64_t       sortElapsed;
73
  int64_t        startTs;
74
  uint64_t       totalElapsed;
75

76
  uint64_t      pqMaxRows;
77
  uint32_t      pqMaxTupleLength;
78
  uint32_t      pqSortBufSize;
79
  bool          forceUsePQSort;
80
  BoundedQueue* pBoundedQueue;
81
  uint32_t      tmpRowIdx;
82
  int64_t       mergeLimit;
83
  int64_t       currMergeLimitTs;
84

85
  int32_t           sourceId;
86
  SSDataBlock*      pDataBlock;
87
  SMsortComparParam cmpParam;
88
  int32_t           numOfCompletedSources;
89
  bool              opened;
90
  int8_t            closed;
91
  const char*       idStr;
92
  bool              inMemSort;
93
  bool              needAdjust;
94
  STupleHandle      tupleHandle;
95
  void*             param;
96
  void (*beforeFp)(SSDataBlock* pBlock, void* param);
97

98
  _sort_fetch_block_fn_t  fetchfp;
99
  _sort_merge_compar_fn_t comparFn;
100
  SMultiwayMergeTreeInfo* pMergeTree;
101

102
  bool singleTableMerge;
103

104
  bool (*abortCheckFn)(void* param);
105
  void* abortCheckParam;
106

107
  bool          bSortByRowId;
108
  SSortMemFile* pExtRowsMemFile;
109
  int32_t       extRowBytes;
110
  int32_t       extRowsPageSize;
111
  int32_t       extRowsMemSize;
112
  int32_t       srcTsSlotId;
113
  SArray*       aExtRowsOrders;
114
  bool          bSortPk;
115
  void (*mergeLimitReachedFn)(uint64_t tableUid, void* param);
116
  void* mergeLimitReachedParam;
117
};
118

119
static void    destroySortMemFile(SSortHandle* pHandle);
120
static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, int32_t tupleOffset, int32_t rowLen,
121
                                       char** ppRow, bool* pFreeRow);
122
// Mark the handle as a single-table merge.
void tsortSetSingleTableMerge(SSortHandle* pHandle) {
  pHandle->singleTableMerge = true;
}
28,909,301✔
123

124
// Install the abort-check callback and the argument stored next to it.
void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void*), void* param) {
  pHandle->abortCheckParam = param;
  pHandle->abortCheckFn = checkFn;
}
41,788,247✔
128

129
static int32_t msortComparFn(const void* pLeft, const void* pRight, void* param);
130

131
// Tuple layout: | offset[0] | offset[1] | ... | null bitmap | data ... |
// Returns a zero-filled buffer large enough for the per-column offsets, the
// null bitmap and tupleLen bytes of data, or NULL when the allocation fails.
static void* createTuple(uint32_t columnNum, uint32_t tupleLen) {
  uint32_t headerLen = sizeof(uint32_t) * columnNum + BitmapLen(columnNum);
  return taosMemoryCalloc(1, headerLen + tupleLen);
}
136

137
// Release a tuple buffer obtained from createTuple().
static void destoryAllocatedTuple(void* t) {
  taosMemoryFree(t);
}
2,147,483,647✔
138

139
// Accessors for the tuple layout | offset[0] | offset[1] | ... | null bitmap | data ... |.
// Every macro argument is parenthesized so that expression arguments such as
// `idx + 1` expand correctly; `tuple` is always treated as a byte pointer.
#define tupleOffset(tuple, colIdx)                ((uint32_t*)((char*)(tuple) + sizeof(uint32_t) * (colIdx)))
#define tupleSetOffset(tuple, colIdx, offset)     (*tupleOffset(tuple, colIdx) = (offset))
#define tupleSetNull(tuple, colIdx, colNum)       colDataSetNull_f((char*)(tuple) + sizeof(uint32_t) * (colNum), (colIdx))
#define tupleColIsNull(tuple, colIdx, colNum)     BMIsNull((char*)(tuple) + sizeof(uint32_t) * (colNum), (colIdx))
#define tupleGetDataStartOffset(colNum)           (sizeof(uint32_t) * (colNum) + BitmapLen(colNum))
#define tupleSetData(tuple, offset, data, length) memcpy((char*)(tuple) + (offset), (data), (length))
145

146
/**
147
 * @param t the tuple pointer addr, if realloced, *t is changed to the new addr
148
 * @param offset copy data into pTuple start from offset
149
 * @param colIndex the columnIndex, for setting null bitmap
150
 * @return the next offset to add field
151
 * */
152
static inline int32_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, uint32_t colIdx, void* data,
2,147,483,647✔
153
                                    size_t length, bool isNull, uint32_t tupleLen, uint32_t* pOffset) {
154
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
155
  int32_t lino = 0;
2,147,483,647✔
156
  tupleSetOffset(*t, colIdx, offset);
2,147,483,647✔
157

158
  if (isNull) {
2,147,483,647✔
159
    tupleSetNull(*t, colIdx, colNum);
403,674,659✔
160
  } else {
161
    if (offset + length > tupleLen + tupleGetDataStartOffset(colNum)) {
2,147,483,647✔
162
      void* px = taosMemoryRealloc(*t, offset + length);
×
163
      QUERY_CHECK_NULL(px, code, lino, _end, terrno);
×
164

165
      *t = px;
×
166
    }
167
    tupleSetData(*t, offset, data, length);
2,147,483,647✔
168
  }
169

170
  (*pOffset) = offset + length;
2,147,483,647✔
171

172
_end:
2,147,483,647✔
173
  if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
174
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
175
  }
176
  return code;
2,147,483,647✔
177
}
178

179
static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) {
2,147,483,647✔
180
  if (tupleColIsNull(t, colIdx, colNum)) {
2,147,483,647✔
181
    return NULL;
2,147,483,647✔
182
  }
183

184
  return t + *tupleOffset(t, colIdx);
2,147,483,647✔
185
}
186

187
// Create a new block from the handle's data block via createOneDataBlock(..., false, ...).
// *pBlock is NULL (and success is returned) when the handle has no data block.
int32_t tsortGetSortedDataBlock(const SSortHandle* pSortHandle, SSDataBlock** pBlock) {
  *pBlock = NULL;

  SSDataBlock* pTemplate = pSortHandle->pDataBlock;
  if (pTemplate != NULL) {
    return createOneDataBlock(pTemplate, false, pBlock);
  }
  return TSDB_CODE_SUCCESS;
}
194

195
// Common header of every tuple descriptor.
typedef struct TupleDesc {
  uint8_t type;  // AllocatedTupleType or ReferencedTupleType
  char*   data;  // AllocatedTupleType: points to the created tuple; otherwise points to the DataBlock
} TupleDesc;

// Descriptor of a tuple that lives inside a data block. `desc` must stay the
// first member: code casts a TupleDesc* back to ReferencedTuple*.
typedef struct ReferencedTuple {
  TupleDesc desc;
  size_t    rowIndex;  // row of the referenced block this tuple stands for
} ReferencedTuple;
204

205
static int32_t createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint32_t tupleLen, size_t rowIdx,
2,147,483,647✔
206
                                    TupleDesc** pDesc) {
207
  int32_t    code = TSDB_CODE_SUCCESS;
2,147,483,647✔
208
  TupleDesc* t = taosMemoryCalloc(1, sizeof(TupleDesc));
2,147,483,647✔
209
  if (t == NULL) {
2,147,483,647✔
210
    return terrno;
×
211
  }
212

213
  void* pTuple = createTuple(colNum, tupleLen);
2,147,483,647✔
214
  if (!pTuple) {
2,147,483,647✔
215
    taosMemoryFree(t);
×
216
    return terrno;
×
217
  }
218

219
  size_t   colLen = 0;
2,147,483,647✔
220
  uint32_t offset = tupleGetDataStartOffset(colNum);
2,147,483,647✔
221
  for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
2,147,483,647✔
222
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
2,147,483,647✔
223
    if (pCol == NULL) {
2,147,483,647✔
224
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
225
      return terrno;
×
226
    }
227

228
    if (colDataIsNull_s(pCol, rowIdx)) {
2,147,483,647✔
229
      code = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen, &offset);
403,672,366✔
230
    } else {
231
      colLen = colDataGetRowLength(pCol, rowIdx);
2,147,483,647✔
232
      code = tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false,
2,147,483,647✔
233
                           tupleLen, &offset);
234
    }
235
    if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
236
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
237
      return code;
×
238
    }
239
  }
240

241
  t->type = AllocatedTupleType;
2,147,483,647✔
242
  t->data = pTuple;
2,147,483,647✔
243

244
  *pDesc = t;
2,147,483,647✔
245
  return code;
2,147,483,647✔
246
}
247

248
int32_t tupleDescGetField(const TupleDesc* pDesc, int32_t colIdx, uint32_t colNum, void** pResult) {
2,147,483,647✔
249
  *pResult = NULL;
2,147,483,647✔
250

251
  if (pDesc->type == ReferencedTupleType) {
2,147,483,647✔
252
    ReferencedTuple* pRefTuple = (ReferencedTuple*)pDesc;
2,147,483,647✔
253
    SColumnInfoData* pCol = taosArrayGet(((SSDataBlock*)pDesc->data)->pDataBlock, colIdx);
2,147,483,647✔
254
    if (pCol == NULL) {
2,147,483,647✔
255
      return terrno;
×
256
    }
257

258
    if (colDataIsNull_s(pCol, pRefTuple->rowIndex)) {
2,147,483,647✔
259
      return TSDB_CODE_SUCCESS;
2,147,483,647✔
260
    }
261

262
    *pResult = colDataGetData(pCol, pRefTuple->rowIndex);
2,147,483,647✔
263
  } else {
264
    *pResult = tupleGetField(pDesc->data, colIdx, colNum);
2,147,483,647✔
265
  }
266

267
  return 0;
2,147,483,647✔
268
}
269

270
void destroyTuple(void* t) {
2,147,483,647✔
271
  TupleDesc* pDesc = t;
2,147,483,647✔
272
  if (pDesc != NULL && pDesc->type == AllocatedTupleType) {
2,147,483,647✔
273
    destoryAllocatedTuple(pDesc->data);
2,147,483,647✔
274
    taosMemoryFree(pDesc);
2,147,483,647✔
275
  }
276
}
2,147,483,647✔
277

278
/**
279
 *
280
 * @param type
281
 * @return
282
 */
283
int32_t tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
122,992,480✔
284
                              SSDataBlock* pBlock, const char* idstr, uint64_t pqMaxRows, uint32_t pqMaxTupleLength,
285
                              uint32_t pqSortBufSize, SSortHandle** pHandle) {
286
  int32_t code = 0;
122,992,480✔
287
  int32_t lino = 0;
122,992,480✔
288

289
  QRY_PARAM_CHECK(pHandle);
122,992,480✔
290
  SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
123,067,087✔
291
  QUERY_CHECK_NULL(pSortHandle, code, lino, _err, terrno);
122,832,744✔
292

293
  pSortHandle->type = type;
122,832,744✔
294
  pSortHandle->pageSize = pageSize;
122,841,717✔
295
  pSortHandle->numOfPages = numOfPages;
122,927,254✔
296
  pSortHandle->pSortInfo = taosArrayDup(pSortInfo, NULL);
122,940,948✔
297
  QUERY_CHECK_NULL(pSortHandle->pSortInfo, code, lino, _err, terrno);
123,007,485✔
298

299
  pSortHandle->loops = 0;
122,986,559✔
300
  pSortHandle->pqMaxTupleLength = pqMaxTupleLength;
123,010,658✔
301
  if (pqMaxRows != 0) {
122,963,087✔
302
    pSortHandle->pqSortBufSize = pqSortBufSize;
56,336,030✔
303
    pSortHandle->pqMaxRows = pqMaxRows;
56,358,428✔
304
  }
305

306
  pSortHandle->forceUsePQSort = false;
122,942,705✔
307
  if (pBlock != NULL) {
122,963,682✔
308
    code = createOneDataBlock(pBlock, false, &pSortHandle->pDataBlock);
66,548,844✔
309
    QUERY_CHECK_CODE(code, lino, _err);
66,647,378✔
310
  }
311

312
  pSortHandle->mergeLimit = -1;
123,062,216✔
313

314
  pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
123,065,605✔
315
  QUERY_CHECK_NULL(pSortHandle->pOrderedSource, code, lino, _err, terrno);
122,924,358✔
316

317
  pSortHandle->cmpParam.orderInfo = pSortInfo;
122,947,136✔
318
  pSortHandle->cmpParam.cmpGroupId = false;
122,893,846✔
319
  pSortHandle->cmpParam.sortType = type;
123,027,832✔
320

321
  if (type == SORT_BLOCK_TS_MERGE) {
123,024,752✔
322
    SBlockOrderInfo* pTsOrder = TARRAY_GET_ELEM(pSortInfo, 0);
41,766,937✔
323
    pSortHandle->cmpParam.tsSlotId = pTsOrder->slotId;
41,755,173✔
324
    pSortHandle->cmpParam.tsOrder = pTsOrder->order;
41,832,999✔
325
    pSortHandle->cmpParam.cmpTsFn = pTsOrder->compFn;
41,718,235✔
326
    if (taosArrayGetSize(pSortHandle->pSortInfo) == 2) {
41,827,727✔
327
      pSortHandle->cmpParam.pPkOrder = taosArrayGet(pSortHandle->pSortInfo, 1);
8,553,852✔
328
      pSortHandle->bSortPk = true;
8,554,434✔
329
    } else {
330
      pSortHandle->cmpParam.pPkOrder = NULL;
33,132,903✔
331
      pSortHandle->bSortPk = false;
33,228,335✔
332
    }
333
  }
334
  tsortSetComparFp(pSortHandle, msortComparFn);
123,039,055✔
335

336
  if (idstr != NULL) {
122,975,896✔
337
    pSortHandle->idStr = taosStrdup(idstr);
122,994,428✔
338
    QUERY_CHECK_NULL(pSortHandle->idStr, code, lino, _err, terrno);
122,999,054✔
339
  }
340

341
  *pHandle = pSortHandle;
122,986,622✔
342
  return code;
122,979,430✔
343

344
_err:
×
345
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
346
  if (pSortHandle) {
×
347
    tsortDestroySortHandle(pSortHandle);
×
348
  }
349
  return code;
×
350
}
351

352
/**
 * Release every source referenced by the comparison parameter and reset the
 * source count to zero.
 *
 * @param cmpParam comparison context whose pSources[0 .. numOfSources-1] are freed
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
  // NOTICE: pSource may be NULL, if it is SORT_MULTISOURCE_MERGE; such slots are skipped.
  // NOTE(review): the original note read "pSource may be, if it is ..." - "NULL" is the
  // presumed missing word; confirm.
  for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
    SSortSource* pSource = cmpParam->pSources[i];
    if (pSource == NULL) {
      continue;
    }

    blockDataDestroy(pSource->src.pBlock);
    if (pSource->pageIdList) {
      taosArrayDestroy(pSource->pageIdList);
    }
    taosMemoryFreeClear(pSource);
    cmpParam->pSources[i] = NULL;
  }

  cmpParam->numOfSources = 0;
  return TSDB_CODE_SUCCESS;
}
367

368
/**
 * Free every source stored in pOrderedSource and empty the array.
 *
 * For each non-NULL source: its page id list is destroyed; its param and its
 * block are released unless the source is marked onlyRef; then the source
 * itself is freed.
 *
 * @param pOrderedSource array of SSortSource* pointers
 * @param fetchUs        optional accumulator; each source's fetchUs is added when non-NULL
 * @param fetchNum       optional accumulator; each source's fetchNum is added when non-NULL
 *                       (checked independently of fetchUs, so either may be NULL)
 */
void tsortClearOrderedSource(SArray* pOrderedSource, int64_t* fetchUs, int64_t* fetchNum) {
  // the array is not resized inside the loop, so its size can be read once
  size_t numOfSources = taosArrayGetSize(pOrderedSource);

  for (size_t i = 0; i < numOfSources; i++) {
    SSortSource** ppSource = taosArrayGet(pOrderedSource, i);
    if (ppSource == NULL || *ppSource == NULL) {
      continue;
    }
    SSortSource* pSource = *ppSource;

    if (fetchUs != NULL) {
      *fetchUs += pSource->fetchUs;
    }
    if (fetchNum != NULL) {
      *fetchNum += pSource->fetchNum;
    }

    // release pageIdList
    if (pSource->pageIdList) {
      taosArrayDestroy(pSource->pageIdList);
      pSource->pageIdList = NULL;
    }

    // param and block are only ours to free when the source is not a pure reference
    if (pSource->param && !pSource->onlyRef) {
      taosMemoryFree(pSource->param);
      pSource->param = NULL;
    }

    if (!pSource->onlyRef && pSource->src.pBlock) {
      blockDataDestroy(pSource->src.pBlock);
      pSource->src.pBlock = NULL;
    }

    taosMemoryFreeClear(*ppSource);
  }

  taosArrayClear(pOrderedSource);
}
135,925,417✔
400

401
/**
 * Close the handle and release everything it owns, then free the handle itself.
 * Accepts NULL, in which case nothing happens.
 *
 * The id string is freed only after the final debug message: freeing it first
 * (as the previous version did) cleared the pointer, so the message was
 * formatted with a NULL "%s" argument and never showed the id.
 */
void tsortDestroySortHandle(SSortHandle* pSortHandle) {
  if (pSortHandle == NULL) {
    return;
  }

  tsortClose(pSortHandle);
  if (pSortHandle->pMergeTree != NULL) {
    tMergeTreeDestroy(&pSortHandle->pMergeTree);
  }

  destroyDiskbasedBuf(pSortHandle->pBuf);
  blockDataDestroy(pSortHandle->pDataBlock);

  if (pSortHandle->pBoundedQueue) destroyBoundedQueue(pSortHandle->pBoundedQueue);

  int64_t fetchUs = 0, fetchNum = 0;
  tsortClearOrderedSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum);
  qDebug("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr);

  // idStr is still needed by the log line above, so it is released here
  taosMemoryFreeClear(pSortHandle->idStr);

  taosArrayDestroy(pSortHandle->pOrderedSource);
  if (pSortHandle->pExtRowsMemFile != NULL) {
    destroySortMemFile(pSortHandle);
  }

  taosArrayDestroy(pSortHandle->pSortInfo);
  taosArrayDestroy(pSortHandle->aExtRowsOrders);
  pSortHandle->aExtRowsOrders = NULL;
  taosMemoryFreeClear(pSortHandle);
}
431

432
// Append the pSource pointer to the handle's ordered source array.
// Returns TSDB_CODE_SUCCESS, or terrno when the push fails.
int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) {
  if (taosArrayPush(pSortHandle->pOrderedSource, &pSource) == NULL) {
    return terrno;
  }
  return TSDB_CODE_SUCCESS;
}
436

437
/**
 * Wrap a block and its page id list into a new SSortSource and append it to
 * pAllSources.
 *
 * All checks that can fail (row size, rows per page, block capacity) now run
 * BEFORE the source is pushed. On any failure pAllSources and *sourceId are
 * therefore untouched and the caller still exclusively owns pBlock and
 * pPageIdList. Previously a failure after the push left the source in the
 * array while the caller also destroyed the block and page list.
 *
 * @param pBuf        disk-based buffer; only its page size is read
 * @param pAllSources array of SSortSource* receiving the new source
 * @param pBlock      block handed over to the source on success
 * @param sourceId    counter incremented on success
 * @param pPageIdList page id list handed over to the source on success
 * @return 0 on success, otherwise an error code
 */
static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSources, SSDataBlock* pBlock,
                                         int32_t* sourceId, SArray* pPageIdList) {
  int32_t       code = 0;
  int32_t       lino = 0;
  int32_t       numOfRows = 0;
  SSortSource*  pSource = NULL;
  SSortSource** p = NULL;

  // guard the division below
  int32_t rowSize = blockDataGetSerialRowSize(pBlock);
  QUERY_CHECK_CONDITION((rowSize > 0), code, lino, _err, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  // number of rows that fit into one buffer page next to the serialized block meta; must be positive
  numOfRows = (getBufPageSize(pBuf) - blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock))) / rowSize;
  QUERY_CHECK_CONDITION((numOfRows > 0), code, lino, _err, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  code = blockDataEnsureCapacity(pBlock, numOfRows);
  QUERY_CHECK_CODE(code, lino, _err);

  pSource = taosMemoryCalloc(1, sizeof(SSortSource));
  QUERY_CHECK_NULL(pSource, code, lino, _err, terrno);

  pSource->src.pBlock = pBlock;
  pSource->pageIdList = pPageIdList;

  p = taosArrayPush(pAllSources, &pSource);
  QUERY_CHECK_NULL(p, code, lino, _err, terrno);

  (*sourceId) += 1;
  return code;

_err:
  // only the source shell is ours here; block and page list stay with the caller
  if (pSource) taosMemoryFree(pSource);
  qError("sort failed at %s:%d since %s", __func__, lino, tstrerror(code));
  return code;
}
470

471
/**
 * Spill pDataBlock into the handle's disk-based buffer, one page-sized slice at
 * a time, then register the written pages as a new external-memory source.
 *
 * The disk-based buffer is created on first use. After all rows are written,
 * pDataBlock is cleaned up and a new block created from it (via
 * createOneDataBlock) becomes the source's block.
 *
 * All failure paths go through a single cleanup that releases the page id
 * list, the slice being processed and the page still held. Previously the
 * "slice larger than a page", blockDataToBuf() and createOneDataBlock()
 * failures returned without releasing them.
 *
 * @param pDataBlock rows to spill; emptied on success
 * @param pHandle    sort handle owning the buffer and the source list
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      start = 0;
  SArray*      pPageIdList = NULL;
  SSDataBlock* pSlice = NULL;  // rows [start, stop] of pDataBlock for the current page
  void*        pPage = NULL;   // page currently held; NULL once released
  SSDataBlock* pBlock = NULL;

  if (pHandle->pBuf == NULL) {
    if (!osTempSpaceAvailable()) {
      terrno = TSDB_CODE_NO_DISKSPACE;
      qError("Add to buf failed since %s, tempDir:%s", terrstr(), tsTempDir);
      return terrno;
    }

    code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
                              "sortExternalBuf", tsTempDir);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
    dBufSetPrintInfo(pHandle->pBuf);
  }

  pPageIdList = taosArrayInit(4, sizeof(int32_t));
  if (pPageIdList == NULL) {
    return terrno;
  }

  while (start < pDataBlock->info.rows) {
    int32_t stop = 0;

    code = blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize);
    if (code) {
      goto _err;
    }

    code = blockDataExtractBlock(pDataBlock, start, stop - start + 1, &pSlice);
    if (code) {
      goto _err;
    }

    int32_t pageId = -1;
    pPage = getNewBufPage(pHandle->pBuf, &pageId);
    if (pPage == NULL) {
      code = terrno;
      goto _err;
    }

    if (taosArrayPush(pPageIdList, &pageId) == NULL) {
      code = terrno;
      goto _err;
    }

    // serialized size: block data + one int32 header + one int32 per column
    int32_t size = blockDataGetSize(pSlice) + sizeof(int32_t) + taosArrayGetSize(pSlice->pDataBlock) * sizeof(int32_t);
    if (size > getBufPageSize(pHandle->pBuf)) {
      qError("sort failed at: %s:%d", __func__, __LINE__);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      goto _err;
    }

    code = blockDataToBuf(pPage, pSlice);
    if (code) {
      goto _err;
    }

    setBufPageDirty(pPage, true);
    releaseBufPage(pHandle->pBuf, pPage);
    pPage = NULL;

    blockDataDestroy(pSlice);
    pSlice = NULL;
    start = stop + 1;
  }

  blockDataCleanup(pDataBlock);

  code = createOneDataBlock(pDataBlock, false, &pBlock);
  if (code) {
    goto _err;
  }

  code = doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList);
  if (code) {
    blockDataDestroy(pBlock);
    goto _err;
  }
  return code;

_err:
  if (pPage != NULL) {
    releaseBufPage(pHandle->pBuf, pPage);
  }
  blockDataDestroy(pSlice);
  taosArrayDestroy(pPageIdList);
  return code;
}
558

559
// Mark a source as exhausted (row index -1) and count it as completed on the handle.
static void setCurrentSourceDone(SSortSource* pSource, SSortHandle* pHandle) {
  pHandle->numOfCompletedSources += 1;
  pSource->src.rowIndex = -1;
}
40,222,136✔
563

564
static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32_t startIndex, int32_t endIndex,
34,803,304✔
565
                              SSortHandle* pHandle) {
566
  pParam->pSources = taosArrayGet(pSources, startIndex);
34,803,304✔
567
  if (pParam->pSources == NULL) {
34,801,928✔
568
    return terrno;
×
569
  }
570

571
  pParam->numOfSources = (endIndex - startIndex + 1);
34,797,621✔
572
  int32_t code = 0;
34,801,394✔
573

574
  // multi-pass internal merge sort is required
575
  if (pHandle->pBuf == NULL) {
34,801,394✔
576
    if (!osTempSpaceAvailable()) {
23,307,073✔
577
      code = terrno = TSDB_CODE_NO_DISKSPACE;
×
578
      qError("Sort compare init failed since %s, tempDir:%s, idStr:%s", terrstr(), tsTempDir, pHandle->idStr);
×
579
      return code;
×
580
    }
581

582
    code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
23,307,286✔
583
                              "sortComparInit", tsTempDir);
584
    if (code != TSDB_CODE_SUCCESS) {
23,307,641✔
585
      return code;
×
586
    } else {
587
      dBufSetPrintInfo(pHandle->pBuf);
23,307,641✔
588
    }
589
  }
590

591
  if (pHandle->type == SORT_SINGLESOURCE_SORT) {
34,802,072✔
592
    for (int32_t i = 0; i < pParam->numOfSources; ++i) {
23,009,984✔
593
      SSortSource* pSource = pParam->pSources[i];
11,524,340✔
594

595
      // set current source is done
596
      if (taosArrayGetSize(pSource->pageIdList) == 0) {
11,520,601✔
597
        setCurrentSourceDone(pSource, pHandle);
×
598
        continue;
×
599
      }
600

601
      int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);
11,514,895✔
602
      if (pPgId == NULL) {
11,508,385✔
603
        return terrno;
×
604
      }
605

606
      void* pPage = getBufPage(pHandle->pBuf, *pPgId);
11,508,385✔
607
      if (NULL == pPage) {
11,519,390✔
608
        return terrno;
×
609
      }
610

611
      code = blockDataFromBuf(pSource->src.pBlock, pPage);
11,519,390✔
612
      if (code != TSDB_CODE_SUCCESS) {
11,513,750✔
613
        terrno = code;
×
614
        return code;
×
615
      }
616

617
      releaseBufPage(pHandle->pBuf, pPage);
11,513,750✔
618
    }
619
  } else {
620
    qDebug("start init for the multiway merge sort, %s", pHandle->idStr);
23,305,762✔
621
    int64_t st = taosGetTimestampUs();
23,307,641✔
622

623
    for (int32_t i = 0; i < pParam->numOfSources; ++i) {
76,189,427✔
624
      SSortSource* pSource = pParam->pSources[i];
52,882,053✔
625
      TAOS_CHECK_RETURN(pHandle->fetchfp(pSource->param, &pSource->src.pBlock));
52,881,440✔
626

627
      // set current source is done
628
      if (pSource->src.pBlock == NULL) {
52,881,786✔
629
        setCurrentSourceDone(pSource, pHandle);
11,363,811✔
630
      }
631
    }
632

633
    int64_t et = taosGetTimestampUs();
23,307,019✔
634
    qDebug("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr);
23,307,019✔
635
  }
636

637
  return code;
34,797,550✔
638
}
639

640
/**
 * Copy row *rowIndex of pSource to the end of pBlock, column by column.
 *
 * Columns are matched by position and the column count of pBlock drives the
 * loop. A source column that is not NULL at that row but has no data buffer is
 * skipped. On success pBlock gains one row and *rowIndex is advanced by one.
 *
 * @return 0 on success; terrno when a column lookup fails; otherwise the error
 *         returned by colDataSetVal()
 */
static int32_t appendOneRowToDataBlock(SSDataBlock* pBlock, const SSDataBlock* pSource, int32_t* rowIndex) {
  int32_t code = 0;

  for (int32_t col = 0; col < taosArrayGetSize(pBlock->pDataBlock); ++col) {
    SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, col);
    if (pDstCol == NULL) {
      return terrno;
    }

    SColumnInfoData* pSrcCol = taosArrayGet(pSource->pDataBlock, col);
    if (pSrcCol == NULL) {
      return terrno;
    }

    if (colDataIsNull(pSrcCol, pSource->info.rows, *rowIndex, NULL)) {
      code = colDataSetVal(pDstCol, pBlock->info.rows, NULL, true);
    } else if (pSrcCol->pData) {
      char* pValue = colDataGetData(pSrcCol, *rowIndex);
      code = colDataSetVal(pDstCol, pBlock->info.rows, pValue, false);
    }
    // when the column was skipped, code is still 0 from the previous iteration

    if (code) {
      return code;
    }
  }

  pBlock->info.rows += 1;
  *rowIndex += 1;
  return code;
}
674

675
/**
 * After one row of pSource has been consumed, make its next row available and
 * re-adjust the loser tree.
 *
 * When the source's current block is used up:
 * - SORT_SINGLESOURCE_SORT: the next page of the source is loaded from the
 *   disk-based buffer; when no page is left the source is marked completed and
 *   its block destroyed.
 * - otherwise: the next block is fetched through fetchfp (timing and count are
 *   recorded on the source); a NULL block marks the source completed.
 *
 * The buffer page is released whether or not decoding it succeeds (it used to
 * stay held when blockDataFromBuf() failed).
 *
 * @param pSource        source the last winner row came from
 * @param pTree          loser tree to adjust
 * @param pHandle        owning sort handle
 * @param numOfCompleted counter incremented when the source becomes exhausted
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeTreeInfo* pTree, SSortHandle* pHandle,
                                           int32_t* numOfCompleted) {
  /*
   * load a new SDataBlock into memory of a given intermediate data-set source,
   * since it's last record in buffer has been chosen to be processed, as the winner of loser-tree
   */
  if (pSource->src.rowIndex >= pSource->src.pBlock->info.rows) {
    pSource->src.rowIndex = 0;

    if (pHandle->type == SORT_SINGLESOURCE_SORT) {
      pSource->pageIndex++;
      if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) {
        qDebug("adjust merge tree. %d source completed %d", *numOfCompleted, pSource->pageIndex);
        (*numOfCompleted) += 1;
        pSource->src.rowIndex = -1;
        pSource->pageIndex = -1;
        blockDataDestroy(pSource->src.pBlock);
        pSource->src.pBlock = NULL;
      } else {
        if (pSource->pageIndex % 512 == 0) {
          qDebug("begin source %p page %d", pSource, pSource->pageIndex);
        }

        int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);
        if (pPgId == NULL) {
          return terrno;
        }

        void* pPage = getBufPage(pHandle->pBuf, *pPgId);
        if (pPage == NULL) {
          qError("failed to get buffer, code:%s", tstrerror(terrno));
          return terrno;
        }

        int32_t loadCode = blockDataFromBuf(pSource->src.pBlock, pPage);
        // the page is no longer needed once decoding was attempted
        releaseBufPage(pHandle->pBuf, pPage);
        if (loadCode != TSDB_CODE_SUCCESS) {
          return loadCode;
        }
      }
    } else {
      int64_t st = taosGetTimestampUs();
      TAOS_CHECK_RETURN(pHandle->fetchfp(((SSortSource*)pSource)->param, &pSource->src.pBlock));
      pSource->fetchUs += taosGetTimestampUs() - st;
      pSource->fetchNum++;
      if (pSource->src.pBlock == NULL) {
        (*numOfCompleted) += 1;
        pSource->src.rowIndex = -1;
        qDebug("adjust merge tree. %d source completed", *numOfCompleted);
      }
    }
  }

  /*
   * Adjust loser tree otherwise, according to new candidate data
   * if the loser tree is rebuild completed, we do not need to adjust
   */
  int32_t leafNodeIndex = tMergeTreeGetAdjustIndex(pTree);

#ifdef _DEBUG_VIEW
  printf("before adjust:\t");
  tMergeTreePrint(pTree);
#endif

  int32_t code = tMergeTreeAdjust(pTree, leafNodeIndex);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

#ifdef _DEBUG_VIEW
  printf("\nafter adjust:\t");
  tMergeTreePrint(pTree);
#endif
  return TSDB_CODE_SUCCESS;
}
750

751
static int32_t getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity,
×
752
                                       SSDataBlock** pRes) {
753
  *pRes = NULL;
×
754

755
  int32_t code = 0;
×
756
  blockDataCleanup(pHandle->pDataBlock);
×
757

758
  while (1) {
×
759
    if (cmpParam->numOfSources == pHandle->numOfCompletedSources) {
×
760
      break;
×
761
    }
762

763
    int32_t index = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
×
764

765
    SSortSource* pSource = (*cmpParam).pSources[index];
×
766
    code = appendOneRowToDataBlock(pHandle->pDataBlock, pSource->src.pBlock, &pSource->src.rowIndex);
×
767
    if (code) {
×
768
      return code;
×
769
    }
770

771
    code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
×
772
    if (code != TSDB_CODE_SUCCESS) {
×
773
      return code;
×
774
    }
775

776
    if (pHandle->pDataBlock->info.rows >= capacity) {
×
777
      *pRes = pHandle->pDataBlock;
×
778
      return code;
×
779
    }
780
  }
781

782
  *pRes = (pHandle->pDataBlock->info.rows > 0) ? pHandle->pDataBlock : NULL;
×
783
  return code;
×
784
}
785

786
// TODO: improve this function performance
787

788
/*
 * Compare a single cell of two rows, using the column described by
 * `pCompareOrder` (an SBlockOrderInfo*).
 *
 * NULL handling: two NULLs compare equal; otherwise a NULL is ordered before or
 * after a non-NULL value according to pOrder->nullFirst. When a block carries
 * pBlockAgg, the per-column aggregate at pOrder->slotId is passed to the NULL
 * check.
 *
 * Non-NULL values are compared with pOrder->compFn. If the comparator has not
 * been resolved yet it is looked up from the column type and sort order and
 * cached in pOrder, the same way msortComparFn does it.
 *
 * Returns the comparator result (<0, 0, >0).
 */
int32_t tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, int32_t leftRowIndex,
                             int32_t rightRowIndex, void* pCompareOrder) {
  SBlockOrderInfo* pOrder = pCompareOrder;
  SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
  SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);

  if (pLeftColInfoData->hasNull || pRightColInfoData->hasNull) {
    bool isVarType = IS_VAR_DATA_TYPE(pLeftColInfoData->info.type);

    bool leftNull = false;
    if (pLeftColInfoData->hasNull) {
      if (pLeftBlock->pBlockAgg == NULL) {
        leftNull = colDataIsNull_t(pLeftColInfoData, leftRowIndex, isVarType);
      } else {
        leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, leftRowIndex,
                                 &pLeftBlock->pBlockAgg[pOrder->slotId]);
      }
    }

    bool rightNull = false;
    if (pRightColInfoData->hasNull) {
      if (pRightBlock->pBlockAgg == NULL) {
        rightNull = colDataIsNull_t(pRightColInfoData, rightRowIndex, isVarType);
      } else {
        rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, rightRowIndex,
                                  &pRightBlock->pBlockAgg[pOrder->slotId]);
      }
    }

    if (leftNull && rightNull) {
      return 0;
    }

    if (rightNull) {
      return pOrder->nullFirst ? 1 : -1;
    }

    if (leftNull) {
      return pOrder->nullFirst ? -1 : 1;
    }
  }

  void* left1 = colDataGetData(pLeftColInfoData, leftRowIndex);
  void* right1 = colDataGetData(pRightColInfoData, rightRowIndex);

  // resolve the comparator lazily instead of calling through a NULL pointer
  __compar_fn_t fn = pOrder->compFn;
  if (fn == NULL) {
    fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order);
    pOrder->compFn = fn;
  }

  return fn(left1, right1);
}
836

837
int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
2,147,483,647✔
838
  int32_t pLeftIdx = *(int32_t*)pLeft;
2,147,483,647✔
839
  int32_t pRightIdx = *(int32_t*)pRight;
2,147,483,647✔
840

841
  SMsortComparParam* pParam = (SMsortComparParam*)param;
2,147,483,647✔
842

843
  SArray* pInfo = pParam->orderInfo;
2,147,483,647✔
844

845
  SSortSource* pLeftSource = pParam->pSources[pLeftIdx];
2,147,483,647✔
846
  SSortSource* pRightSource = pParam->pSources[pRightIdx];
2,147,483,647✔
847

848
  // this input is exhausted, set the special value to denote this
849
  if (UNLIKELY(pLeftSource->src.rowIndex == -1)) {
2,147,483,647✔
850
    return 1;
2,147,483,647✔
851
  }
852

853
  if (UNLIKELY(pRightSource->src.rowIndex == -1)) {
2,147,483,647✔
854
    return -1;
20,525,327✔
855
  }
856

857
  SSDataBlock* pLeftBlock = pLeftSource->src.pBlock;
2,147,483,647✔
858
  SSDataBlock* pRightBlock = pRightSource->src.pBlock;
2,147,483,647✔
859

860
  if (pParam->cmpGroupId) {
2,147,483,647✔
861
    if (pLeftBlock->info.id.groupId != pRightBlock->info.id.groupId) {
2,147,483,647✔
862
      return pLeftBlock->info.id.groupId < pRightBlock->info.id.groupId ? -1 : 1;
2,147,483,647✔
863
    }
864
  }
865

866
  if (pParam->sortType == SORT_BLOCK_TS_MERGE) {
2,147,483,647✔
867
    SColumnInfoData* pLeftTsCol = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pParam->tsSlotId);
976,999,023✔
868
    SColumnInfoData* pRightTsCol = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pParam->tsSlotId);
976,999,023✔
869
    int64_t*         leftTs = (int64_t*)(pLeftTsCol->pData) + pLeftSource->src.rowIndex;
976,999,023✔
870
    int64_t*         rightTs = (int64_t*)(pRightTsCol->pData) + pRightSource->src.rowIndex;
976,999,023✔
871

872
    int32_t ret = pParam->cmpTsFn(leftTs, rightTs);
976,999,023✔
873
    if (ret == 0 && pParam->pPkOrder) {
976,999,023✔
874
      ret = tsortComparBlockCell(pLeftBlock, pRightBlock, pLeftSource->src.rowIndex, pRightSource->src.rowIndex,
×
875
                                 (SBlockOrderInfo*)pParam->pPkOrder);
876
    }
877
    return ret;
976,999,023✔
878
  } else if (pParam->sortType == SORT_MULTISOURCE_TS_MERGE) {
2,147,483,647✔
879
    SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, 0);
2,147,483,647✔
880
    SColumnInfoData* pLeftTsCol = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
2,147,483,647✔
881
    SColumnInfoData* pRightTsCol = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
2,147,483,647✔
882
    int64_t*         leftTs = (int64_t*)(pLeftTsCol->pData) + pLeftSource->src.rowIndex;
2,147,483,647✔
883
    int64_t*         rightTs = (int64_t*)(pRightTsCol->pData) + pRightSource->src.rowIndex;
2,147,483,647✔
884

885

886
    __compar_fn_t fn = pOrder->compFn;
2,147,483,647✔
887
    if (!fn) {
2,147,483,647✔
888
      fn = getKeyComparFunc(pLeftTsCol->info.type, pOrder->order);
3,505,995✔
889
      pOrder->compFn = fn;
3,505,995✔
890
    }
891

892
    return fn(leftTs, rightTs);
2,147,483,647✔
893
  } else {
894
    bool isVarType;
895
    for (int32_t i = 0; i < pInfo->size; ++i) {
2,147,483,647✔
896
      SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
2,147,483,647✔
897
      SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
2,147,483,647✔
898
      SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
2,147,483,647✔
899
      isVarType = IS_VAR_DATA_TYPE(pLeftColInfoData->info.type);
2,147,483,647✔
900

901
      if (pLeftColInfoData->hasNull || pRightColInfoData->hasNull) {
2,147,483,647✔
902
        bool leftNull = false;
2,147,483,647✔
903
        if (pLeftColInfoData->hasNull) {
2,147,483,647✔
904
          if (pLeftBlock->pBlockAgg == NULL) {
2,147,483,647✔
905
            leftNull = colDataIsNull_t(pLeftColInfoData, pLeftSource->src.rowIndex, isVarType);
2,147,483,647✔
906
          } else {
907
            leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex,
×
908
                                     &pLeftBlock->pBlockAgg[i]);
286,980✔
909
          }
910
        }
911

912
        bool rightNull = false;
2,147,483,647✔
913
        if (pRightColInfoData->hasNull) {
2,147,483,647✔
914
          if (pRightBlock->pBlockAgg == NULL) {
2,147,483,647✔
915
            rightNull = colDataIsNull_t(pRightColInfoData, pRightSource->src.rowIndex, isVarType);
2,147,483,647✔
916
          } else {
917
            rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex,
×
918
                                      &pRightBlock->pBlockAgg[i]);
×
919
          }
920
        }
921

922
        if (leftNull && rightNull) {
2,147,483,647✔
923
          continue;  // continue to next slot
297,220,929✔
924
        }
925

926
        if (rightNull) {
2,147,483,647✔
927
          return pOrder->nullFirst ? 1 : -1;
147,853,819✔
928
        }
929

930
        if (leftNull) {
2,147,483,647✔
931
          return pOrder->nullFirst ? -1 : 1;
103,723✔
932
        }
933
      }
934

935
      void *left1, *right1;
936
      if (isVarType) {
2,147,483,647✔
937
        left1 = colDataGetVarData(pLeftColInfoData, pLeftSource->src.rowIndex);
2,147,483,647✔
938
        right1 = colDataGetVarData(pRightColInfoData, pRightSource->src.rowIndex);
2,147,483,647✔
939
      } else {
940
        left1 = colDataGetNumData(pLeftColInfoData, pLeftSource->src.rowIndex);
2,147,483,647✔
941
        right1 = colDataGetNumData(pRightColInfoData, pRightSource->src.rowIndex);
2,147,483,647✔
942
      }
943

944
      __compar_fn_t fn = pOrder->compFn;
2,147,483,647✔
945
      if (!fn) {
2,147,483,647✔
946
        fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order);
11,052,911✔
947
        pOrder->compFn = fn;
11,052,911✔
948
      }
949

950
      int32_t ret = fn(left1, right1);
2,147,483,647✔
951
      if (ret == 0) {
2,147,483,647✔
952
        continue;
2,147,483,647✔
953
      } else {
954
        return ret;
2,147,483,647✔
955
      }
956
    }
957
  }
958

959
  return 0;
2,147,483,647✔
960
}
961

962
/*
 * Run one internal merge pass.
 *
 * The `numOfSorted` sources in pHandle->pOrderedSource are split into
 * `sortGroup` groups of at most `numOfInputSources` sources. Each group is
 * merged through a merge tree; the merged rows are written to buffer pages in
 * blocks of at most `numOfRows` rows, and the resulting page-id list is
 * registered as one new source in `pResList`.
 *
 * sortTimes is only used for logging (index of the current pass).
 *
 * Returns 0 on success. On failure the error is logged together with the line
 * where it was detected and the code is returned; TSDB_CODE_TSC_QUERY_CANCELLED
 * is returned when the handle is closed or the abort callback fires.
 */
static int32_t doSortForEachGroup(SSortHandle* pHandle, int32_t sortTimes, int32_t numOfSorted,
                                  int32_t numOfInputSources, SArray* pResList, int32_t sortGroup, int32_t numOfRows) {
  int32_t code = 0;
  int32_t lino = 0;
  SArray* pPageIdList = NULL;

  for (int32_t i = 0; i < sortGroup; ++i) {
    qDebug("internal merge sort pass %d group %d. num input sources %d ", sortTimes, i, numOfInputSources);
    pHandle->sourceId += 1;

    // index of the last source of this group, clamped to the last existing source
    int32_t end = (i + 1) * numOfInputSources - 1;
    if (end > numOfSorted - 1) {
      end = numOfSorted - 1;
    }

    pHandle->cmpParam.numOfSources = end - i * numOfInputSources + 1;

    code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle);
    QUERY_CHECK_CODE(code, lino, _err);

    code =
        tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
    QUERY_CHECK_CODE(code, lino, _err);

    int32_t nMergedRows = 0;
    pPageIdList = taosArrayInit(4, sizeof(int32_t));
    QUERY_CHECK_NULL(pPageIdList, code, lino, _err, terrno);

    while (1) {
      if (tsortIsClosed(pHandle) || (pHandle->abortCheckFn && pHandle->abortCheckFn(pHandle->abortCheckParam))) {
        code = TSDB_CODE_TSC_QUERY_CANCELLED;
        goto _err;
      }

      SSDataBlock* pDataBlock = NULL;
      code = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows, &pDataBlock);
      // A merge failure must abort the pass. Merely leaving the loop would let
      // the cleanup call below overwrite the error code and register a
      // truncated result as if it were complete.
      QUERY_CHECK_CODE(code, lino, _err);
      if (pDataBlock == NULL) {
        break;
      }

      int32_t pageId = -1;
      void*   pPage = getNewBufPage(pHandle->pBuf, &pageId);
      QUERY_CHECK_NULL(pPage, code, lino, _err, terrno);

      void* px = taosArrayPush(pPageIdList, &pageId);
      QUERY_CHECK_NULL(px, code, lino, _err, terrno);

      int32_t size =
          blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t);
      if (size > getBufPageSize(pHandle->pBuf)) {
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        goto _err;
      }

      code = blockDataToBuf(pPage, pDataBlock);
      QUERY_CHECK_CODE(code, lino, _err);

      setBufPageDirty(pPage, true);
      releaseBufPage(pHandle->pBuf, pPage);
      nMergedRows += pDataBlock->info.rows;

      blockDataCleanup(pDataBlock);
      if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
        break;
      }
    }

    code = sortComparCleanup(&pHandle->cmpParam);
    QUERY_CHECK_CODE(code, lino, _err);

    tMergeTreeDestroy(&pHandle->pMergeTree);
    pHandle->numOfCompletedSources = 0;

    SSDataBlock* pBlock = NULL;
    code = createOneDataBlock(pHandle->pDataBlock, false, &pBlock);
    QUERY_CHECK_CODE(code, lino, _err);

    code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList);
    if (code != TSDB_CODE_SUCCESS) {
      blockDataDestroy(pBlock);
    }
    // NOTE(review): on failure the page-id list is still destroyed at _err below;
    // confirm doAddNewExternalMemSource does not release it itself in that case.
    QUERY_CHECK_CODE(code, lino, _err);

    // The list has been handed over to the new source. Forget it here so that a
    // failure in a later group does not destroy it a second time at _err.
    pPageIdList = NULL;
  }

  return code;

_err:
  taosArrayDestroy(pPageIdList);
  qError("%s error happens:%s line:%d, code:%s", pHandle->idStr, __func__, lino, tstrerror(code));
  return code;
}
1053

1054
/*
 * Reduce the ordered sources of the handle by running as many internal merge
 * passes as needed.
 *
 * The number of passes is floor(log2(#sources) / log2(numOfPages)). Every pass
 * merges groups of numOfPages sources via doSortForEachGroup and replaces
 * pHandle->pOrderedSource with the merged sources. After the first pass a
 * SORT_MULTISOURCE_MERGE handle is switched to SORT_SINGLESOURCE_SORT with
 * msortComparFn as its comparator.
 *
 * Also updates totalElapsed, loops and cmpParam.numOfSources, and makes sure
 * pHandle->pDataBlock can hold as many rows as fit into one page.
 *
 * Returns 0 on success (including the case of no sources at all), otherwise the
 * first error code encountered.
 */
static int32_t doInternalMergeSort(SSortHandle* pHandle) {
  size_t srcCount = taosArrayGetSize(pHandle->pOrderedSource);
  if (srcCount == 0) {
    return 0;
  }

  // Calculate the I/O counts to complete the data sort.
  double sortPass = floorl(log2(srcCount) / log2(pHandle->numOfPages));

  pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs;

  if (sortPass > 0) {
    size_t s = 0;
    if (pHandle->pBuf) {
      s = getTotalBufSize(pHandle->pBuf);
    }
    qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%" PRIzu
           ", sort elapsed:%" PRId64 ", total elapsed:%" PRId64,
           pHandle->idStr, (int32_t)(sortPass + 1), s, pHandle->sortElapsed, pHandle->totalElapsed);
  } else {
    qDebug("%s ordered source:%" PRIzu ", available buf:%d, no need internal sort", pHandle->idStr, srcCount,
           pHandle->numOfPages);
  }

  int32_t metaSize = (int32_t)blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock));

  // the returned row size is not used here; the call itself is kept as in the original flow
  (void)blockDataGetRowSize(pHandle->pDataBlock);

  int32_t rowsPerPage = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, metaSize);
  if (rowsPerPage < 0) {
    return terrno;
  }

  int32_t code = blockDataEnsureCapacity(pHandle->pDataBlock, rowsPerPage);
  if (code != 0) {
    return code;
  }

  // the initial pass + sortPass + final mergePass
  pHandle->loops = sortPass + 2;

  size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);
  for (int32_t t = 0; t < sortPass; ++t) {
    int64_t passStart = taosGetTimestampUs();

    SArray* pMerged = taosArrayInit(4, POINTER_BYTES);
    if (pMerged == NULL) {
      return terrno;
    }

    // Only *numOfInputSources* can be loaded into buffer to perform the external sort.
    int32_t numOfInputSources = pHandle->numOfPages;
    int32_t sortGroup = (numOfSorted + numOfInputSources - 1) / numOfInputSources;

    code = doSortForEachGroup(pHandle, t, numOfSorted, numOfInputSources, pMerged, sortGroup, rowsPerPage);
    if (code != 0) {
      tsortClearOrderedSource(pMerged, NULL, NULL);
      taosArrayDestroy(pMerged);
      return code;
    }

    // swap the inputs of this pass for its outputs
    tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL);
    if (taosArrayAddAll(pHandle->pOrderedSource, pMerged) == NULL) {
      tsortClearOrderedSource(pMerged, NULL, NULL);
      taosArrayDestroy(pMerged);
      return terrno;
    }
    taosArrayDestroy(pMerged);

    numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);

    int64_t passElapsed = taosGetTimestampUs() - passStart;
    pHandle->totalElapsed += passElapsed;

    SDiskbasedBufStatis statis = getDBufStatis(pHandle->pBuf);
    qDebug("%s %d round mergesort, elapsed:%" PRId64 " readDisk:%.2f Kb, flushDisk:%.2f Kb", pHandle->idStr, t + 1,
           passElapsed, statis.loadBytes / 1024.0, statis.flushBytes / 1024.0);

    if (pHandle->type == SORT_MULTISOURCE_MERGE) {
      pHandle->type = SORT_SINGLESOURCE_SORT;
      pHandle->comparFn = msortComparFn;
    }
  }

  pHandle->cmpParam.numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
  return 0;
}
1136

1137
// get sort page size
1138
int32_t getProperSortPageSize(size_t rowSize, uint32_t numOfCols) {
79,249,512✔
1139
  uint32_t pgSize = rowSize * 4 + blockDataGetSerialMetaSize(numOfCols);
79,249,512✔
1140
  if (pgSize < DEFAULT_PAGESIZE) {
79,259,636✔
1141
    return DEFAULT_PAGESIZE;
76,411,318✔
1142
  }
1143

1144
  return pgSize;
2,848,318✔
1145
}
1146

1147
/*
 * Create the disk-based page buffer of the sort handle if it does not exist yet.
 *
 * The buffer is created in tsTempDir with pageSize-sized pages and an in-memory
 * budget of numOfPages * pageSize bytes. Fails with TSDB_CODE_NO_DISKSPACE
 * (also stored in terrno) when no temporary space is available.
 *
 * Returns 0 when the buffer already exists or was created, otherwise the error code.
 */
static int32_t createPageBuf(SSortHandle* pHandle) {
  // nothing to do when the buffer is already there
  if (pHandle->pBuf != NULL) {
    return 0;
  }

  if (!osTempSpaceAvailable()) {
    terrno = TSDB_CODE_NO_DISKSPACE;
    qError("create page buf failed since %s, tempDir:%s", terrstr(), tsTempDir);
    return terrno;
  }

  int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
                                    "tableBlocksBuf", tsTempDir);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  dBufSetPrintInfo(pHandle->pBuf);
  return 0;
}
1165

1166
/*
 * Append the row referenced by `pTupleHandle` as a new row at the end of `pBlock`.
 *
 * Two layouts are supported:
 *  - pHandle->bSortByRowId: tuple columns 1..3 hold (regionId, offset, length)
 *    of a row serialized in the external rows file. The row is fetched with
 *    getRowBufFromExtMemFile and decoded as written by blockRowToBuf: one null
 *    flag byte per column, the non-null values back to back, then an int32_t
 *    with the number of bytes that precede it. A mismatch of that trailing
 *    length is logged.
 *  - otherwise: column i of the tuple is copied into column i of the block.
 *
 * On success info.dataLoad is set, info.scanFlag is taken from the tuple's
 * block info and info.rows is incremented.
 *
 * Returns 0 on success, otherwise the error code of the failing step; in that
 * case info.rows is not incremented. A row buffer that was heap-allocated by
 * getRowBufFromExtMemFile is released on every path.
 */
int32_t tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
  int32_t code = 0;
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

  if (pHandle->bSortByRowId) {
    int32_t *p1, *p2, *p3;
    tsortGetValue(pTupleHandle, 1, (void**)&p1);
    tsortGetValue(pTupleHandle, 2, (void**)&p2);
    tsortGetValue(pTupleHandle, 3, (void**)&p3);

    int32_t regionId = *p1;
    int32_t offset = *p2;
    int32_t length = *p3;

    char* buf = NULL;
    bool  bFreeRow = false;

    code = getRowBufFromExtMemFile(pHandle, regionId, offset, length, &buf, &bFreeRow);
    if (code) {
      return code;
    }

    bool  failed = false;
    char* isNull = (char*)buf;
    char* pStart = (char*)buf + sizeof(int8_t) * numOfCols;
    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
      if (pColInfo == NULL) {
        code = terrno;
        failed = true;
        break;
      }

      if (isNull[i]) {
        colDataSetNULL(pColInfo, pBlock->info.rows);
        continue;
      }

      code = colDataSetVal(pColInfo, pBlock->info.rows, pStart, false);
      if (code) {
        failed = true;
        break;
      }

      // step over the value that has just been consumed
      if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
        pStart += getJsonValueLen(pStart);
      } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
        if (IS_STR_DATA_BLOB(pColInfo->info.type)) {
          pStart += blobDataTLen(pStart);
        } else {
          pStart += varDataTLen(pStart);
        }
      } else {
        pStart += pColInfo->info.bytes;
      }
    }

    if (!failed && *(int32_t*)pStart != pStart - buf) {
      qError("table merge scan row buf deserialization. length error %d != %d ", *(int32_t*)pStart,
             (int32_t)(pStart - buf));
    }

    // release the temporary row on the error paths too, not only on success
    if (bFreeRow) {
      taosMemoryFree(buf);
    }

    if (failed) {
      return code;
    }
  } else {
    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
      if (pColInfo == NULL) {
        return terrno;
      }

      if (tsortIsNullVal(pTupleHandle, i)) {
        colDataSetNULL(pColInfo, pBlock->info.rows);
        continue;
      }

      char* pData = NULL;
      tsortGetValue(pTupleHandle, i, (void**)&pData);
      if (pData != NULL) {
        code = colDataSetVal(pColInfo, pBlock->info.rows, pData, false);
        if (code) {
          return code;
        }
      }
    }
  }

  pBlock->info.dataLoad = 1;

  SDataBlockInfo info = {0};
  tsortGetBlockInfo(pTupleHandle, &info);

  pBlock->info.scanFlag = info.scanFlag;
  pBlock->info.rows += 1;

  return code;
}
1269

1270
/*
 * Serialize row `rowIdx` of `pBlock` into `buf`.
 *
 * Layout written to buf:
 *   [one null flag byte per column][non-null values back to back][int32_t length]
 * where the trailing length is the number of bytes that precede it.
 *  - JSON values are copied with the length reported by getJsonValueLen;
 *  - blob / var-length values are copied together with their length header;
 *  - all other values are copied as info.bytes raw bytes.
 * A non-null cell of a JSON/blob/var column that has no pData is written as an
 * empty value (a single zero byte for JSON, a zero length header otherwise).
 *
 * `buf` must be large enough for the row; no bound is checked here.
 *
 * Returns the total number of bytes written, or terrno if a column cannot be fetched.
 */
static int32_t blockRowToBuf(SSDataBlock* pBlock, int32_t rowIdx, char* buf) {
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

  char* nullFlags = buf;
  char* pCursor = buf + sizeof(int8_t) * numOfCols;

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    if (pCol == NULL) {
      return terrno;
    }

    bool cellIsNull = colDataIsNull_s(pCol, rowIdx);
    nullFlags[i] = cellIsNull ? 1 : 0;
    if (cellIsNull) {
      continue;
    }

    char* pData = colDataGetData(pCol, rowIdx);
    bool  hasData = (pCol->pData != NULL);

    if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
      if (hasData) {
        int32_t dataLen = getJsonValueLen(pData);
        memcpy(pCursor, pData, dataLen);
        pCursor += dataLen;
      } else {
        // the column that is pre-allocated has no data and has offset
        *pCursor = 0;
        pCursor += 1;
      }
    } else if (!IS_VAR_DATA_TYPE(pCol->info.type)) {
      // fixed-width value
      int32_t bytes = pCol->info.bytes;
      memcpy(pCursor, pData, bytes);
      pCursor += bytes;
    } else if (IS_STR_DATA_BLOB(pCol->info.type)) {
      if (hasData) {
        blobDataCopy(pCursor, pData);
        pCursor += blobDataTLen(pData);
      } else {
        // the column that is pre-allocated has no data and has offset
        *(BlobDataLenT*)(pCursor) = 0;
        pCursor += BLOBSTR_HEADER_SIZE;
      }
    } else {
      if (hasData) {
        varDataCopy(pCursor, pData);
        pCursor += varDataTLen(pData);
      } else {
        // the column that is pre-allocated has no data and has offset
        *(VarDataLenT*)(pCursor) = 0;
        pCursor += VARSTR_HEADER_SIZE;
      }
    }
  }

  // trailer: number of bytes written so far
  *(int32_t*)pCursor = (char*)pCursor - (char*)buf;
  pCursor += sizeof(int32_t);

  return (int32_t)(pCursor - (char*)buf);
}
1328

1329
/*
 * Locate the serialized row [tupleOffset, tupleOffset + rowLen) of region
 * `regionId` in the external rows file.
 *
 * Every region keeps a read buffer of pMemFile->blockSize bytes covering
 * [bufRegOffset, bufRegOffset + bufLen) of the region; it is allocated and
 * filled on first use.
 *  - If the row lies completely inside the buffered window, *ppRow points into
 *    that buffer and *pFreeRow is false.
 *  - If the row starts in the window but ends after it, the row is assembled in
 *    a freshly allocated buffer from the tail of the current window and the head
 *    of the next one; the region buffer advances to the next window and
 *    *pFreeRow is true, i.e. the caller must release *ppRow with taosMemoryFree.
 *
 * Returns TSDB_CODE_SUCCESS, or an error code. On error no row buffer is handed
 * to the caller: a row allocated here is released before returning.
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR is returned when the tuple starts
 * outside the buffered window or when its tail does not fit into the next
 * window.
 */
static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, int32_t tupleOffset, int32_t rowLen,
                                       char** ppRow, bool* pFreeRow) {
  SSortMemFile*       pMemFile = pHandle->pExtRowsMemFile;
  SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, regionId);
  if (pRegion == NULL) {
    return terrno;
  }

  if (pRegion->buf == NULL) {
    // first access to this region: load its first window
    pRegion->bufRegOffset = 0;
    pRegion->buf = taosMemoryMalloc(pMemFile->blockSize);
    if (pRegion->buf == NULL) {
      return terrno;
    }

    int32_t seekCode = taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset, SEEK_SET);
    if (seekCode != TSDB_CODE_SUCCESS) {
      // do not leave behind a buffer that looks loaded but holds no data
      taosMemoryFreeClear(pRegion->buf);
    }
    TAOS_CHECK_RETURN(seekCode);

    int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize);
    int32_t ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
    if (ret != 1) {
      int32_t code = TAOS_SYSTEM_ERROR(ERRNO);
      taosMemoryFreeClear(pRegion->buf);
      terrno = code;
      return code;
    }
    pRegion->bufLen = readBytes;
  }

  if (pRegion->bufRegOffset > tupleOffset) {
    qError("sort failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  if (pRegion->bufRegOffset + pRegion->bufLen >= tupleOffset + rowLen) {
    // the whole row is inside the buffered window
    *pFreeRow = false;
    *ppRow = pRegion->buf + tupleOffset - pRegion->bufRegOffset;
    return TSDB_CODE_SUCCESS;
  }

  // The row crosses the end of the window: part of it is in the current buffer,
  // the rest must come from the next window.
  int32_t szThisBlock = pRegion->bufLen - (tupleOffset - pRegion->bufRegOffset);
  if (szThisBlock < 0) {
    // the tuple starts after the buffered window; copying would use a negative size
    qError("sort failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize - (pRegion->bufRegOffset + pRegion->bufLen));
  if (rowLen - szThisBlock > readBytes) {
    // the remainder of the row would not be covered by the next window
    qError("sort failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  *ppRow = taosMemoryMalloc(rowLen);
  if (*ppRow == NULL) {
    return terrno;
  }
  memcpy(*ppRow, pRegion->buf + tupleOffset - pRegion->bufRegOffset, szThisBlock);

  // todo
  if (taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset + pRegion->bufRegOffset + pRegion->bufLen, SEEK_SET) < 0) {
    int32_t code = terrno;
    taosMemoryFreeClear(*ppRow);  // was leaked on this path
    return code;
  }

  int32_t ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
  if (ret != 1) {
    int32_t code = TAOS_SYSTEM_ERROR(ERRNO);
    taosMemoryFreeClear(*ppRow);
    terrno = code;
    return code;
  }

  memcpy(*ppRow + szThisBlock, pRegion->buf, rowLen - szThisBlock);
  *pFreeRow = true;

  // the region buffer now holds the next window
  pRegion->bufRegOffset += pRegion->bufLen;
  pRegion->bufLen = readBytes;

  return TSDB_CODE_SUCCESS;
}
1387

1388
/*
 * Create the external rows file of the sort handle, unless it already exists.
 *
 * A temporary file is created under tsTempDir and flagged for automatic
 * deletion, a 4 MiB write buffer is allocated, and the region list is
 * initialised. The read cache size is taken from pHandle->extRowsMemSize.
 * pHandle->pExtRowsMemFile is only set when every step succeeded.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of the failing step. On failure
 * everything acquired so far (region list, write buffer, file handle, the
 * temporary file itself and the descriptor) is released.
 */
static int32_t createSortMemFile(SSortHandle* pHandle) {
  if (pHandle->pExtRowsMemFile != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t       code = TSDB_CODE_SUCCESS;
  SSortMemFile* pMemFile = taosMemoryCalloc(1, sizeof(SSortMemFile));
  if (pMemFile == NULL) {
    return terrno;
  }

  taosGetTmpfilePath(tsTempDir, "sort-ext-mem", pMemFile->memFilePath);
  pMemFile->pTdFile = taosOpenCFile(pMemFile->memFilePath, "w+b");
  if (pMemFile->pTdFile == NULL) {
    code = terrno = TAOS_SYSTEM_ERROR(ERRNO);
    goto _err;
  }

  code = taosSetAutoDelFile(pMemFile->memFilePath);
  if (code) {
    qError("failed to set the auto-delete file attribute");
    goto _err;  // returning here directly would leak pMemFile and the open file
  }

  // no region has been opened yet
  pMemFile->currRegionId = -1;
  pMemFile->currRegionOffset = -1;

  pMemFile->writeBufSize = 4 * 1024 * 1024;
  pMemFile->writeFileOffset = -1;
  pMemFile->bRegionDirty = false;

  pMemFile->writeBuf = taosMemoryMalloc(pMemFile->writeBufSize);
  if (pMemFile->writeBuf == NULL) {
    code = terrno;
    goto _err;
  }

  pMemFile->cacheSize = pHandle->extRowsMemSize;
  pMemFile->aFileRegions = taosArrayInit(64, sizeof(SSortMemFileRegion));
  if (pMemFile->aFileRegions == NULL) {
    code = terrno;
    goto _err;
  }

  pHandle->pExtRowsMemFile = pMemFile;
  return TSDB_CODE_SUCCESS;

_err:
  // the region list is an SArray and must go through taosArrayDestroy
  taosArrayDestroy(pMemFile->aFileRegions);
  taosMemoryFree(pMemFile->writeBuf);
  if (pMemFile->pTdFile) {
    (void)taosCloseCFile(pMemFile->pTdFile);
    (void)taosRemoveFile(pMemFile->memFilePath);
  }
  taosMemoryFree(pMemFile);
  return code;
}
1447

1448
/*
 * Release the external rows file of the sort handle: the read buffer of every
 * region, the region list, the write buffer, the file handle, the file on disk
 * and finally the descriptor itself. pHandle->pExtRowsMemFile is reset to NULL.
 * Does nothing when no external rows file exists.
 */
static void destroySortMemFile(SSortHandle* pHandle) {
  SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
  if (pMemFile == NULL) {
    return;
  }

  // per-region read buffers first
  size_t nRegions = taosArrayGetSize(pMemFile->aFileRegions);
  for (int32_t i = 0; i < nRegions; ++i) {
    SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, i);
    if (pRegion != NULL) {
      taosMemoryFree(pRegion->buf);
    }
  }

  taosArrayDestroy(pMemFile->aFileRegions);
  pMemFile->aFileRegions = NULL;

  taosMemoryFree(pMemFile->writeBuf);
  pMemFile->writeBuf = NULL;

  (void)taosCloseCFile(pMemFile->pTdFile);
  pMemFile->pTdFile = NULL;
  (void)taosRemoveFile(pMemFile->memFilePath);

  taosMemoryFree(pMemFile);
  pHandle->pExtRowsMemFile = NULL;
}
1475

1476
/*
 * Start a new region in the external rows file and make it the current one.
 *
 * The first region starts at file offset 0; every later region starts right
 * after the previous one (previous fileOffset + regionSize). After a successful
 * call currRegionId refers to the new region, currRegionOffset is 0 and
 * writeFileOffset equals the file offset of the new region.
 *
 * Returns 0 on success, or terrno when the previous region cannot be fetched or
 * the new region cannot be appended. On failure the current-region state is left
 * untouched, so it never refers to a region that was not added to the list.
 */
static int32_t tsortOpenRegion(SSortHandle* pHandle) {
  SSortMemFile*      pMemFile = pHandle->pExtRowsMemFile;
  SSortMemFileRegion region = {0};  // fileOffset and bufRegOffset start at 0

  if (pMemFile->currRegionId != -1) {
    SSortMemFileRegion* pPrev = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId);
    if (pPrev == NULL) {
      return terrno;
    }
    region.fileOffset = pPrev->fileOffset + pPrev->regionSize;
  }

  if (taosArrayPush(pMemFile->aFileRegions, &region) == NULL) {
    return terrno;
  }

  ++pMemFile->currRegionId;  // -1 becomes 0 for the very first region
  pMemFile->currRegionOffset = 0;
  pMemFile->writeFileOffset = region.fileOffset;
  return 0;
}
1512

1513
/*
 * Close the current region of the external rows file.
 *
 * The region size is fixed to the current region offset, and the bytes of the
 * region that have not been written to the file yet are written from the write
 * buffer. bRegionDirty is cleared after such a write.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when the current region cannot be fetched,
 * or the system error (also stored in terrno) when the write fails.
 */
static int32_t tsortCloseRegion(SSortHandle* pHandle) {
  SSortMemFile*       pMemFile = pHandle->pExtRowsMemFile;
  SSortMemFileRegion* pCurrent = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId);
  if (pCurrent == NULL) {
    return terrno;
  }

  pCurrent->regionSize = pMemFile->currRegionOffset;

  // bytes of this region that are still only in the write buffer
  int32_t pending = pCurrent->regionSize - (pMemFile->writeFileOffset - pCurrent->fileOffset);
  if (pending <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t written = fwrite(pMemFile->writeBuf, pending, 1, pMemFile->pTdFile);
  if (written != 1) {
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
    return terrno;
  }

  pMemFile->bRegionDirty = false;
  return TSDB_CODE_SUCCESS;
}
1532

1533
// Finish the write phase of the ext-rows mem file.
// Verifies that the number of regions matches currRegionId + 1, derives the per-region read block
// size from the cache size (rounded up to a multiple of 4096), resets every region's bufRegOffset
// and frees the write buffer.
// Returns TSDB_CODE_SUCCESS (also when there are no regions), TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR
// on a region-count mismatch, or terrno when a region cannot be looked up.
static int32_t tsortFinalizeRegions(SSortHandle* pHandle) {
  SSortMemFile* pFile = pHandle->pExtRowsMemFile;
  size_t        nRegions = taosArrayGetSize(pFile->aFileRegions);

  if (nRegions != (pFile->currRegionId + 1)) {
    qError("sort failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  if (0 == nRegions) {
    return TSDB_CODE_SUCCESS;
  }

  // split the cache evenly between the regions, rounded up to 4096 bytes
  int32_t perRegionBytes = (pFile->cacheSize / nRegions + 4095) & ~4095;
  pFile->blockSize = perRegionBytes;

  int32_t idx = 0;
  while (idx < nRegions) {
    SSortMemFileRegion* pEach = taosArrayGet(pFile->aFileRegions, idx);
    if (NULL == pEach) {
      return terrno;
    }

    pEach->bufRegOffset = 0;
    ++idx;
  }

  taosMemoryFree(pFile->writeBuf);
  pFile->writeBuf = NULL;

  return TSDB_CODE_SUCCESS;
}
1560

1561
// Serialize row `rowIdx` of `pBlock` into the write buffer of the ext-rows mem file.
// Before serializing, the buffered bytes of the current region are written to the file when
// currRegionOffset + extRowBytes reaches writeBufSize.
// On success *pRegionId / *pOffset / *pLength receive the current region id, the offset of the row
// inside that region and the serialized length returned by blockRowToBuf.
// Returns TSDB_CODE_SUCCESS, terrno when the current region cannot be looked up, or the system
// error when fwrite does not report exactly one written item.
static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx,
                                            int32_t* pRegionId, int32_t* pOffset, int32_t* pLength) {
  SSortMemFile*       pFile = pHandle->pExtRowsMemFile;
  SSortMemFileRegion* pCurr = taosArrayGet(pFile->aFileRegions, pFile->currRegionId);
  if (NULL == pCurr) {
    return terrno;
  }

  // NOTE(review): this test uses the offset inside the region, not the offset inside writeBuf,
  // so once the first flush has happened it may stay true for every following row - confirm.
  bool needFlush = (pFile->currRegionOffset + pHandle->extRowBytes >= pFile->writeBufSize);
  if (needFlush) {
    int32_t pending = pFile->currRegionOffset - (pFile->writeFileOffset - pCurr->fileOffset);
    int32_t nItems = fwrite(pFile->writeBuf, pending, 1, pFile->pTdFile);
    if (nItems != 1) {
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
      return terrno;
    }

    // everything up to the current region offset is now in the file
    pFile->writeFileOffset = pCurr->fileOffset + pFile->currRegionOffset;
  }

  *pRegionId = pFile->currRegionId;
  *pOffset = pFile->currRegionOffset;

  // position inside writeBuf = region offset minus the part of the region already written
  int32_t posInBuf = pFile->currRegionOffset - (pFile->writeFileOffset - pCurr->fileOffset);
  int32_t rowLen = blockRowToBuf(pBlock, rowIdx, pFile->writeBuf + posInBuf);
  *pLength = rowLen;

  pFile->currRegionOffset += rowLen;
  pFile->bRegionDirty = true;

  return TSDB_CODE_SUCCESS;
}
1591

1592
// Append one "row id" row to pHandle->pDataBlock for row *rowIndex of pSource.
// The source row itself is saved to the ext-rows mem file; the appended row holds, by column:
//   0: the timestamp copied from the source ts column
//   1: region id, 2: offset, 3: length of the saved row
//   4: the primary key copied from the source pk column (only when pHandle->bSortPk)
// On success both the destination row count and *rowIndex are advanced by one.
// Returns 0 on success, otherwise the first error code / terrno encountered.
static int32_t appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSource, int32_t* rowIndex) {
  int32_t regionId = -1;
  int32_t rowOffset = -1;
  int32_t rowLen = -1;

  int32_t code = saveBlockRowToExtRowsMemFile(pHandle, pSource, *rowIndex, &regionId, &rowOffset, &rowLen);
  if (code != 0) {
    return code;
  }

  SSDataBlock* pDst = pHandle->pDataBlock;

  // timestamp: source column is addressed through the original (ext rows) sort order
  SBlockOrderInfo* pTsOrder = taosArrayGet(pHandle->aExtRowsOrders, 0);
  if (NULL == pTsOrder) {
    return terrno;
  }

  SColumnInfoData* pSrcTs = taosArrayGet(pSource->pDataBlock, pTsOrder->slotId);
  if (NULL == pSrcTs) {
    return terrno;
  }

  SColumnInfoData* pDstTs = taosArrayGet(pDst->pDataBlock, 0);
  if (NULL == pDstTs) {
    return terrno;
  }

  char* pTsVal = colDataGetData(pSrcTs, *rowIndex);
  code = colDataSetVal(pDstTs, pDst->info.rows, pTsVal, false);
  if (code != 0) {
    return code;
  }

  // location of the saved row: region id, offset, length
  SColumnInfoData* pDstRegion = taosArrayGet(pDst->pDataBlock, 1);
  if (NULL == pDstRegion) {
    return terrno;
  }
  colDataSetInt32(pDstRegion, pDst->info.rows, &regionId);

  SColumnInfoData* pDstOffset = taosArrayGet(pDst->pDataBlock, 2);
  if (NULL == pDstOffset) {
    return terrno;
  }
  colDataSetInt32(pDstOffset, pDst->info.rows, &rowOffset);

  SColumnInfoData* pDstLen = taosArrayGet(pDst->pDataBlock, 3);
  if (NULL == pDstLen) {
    return terrno;
  }
  colDataSetInt32(pDstLen, pDst->info.rows, &rowLen);

  // optional primary key
  if (pHandle->bSortPk) {
    SBlockOrderInfo* pPkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1);
    if (NULL == pPkOrder) {
      return terrno;
    }

    SColumnInfoData* pSrcPk = taosArrayGet(pSource->pDataBlock, pPkOrder->slotId);
    if (NULL == pSrcPk) {
      return terrno;
    }

    SColumnInfoData* pDstPk = taosArrayGet(pDst->pDataBlock, 4);
    if (NULL == pDstPk) {
      return terrno;
    }

    if (!colDataIsNull_s(pSrcPk, *rowIndex)) {
      char* pPkVal = colDataGetData(pSrcPk, *rowIndex);
      code = colDataSetVal(pDstPk, pDst->info.rows, pPkVal, false);
      if (code != 0) {
        return code;
      }
    } else {
      colDataSetNULL(pDstPk, pDst->info.rows);
    }
  }

  pDst->info.rows += 1;
  *rowIndex += 1;

  return code;
}
1677

1678
// Switch the handle to "sort by row id" mode.
// pHandle->pDataBlock is replaced by a block with the columns (ts, regionId, offset, length) plus,
// when pHandle->bSortPk is set, a primary-key column with the type/bytes of the original pk column.
// pHandle->pSortInfo is replaced by order infos over that block: ts on slot 0 (same order as the
// original first sort key) and, optionally, pk on slot 4. Page size / page count are reset to
// 256KB x 256.
// All lookups into the existing order/column arrays are validated before anything in the handle is
// modified, so a failed lookup returns terrno instead of dereferencing NULL.
// Returns TSDB_CODE_SUCCESS or the error code / terrno of the failing step.
static int32_t initRowIdSort(SSortHandle* pHandle) {
  SBlockOrderInfo* pkOrder = NULL;
  SColumnInfoData* extPkCol = NULL;

  if (pHandle->bSortPk) {
    pkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1);
    if (pkOrder == NULL) {
      return terrno;
    }

    extPkCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pkOrder->slotId);
    if (extPkCol == NULL) {
      return terrno;
    }
  }

  // pSortInfo is only replaced at the very end, so its first entry can be read up front
  SBlockOrderInfo* pOrigTsOrder = taosArrayGet(pHandle->pSortInfo, 0);
  if (pOrigTsOrder == NULL) {
    return terrno;
  }
  int32_t tsOrder = pOrigTsOrder->order;

  SColumnInfoData pkCol = {0};
  SSDataBlock*    pSortInput = NULL;
  int32_t         code = createDataBlock(&pSortInput);
  if (code) {
    return code;
  }

  SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1);
  code = blockDataAppendColInfo(pSortInput, &tsCol);
  if (code) {
    goto _fail;
  }

  SColumnInfoData regionIdCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2);
  code = blockDataAppendColInfo(pSortInput, &regionIdCol);
  if (code) {
    goto _fail;
  }

  SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3);
  code = blockDataAppendColInfo(pSortInput, &offsetCol);
  if (code) {
    goto _fail;
  }

  SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4);
  code = blockDataAppendColInfo(pSortInput, &lengthCol);
  if (code) {
    goto _fail;
  }

  if (pHandle->bSortPk) {
    // extPkCol still points into the old data block, which is destroyed only below
    pkCol = createColumnInfoData(extPkCol->info.type, extPkCol->info.bytes, 5);
    code = blockDataAppendColInfo(pSortInput, &pkCol);
    if (code) {
      goto _fail;
    }
  }

  // from here on the handle owns pSortInput
  blockDataDestroy(pHandle->pDataBlock);
  pHandle->pDataBlock = pSortInput;

  pHandle->pageSize = 256 * 1024;  // 256k
  pHandle->numOfPages = 256;

  SArray* pOrderInfoList = taosArrayInit(1, sizeof(SBlockOrderInfo));
  if (pOrderInfoList == NULL) {
    return terrno;
  }

  SBlockOrderInfo biTs = {0};
  biTs.order = tsOrder;
  biTs.slotId = 0;
  biTs.nullFirst = (biTs.order == TSDB_ORDER_ASC);
  biTs.compFn = getKeyComparFunc(TSDB_DATA_TYPE_TIMESTAMP, biTs.order);
  void* p = taosArrayPush(pOrderInfoList, &biTs);
  if (p == NULL) {
    taosArrayDestroy(pOrderInfoList);
    return terrno;
  }

  if (pHandle->bSortPk) {
    SBlockOrderInfo biPk = {0};
    biPk.order = pkOrder->order;
    biPk.slotId = 4;
    biPk.nullFirst = (biPk.order == TSDB_ORDER_ASC);
    biPk.compFn = getKeyComparFunc(pkCol.info.type, biPk.order);

    void* px = taosArrayPush(pOrderInfoList, &biPk);
    if (px == NULL) {
      taosArrayDestroy(pOrderInfoList);
      return terrno;
    }
  }

  taosArrayDestroy(pHandle->pSortInfo);
  pHandle->pSortInfo = pOrderInfoList;
  pHandle->cmpParam.pPkOrder = (pHandle->bSortPk) ? taosArrayGet(pHandle->pSortInfo, 1) : NULL;
  return TSDB_CODE_SUCCESS;

_fail:
  // pSortInput has not been handed over to the handle yet
  blockDataDestroy(pSortInput);
  return code;
}
1772

1773
// Configure the handle to sort by row id, keeping the full rows in an external mem file.
// Stores the per-row byte estimate (row size + number of columns + sizeof(int32_t)) and the memory
// budget, duplicates the current sort info into aExtRowsOrders, rebuilds the handle's data block and
// sort info via initRowIdSort, and creates the sort mem file.
// Returns 0 on success; terrno if the sort info cannot be duplicated; the initRowIdSort error;
// TSDB_CODE_NO_DISKSPACE when osTempSpaceAvailable() reports false; or the createSortMemFile result.
int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsMemSize) {
  pHandle->extRowBytes =
      blockDataGetRowSize(pHandle->pDataBlock) + taosArrayGetSize(pHandle->pDataBlock->pDataBlock) + sizeof(int32_t);
  pHandle->extRowsMemSize = extRowsMemSize;

  // keep the original orders; initRowIdSort replaces pSortInfo
  pHandle->aExtRowsOrders = taosArrayDup(pHandle->pSortInfo, NULL);
  if (NULL == pHandle->aExtRowsOrders) {
    return terrno;
  }

  int32_t rc = initRowIdSort(pHandle);
  if (rc != 0) {
    return rc;
  }

  if (!osTempSpaceAvailable()) {
    qError("create sort mem file failed since %s, tempDir:%s", terrstr(), tsTempDir);
    return TSDB_CODE_NO_DISKSPACE;
  }

  rc = createSortMemFile(pHandle);

  // the flag is set regardless of the createSortMemFile result
  pHandle->bSortByRowId = true;
  return rc;
}
1796

1797
typedef struct SBlkMergeSupport {
1798
  int64_t** aTs;
1799
  int32_t*  aRowIdx;
1800
  int32_t   tsOrder;
1801

1802
  SBlockOrderInfo* pPkOrder;
1803
  SSDataBlock**    aBlks;
1804
} SBlkMergeSupport;
1805

1806
static int32_t blockCompareTsFn(const void* pLeft, const void* pRight, void* param) {
2,147,483,647✔
1807
  int32_t left = *(int32_t*)pLeft;
2,147,483,647✔
1808
  int32_t right = *(int32_t*)pRight;
2,147,483,647✔
1809

1810
  SBlkMergeSupport* pSup = (SBlkMergeSupport*)param;
2,147,483,647✔
1811
  if (pSup->aRowIdx[left] == -1) {
2,147,483,647✔
1812
    return 1;
2,147,483,647✔
1813
  } else if (pSup->aRowIdx[right] == -1) {
2,147,483,647✔
1814
    return -1;
96,604,642✔
1815
  }
1816

1817
  int64_t leftTs = pSup->aTs[left][pSup->aRowIdx[left]];
2,147,483,647✔
1818
  int64_t rightTs = pSup->aTs[right][pSup->aRowIdx[right]];
2,147,483,647✔
1819

1820
  int32_t ret = leftTs > rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0);
2,147,483,647✔
1821
  if (pSup->tsOrder == TSDB_ORDER_DESC) {
2,147,483,647✔
1822
    ret = -1 * ret;
2,147,483,647✔
1823
  }
1824
  return ret;
2,147,483,647✔
1825
}
1826

1827
static int32_t blockCompareTsPkFn(const void* pLeft, const void* pRight, void* param) {
90,726,850✔
1828
  int32_t left = *(int32_t*)pLeft;
90,726,850✔
1829
  int32_t right = *(int32_t*)pRight;
90,726,850✔
1830

1831
  SBlkMergeSupport* pSup = (SBlkMergeSupport*)param;
90,728,014✔
1832
  if (pSup->aRowIdx[left] == -1) {
90,728,014✔
1833
    return 1;
3,353,303✔
1834
  } else if (pSup->aRowIdx[right] == -1) {
87,377,705✔
1835
    return -1;
3,210,544✔
1836
  }
1837

1838
  int64_t leftTs = pSup->aTs[left][pSup->aRowIdx[left]];
84,168,949✔
1839
  int64_t rightTs = pSup->aTs[right][pSup->aRowIdx[right]];
84,168,367✔
1840

1841
  int32_t ret = leftTs > rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0);
84,168,325✔
1842
  if (pSup->tsOrder == TSDB_ORDER_DESC) {
84,168,325✔
1843
    ret = -1 * ret;
27,627,216✔
1844
  }
1845
  if (ret == 0 && pSup->pPkOrder) {
84,167,743✔
1846
    ret = tsortComparBlockCell(pSup->aBlks[left], pSup->aBlks[right], pSup->aRowIdx[left], pSup->aRowIdx[right],
69,697,063✔
1847
                               pSup->pPkOrder);
69,697,687✔
1848
  }
1849
  return ret;
84,162,379✔
1850
}
1851

1852
// Serialize `blk` into a new page of pHandle->pBuf and record the page id in `aPgId`.
// The serialized size (block data + int32 header + one int32 per column) must not exceed the
// buffer's page size.
// The page obtained from getNewBufPage is handed back with releaseBufPage on every path, including
// the failure paths (previously only the success path released it).
// Returns the blockDataToBuf result on the normal path, terrno when no page can be obtained or the
// page id cannot be recorded, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the block does not fit.
static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, SArray* aPgId) {
  int32_t pageId = -1;
  void*   pPage = getNewBufPage(pHandle->pBuf, &pageId);
  if (pPage == NULL) {
    return terrno;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  void* px = taosArrayPush(aPgId, &pageId);
  if (px == NULL) {
    code = terrno;  // capture before releaseBufPage gets a chance to touch terrno
    goto _release;
  }

  int32_t size = blockDataGetSize(blk) + sizeof(int32_t) + taosArrayGetSize(blk->pDataBlock) * sizeof(int32_t);
  if (size > getBufPageSize(pHandle->pBuf)) {
    qError("sort failed at: %s:%d", __func__, __LINE__);
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    goto _release;
  }

  code = blockDataToBuf(pPage, blk);

  // as before, the page is marked dirty regardless of the blockDataToBuf result
  setBufPageDirty(pPage, true);

_release:
  releaseBufPage(pHandle->pBuf, pPage);
  return code;
}
1877

1878
// Number of bytes by which the serialized destination block grows when row `srcRowIndex` of
// pSrcBlock is appended as destination row `dstRowIndex`.
// Every 8th destination row (dstRowIndex & 0x7 == 0) adds one bitmap byte per fixed-length column.
// Variable-length columns add one offset entry plus, when the value is present, the value length
// computed by calcStrBytesByType.
static int32_t getPageBufIncForRow(SSDataBlock* pSrcBlock, int32_t srcRowIndex, int32_t dstRowIndex) {
  int32_t nCols = taosArrayGetSize(pSrcBlock->pDataBlock);
  int32_t bitmapByte = ((dstRowIndex & 0x7) == 0) ? 1 : 0;

  if (!pSrcBlock->info.hasVarCol) {
    // fixed-length rows only: one bitmap byte per column (if due) plus the row size
    int32_t fixedBytes = nCols * bitmapByte;
    fixedBytes += blockDataGetRowSize(pSrcBlock);
    return fixedBytes;
  }

  int32_t bytes = 0;
  for (int32_t c = 0; c < nCols; ++c) {
    SColumnInfoData* pCol = TARRAY_GET_ELEM(pSrcBlock->pDataBlock, c);

    if (!IS_VAR_DATA_TYPE(pCol->info.type)) {
      bytes += pCol->info.bytes;
      if (bitmapByte) {
        bytes += 1;  // bitmap
      }
      continue;
    }

    // an offset of -1 or a missing data buffer contributes no value bytes
    if ((pCol->varmeta.offset[srcRowIndex] != -1) && (pCol->pData)) {
      char* pVal = colDataGetData(pCol, srcRowIndex);
      bytes += calcStrBytesByType(pCol->info.type, pVal);
    }

    bytes += sizeof(pCol->varmeta.offset[0]);
  }

  return bytes;
}
1913

1914
static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowIndex, int32_t dstRowIndex,
634,101,572✔
1915
                                         SColumnInfoData* pPkCol) {
1916
  int32_t size = 0;
634,101,572✔
1917
  int32_t numOfCols = blockDataGetNumOfCols(pDstBlock);
634,101,572✔
1918

1919
  if (pPkCol == NULL) {  // no var column
634,094,260✔
1920
    if (!((numOfCols == 4) && (!pDstBlock->info.hasVarCol))) {
601,195,114✔
1921
      qError("sort failed at: %s:%d", __func__, __LINE__);
1,452✔
1922
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1923
    }
1924

1925
    size += numOfCols * ((dstRowIndex & 0x7) == 0 ? 1 : 0);
601,196,570✔
1926
    size += blockDataGetRowSize(pDstBlock);
601,196,570✔
1927
  } else {
1928
    if (numOfCols != 5) {
32,899,146✔
1929
      qError("sort failed at: %s:%d", __func__, __LINE__);
×
1930
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1931
    }
1932

1933
    size += (numOfCols - 1) * (((dstRowIndex & 0x7) == 0) ? 1 : 0);
32,899,146✔
1934
    for (int32_t i = 0; i < numOfCols - 1; ++i) {
164,473,392✔
1935
      SColumnInfoData* pColInfo = TARRAY_GET_ELEM(pDstBlock->pDataBlock, i);
131,572,332✔
1936
      size += pColInfo->info.bytes;
131,576,160✔
1937
    }
1938

1939
    // handle the pk column, the last column, may be the var char column
1940
    if (IS_VAR_DATA_TYPE(pPkCol->info.type)) {
32,901,060✔
1941
      if ((pPkCol->varmeta.offset[srcRowIndex] != -1) && (pPkCol->pData)) {
11,712,930✔
1942
        char* p = colDataGetData(pPkCol, srcRowIndex);
11,716,632✔
1943
        if (IS_STR_DATA_BLOB(pPkCol->info.type)) {
11,716,008✔
1944
          size += blobDataTLen(p);
×
1945
        } else {
1946
          size += varDataTLen(p);
11,716,632✔
1947
        }
1948
      }
1949

1950
      size += sizeof(pPkCol->varmeta.offset[0]);
11,716,632✔
1951
    } else {
1952
      size += pPkCol->info.bytes;
21,187,464✔
1953
      if (((dstRowIndex)&0x07) == 0) {
21,187,506✔
1954
        size += 1;  // bitmap
2,751,600✔
1955
      }
1956
    }
1957
  }
1958

1959
  return size;
634,098,284✔
1960
}
1961

1962
// Bytes by which the handle's page-sized data block grows when row `srcRowIndex` of pSrcBlock is
// appended as destination row `dstRowIndex`. Dispatches to the row-id-sort or the plain variant
// depending on pHandle->bSortByRowId.
static int32_t getBufIncForNewRow(SSortHandle* pHandle, int32_t dstRowIndex, SSDataBlock* pSrcBlock,
                                  int32_t srcRowIndex) {
  if (!pHandle->bSortByRowId) {
    return getPageBufIncForRow(pSrcBlock, srcRowIndex, dstRowIndex);
  }

  // the pk column may be a varchar column, so its length has to be read from the source row
  SColumnInfoData* pSrcPkCol = NULL;
  if (pHandle->bSortPk) {
    // NOTE(review): the order lookup is dereferenced without a NULL check - confirm it cannot fail here
    SBlockOrderInfo* pPkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1);
    pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, pPkOrder->slotId);
  }

  return getPageBufIncForRowIdSort(pHandle->pDataBlock, srcRowIndex, dstRowIndex, pSrcPkCol);
}
1982

1983
static int32_t initMergeSup(SBlkMergeSupport* pSup, SArray* pBlockList, int32_t tsOrder, int32_t tsSlotId,
11,481,305✔
1984
                            SBlockOrderInfo* pPkOrderInfo) {
1985
  int32_t code = TSDB_CODE_SUCCESS;
11,481,305✔
1986
  int32_t lino = 0;
11,481,305✔
1987
  memset(pSup, 0, sizeof(SBlkMergeSupport));
11,481,305✔
1988

1989
  int32_t numOfBlocks = taosArrayGetSize(pBlockList);
11,481,305✔
1990

1991
  pSup->aRowIdx = taosMemoryCalloc(numOfBlocks, sizeof(int32_t));
11,481,897✔
1992
  QUERY_CHECK_NULL(pSup->aRowIdx, code, lino, _end, terrno);
11,482,768✔
1993

1994
  pSup->aTs = taosMemoryCalloc(numOfBlocks, sizeof(int64_t*));
11,481,838✔
1995
  QUERY_CHECK_NULL(pSup->aTs, code, lino, _end, terrno);
11,483,233✔
1996

1997
  pSup->tsOrder = tsOrder;
11,480,908✔
1998
  pSup->aBlks = taosMemoryCalloc(numOfBlocks, sizeof(SSDataBlock*));
11,482,768✔
1999
  QUERY_CHECK_NULL(pSup->aBlks, code, lino, _end, terrno);
11,481,721✔
2000

2001
  for (int32_t i = 0; i < numOfBlocks; ++i) {
128,700,845✔
2002
    SSDataBlock*     pBlock = taosArrayGetP(pBlockList, i);
117,219,007✔
2003
    SColumnInfoData* col = taosArrayGet(pBlock->pDataBlock, tsSlotId);
117,221,043✔
2004
    pSup->aTs[i] = (int64_t*)col->pData;
117,221,102✔
2005
    pSup->aRowIdx[i] = 0;
117,221,102✔
2006
    pSup->aBlks[i] = pBlock;
117,219,648✔
2007
  }
2008

2009
  pSup->pPkOrder = pPkOrderInfo;
11,481,838✔
2010

2011
_end:
11,484,163✔
2012
  if (code != TSDB_CODE_SUCCESS) {
11,484,163✔
2013
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2014
  }
2015
  return code;
11,481,432✔
2016
}
2017

2018
// Free the three per-block arrays held by `pSup`. The blocks referenced by aBlks and the order info
// referenced by pPkOrder are not touched.
static void cleanupMergeSup(SBlkMergeSupport* pSup) {
  taosMemoryFree(pSup->aBlks);
  taosMemoryFree(pSup->aTs);
  taosMemoryFree(pSup->aRowIdx);
}
11,476,400✔
2023

2024
// Sum of info.rows over all blocks (SSDataBlock pointers) stored in pBlockList.
static int32_t getTotalRows(SArray* pBlockList) {
  size_t  nBlocks = taosArrayGetSize(pBlockList);
  int32_t sum = 0;

  for (int32_t idx = 0; idx < nBlocks; ++idx) {
    SSDataBlock* pEach = taosArrayGetP(pBlockList, idx);
    sum += pEach->info.rows;
  }

  return sum;
}
2034

2035
// Merge the (individually sorted) blocks in `aBlk` into page-sized blocks, write those pages to
// pHandle->pBuf and register the resulting page list as one new external source in `aExtSrc`.
//
// Rows are picked one by one through a multiway merge tree over the blocks. Each picked row is
// appended to pHandle->pDataBlock (as a row-id row when pHandle->bSortByRowId is set); whenever the
// next row would make the serialized block exceed pHandle->pageSize, the block is flushed to a new
// page. When pHandle->mergeLimit is not -1 and at least that many rows have been flushed, merging
// stops and pHandle->currMergeLimitTs is tightened to the last flushed timestamp.
//
// Error handling: every failure path returns a non-success code (a failed page-id array allocation
// and a page-size accounting mismatch no longer fall through with TSDB_CODE_SUCCESS), and the merge
// support arrays are released when initMergeSup fails.
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t pageHeaderSize = sizeof(int32_t) + sizeof(int32_t) * blockDataGetNumOfCols(pHandle->pDataBlock);
  int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pageHeaderSize);
  if (rowCap < 0) {
    return terrno;
  }

  code = blockDataEnsureCapacity(pHandle->pDataBlock, rowCap);
  if (code) {
    return code;
  }

  blockDataCleanup(pHandle->pDataBlock);
  SBlkMergeSupport sup = {0};

  // order info describing the input blocks: the saved original orders when sorting by row id
  SBlockOrderInfo* pOrigBlockTsOrder =
      (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0);
  if (pOrigBlockTsOrder == NULL) {
    return terrno;
  }

  // order info describing pHandle->pDataBlock, i.e. the blocks written to the pages
  SBlockOrderInfo* pHandleBlockTsOrder = taosArrayGet(pHandle->pSortInfo, 0);
  if (pHandleBlockTsOrder == NULL) {
    return terrno;
  }

  SBlockOrderInfo* pOrigBlockPkOrder = NULL;
  if (pHandle->bSortPk) {
    pOrigBlockPkOrder =
        (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 1) : taosArrayGet(pHandle->aExtRowsOrders, 1);
    if (pOrigBlockPkOrder == NULL) {
      return terrno;
    }
  }

  code = initMergeSup(&sup, aBlk, pOrigBlockTsOrder->order, pOrigBlockTsOrder->slotId, pOrigBlockPkOrder);
  if (code) {
    // initMergeSup may have allocated part of its arrays before failing
    cleanupMergeSup(&sup);
    return code;
  }

  int32_t totalRows = getTotalRows(aBlk);

  SMultiwayMergeTreeInfo* pTree = NULL;
  __merge_compare_fn_t    mergeCompareFn = (!pHandle->bSortPk) ? blockCompareTsFn : blockCompareTsPkFn;

  code = tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, mergeCompareFn);
  if (TSDB_CODE_SUCCESS != code) {
    cleanupMergeSup(&sup);
    return code;
  }

  SArray* aPgId = taosArrayInit(8, sizeof(int32_t));
  if (aPgId == NULL) {
    code = terrno;
    goto _error;
  }

  int32_t nRows = 0;
  int32_t nMergedRows = 0;
  bool    mergeLimitReached = false;
  size_t  blkPgSz = pageHeaderSize;
  int64_t lastPageBufTs = (pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;

  while (nRows < totalRows) {
    int32_t      minIdx = tMergeTreeGetChosenIndex(pTree);
    SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
    int32_t      minRow = sup.aRowIdx[minIdx];

    int32_t bufInc = getBufIncForNewRow(pHandle, pHandle->pDataBlock->info.rows, minBlk, minRow);

    // the next row does not fit any more: flush the current block to a new page first
    if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
      SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId);
      if (tsCol == NULL) {
        code = terrno;
        goto _error;
      }

      lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
      code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
      if (code != TSDB_CODE_SUCCESS) {
        goto _error;
      }

      nMergedRows += pHandle->pDataBlock->info.rows;
      blockDataCleanup(pHandle->pDataBlock);
      blkPgSz = pageHeaderSize;

      // the row now becomes row 0 of an empty block, so its increment has to be recomputed
      bufInc = getBufIncForNewRow(pHandle, 0, minBlk, minRow);

      if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
        mergeLimitReached = true;
        if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ||
            (lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) {
          pHandle->currMergeLimitTs = lastPageBufTs;
        }

        break;
      }
    }

    code = blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1);
    if (code) {
      goto _error;
    }

    if (pHandle->bSortByRowId) {
      code = appendToRowIndexDataBlock(pHandle, minBlk, &minRow);
    } else {
      code = appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
    }

    if (code) {
      goto _error;
    }

    // the incrementally tracked size must agree with the real serialized size
    blkPgSz += bufInc;
    if (blkPgSz != blockDataGetSize(pHandle->pDataBlock) + pageHeaderSize) {
      qError("sort failed at: %s:%d", __func__, __LINE__);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      goto _error;
    }

    ++nRows;

    // advance the chosen block; -1 marks it as exhausted for the comparators
    if (sup.aRowIdx[minIdx] == minBlk->info.rows - 1) {
      sup.aRowIdx[minIdx] = -1;
    } else {
      ++sup.aRowIdx[minIdx];
    }
    code = tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
    if (TSDB_CODE_SUCCESS != code) {
      goto _error;
    }
  }

  // flush the rows left in the block, unless the merge limit already stopped the merge
  if (pHandle->pDataBlock->info.rows > 0) {
    if (!mergeLimitReached) {
      SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId);
      if (tsCol == NULL) {
        code = terrno;
        goto _error;
      }

      lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
      code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
      if (code != TSDB_CODE_SUCCESS) {
        goto _error;
      }
      nMergedRows += pHandle->pDataBlock->info.rows;
      if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
        mergeLimitReached = true;
        if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ||
            (lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) {
          pHandle->currMergeLimitTs = lastPageBufTs;
        }
      }
    }
    blockDataCleanup(pHandle->pDataBlock);
  }

  SSDataBlock* pMemSrcBlk = NULL;
  code = createOneDataBlock(pHandle->pDataBlock, false, &pMemSrcBlk);
  if (code) goto _error;

  code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pMemSrcBlk);
    goto _error;
  }

  cleanupMergeSup(&sup);
  tMergeTreeDestroy(&pTree);

  return code;

_error:
  tMergeTreeDestroy(&pTree);
  cleanupMergeSup(&sup);
  if (aPgId) taosArrayDestroy(aPgId);
  return code;
}
2205

2206
// Apply the per-table merge limit to `pOrigBlk`.
// `mTableNumRows` maps a table uid to the number of rows seen so far for that table; it is updated
// with the rows of this block. Once the running total reaches pHandle->mergeLimit the
// mergeLimitReachedFn callback (if set) is invoked and only the rows still below the limit are kept.
//
// Outputs:
//   *pSkipBlock      - true when no row of the block is within the limit; *pRes is then pOrigBlk
//                      and the caller is expected to skip it
//   *pExtractedBlock - true when *pRes is a newly extracted block holding the leading rows,
//                      false when *pRes is pOrigBlk itself
//   *pRes            - the block to use (NULL on error)
// Returns 0 on success, or the error of tSimpleHashPut / blockDataExtractBlock.
static int32_t getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk,
                                            bool* pExtractedBlock, bool* pSkipBlock, SSDataBlock** pRes) {
  int64_t nRows = 0;
  int64_t prevRows = 0;
  int32_t code = 0;

  *pRes = NULL;

  void* pNum = tSimpleHashGet(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid));
  if (pNum == NULL) {
    prevRows = 0;
    nRows = pOrigBlk->info.rows;
    code = tSimpleHashPut(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid), &nRows, sizeof(nRows));
    if (code) {
      return code;
    }
  } else {
    prevRows = *(int64_t*)pNum;
    *(int64_t*)pNum = *(int64_t*)pNum + pOrigBlk->info.rows;
    nRows = *(int64_t*)pNum;
  }

  int64_t keepRows = pOrigBlk->info.rows;
  if (nRows >= pHandle->mergeLimit) {
    if (pHandle->mergeLimitReachedFn) {
      pHandle->mergeLimitReachedFn(pOrigBlk->info.id.uid, pHandle->mergeLimitReachedParam);
    }
    keepRows = pHandle->mergeLimit > prevRows ? (pHandle->mergeLimit - prevRows) : 0;
  }

  if (keepRows == 0) {
    // nothing of this block is within the limit: report "skip" and stop here, otherwise the
    // flag would be reset below and an empty block would be extracted
    *pSkipBlock = true;
    *pExtractedBlock = false;
    *pRes = pOrigBlk;
    return code;
  }

  *pSkipBlock = false;
  SSDataBlock* pBlock = NULL;
  if (keepRows != pOrigBlk->info.rows) {
    code = blockDataExtractBlock(pOrigBlk, 0, keepRows, &pBlock);
    if (code) {
      return code;
    }

    *pExtractedBlock = true;
  } else {
    *pExtractedBlock = false;
    pBlock = pOrigBlk;
  }

  *pRes = pBlock;
  return code;
}
2258

2259
static void freeHelp(void* param) {
117,215,202✔
2260
  SSDataBlock** ptr = param;
117,215,202✔
2261
  if (*ptr != NULL) {
117,215,202✔
2262
    blockDataDestroy(*ptr);
117,215,261✔
2263
  }
2264
}
117,216,653✔
2265

2266
// Build the initial sorted sources for the block-ts-merge sort.
//
// Blocks are pulled from the first entry of pHandle->pOrderedSource through pHandle->fetchfp. When a merge
// limit is set, each block is first trimmed by getRowsBlockWithinMergeLimit(). Blocks that share the same
// info.id.uid are merged into one accumulated block. Whenever the accumulated size exceeds the buffer budget,
// or the input is exhausted with data pending, the batch is handed to sortBlocksToExtSource() and the batch
// is released.
//
// On the normal path the handle's ordered sources are replaced by the generated ones (unless the handle has
// been closed) and pHandle->type becomes SORT_SINGLESOURCE_SORT.
//
// Returns 0 on success, otherwise the first error code encountered; the failure line is logged.
static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
  int32_t          szSort = 0;
  int32_t          code = 0;
  int32_t          lino = 0;
  size_t           nSrc = taosArrayGetSize(pHandle->pOrderedSource);
  SArray*          aExtSrc = NULL;
  SArray*          aBlkSort = NULL;
  SSHashObj*       mTableNumRows = NULL;
  SSHashObj*       mUidBlk = NULL;
  SBlockOrderInfo* pOrigTsOrder = NULL;

  aExtSrc = taosArrayInit(nSrc, POINTER_BYTES);
  QUERY_CHECK_NULL(aExtSrc, code, lino, _err, terrno);

  // uid -> number of rows accepted so far, consulted by the merge-limit trimming
  mTableNumRows = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
  QUERY_CHECK_NULL(mTableNumRows, code, lino, _err, terrno);

  // blocks of the current batch; the array owns them (released through freeHelp)
  aBlkSort = taosArrayInit(8, POINTER_BYTES);
  QUERY_CHECK_NULL(aBlkSort, code, lino, _err, terrno);

  // uid -> accumulated block of the current batch (pointers into aBlkSort, not owned by the hash)
  mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
  QUERY_CHECK_NULL(mUidBlk, code, lino, _err, terrno);

  size_t maxBufSize = (pHandle->bSortByRowId) ? pHandle->extRowsMemSize : (pHandle->numOfPages * pHandle->pageSize);
  code = createPageBuf(pHandle);
  QUERY_CHECK_CODE(code, lino, _err);

  SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0);
  QUERY_CHECK_NULL(pSrc, code, lino, _err, terrno);

  pOrigTsOrder =
      (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0);
  QUERY_CHECK_NULL(pOrigTsOrder, code, lino, _err, terrno);

  pHandle->currMergeLimitTs = (pOrigTsOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;

  while (1) {
    bool         bExtractedBlock = false;
    bool         bSkipBlock = false;
    SSDataBlock* pBlk = NULL;

    code = pHandle->fetchfp(pSrc->param, &pBlk);
    QUERY_CHECK_CODE(code, lino, _err);

    if (pBlk != NULL && pHandle->mergeLimit > 0) {
      SSDataBlock* p = NULL;
      code = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock, &bSkipBlock, &p);
      // a failure here must abort the sort instead of being silently dropped by fetching the next block
      QUERY_CHECK_CODE(code, lino, _err);
      if (bSkipBlock) {
        continue;
      }

      pBlk = p;
    }

    if (pBlk != NULL) {
      SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigTsOrder->slotId);
      QUERY_CHECK_NULL(tsCol, code, lino, _err, terrno);

      // skip blocks whose first row already lies beyond the current merge-limit timestamp
      int64_t firstRowTs = *(int64_t*)tsCol->pData;
      if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
          (pOrigTsOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) {
        if (bExtractedBlock) {
          blockDataDestroy(pBlk);
        }
        continue;
      }
    }

    if (pBlk != NULL) {
      szSort += blockDataGetSize(pBlk);
      void* ppBlk = tSimpleHashGet(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid));
      if (ppBlk != NULL) {
        SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk);
        code = blockDataMerge(tBlk, pBlk);
        // an extracted block is a private copy; release it whether or not the merge succeeded
        if (bExtractedBlock) {
          blockDataDestroy(pBlk);
        }
        QUERY_CHECK_CODE(code, lino, _err);
      } else {
        SSDataBlock* tBlk = NULL;
        if (bExtractedBlock) {
          // the extracted copy can be kept directly
          tBlk = pBlk;
        } else {
          code = createOneDataBlock(pBlk, true, &tBlk);
          QUERY_CHECK_CODE(code, lino, _err);
        }

        code = tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES);
        if (code != TSDB_CODE_SUCCESS) {
          blockDataDestroy(tBlk);
        }
        QUERY_CHECK_CODE(code, lino, _err);

        void* px = taosArrayPush(aBlkSort, &tBlk);
        if (px == NULL) {
          blockDataDestroy(tBlk);
        }
        QUERY_CHECK_NULL(px, code, lino, _err, terrno);
      }
    }

    // flush the batch when it outgrows the buffer, or when input ended with data still pending
    if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) {
      tSimpleHashClear(mUidBlk);

      int64_t p = taosGetTimestampUs();
      if (pHandle->bSortByRowId) {
        code = tsortOpenRegion(pHandle);
        QUERY_CHECK_CODE(code, lino, _err);
      }

      code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc);
      QUERY_CHECK_CODE(code, lino, _err);

      if (pHandle->bSortByRowId) {
        code = tsortCloseRegion(pHandle);  // ignore this error code
      }

      int64_t el = taosGetTimestampUs() - p;
      pHandle->sortElapsed += el;
      taosArrayClearEx(aBlkSort, freeHelp);

      szSort = 0;
      qDebug("%s source %zu created", pHandle->idStr, taosArrayGetSize(aExtSrc));
    }

    if (pBlk == NULL) {
      break;
    }

    if (tsortIsClosed(pHandle)) {
      break;
    }
  }

  tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL);
  if (!tsortIsClosed(pHandle)) {
    void* px = taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
    QUERY_CHECK_NULL(px, code, lino, _err, terrno);
  }

  if (pHandle->bSortByRowId) {
    code = tsortFinalizeRegions(pHandle);
  }

  pHandle->type = SORT_SINGLESOURCE_SORT;

_err:
  if (code) {
    qError("%s %s failed at line %d since %s", pHandle->idStr, __func__, lino, tstrerror(code));
  }

  if (aExtSrc) {
    taosArrayDestroy(aExtSrc);
  }
  if (aBlkSort) {
    taosArrayDestroyEx(aBlkSort, freeHelp);
  }
  if (mTableNumRows) {
    tSimpleHashCleanup(mTableNumRows);
  }
  if (mUidBlk) {
    tSimpleHashCleanup(mUidBlk);
  }
  return code;
}
2432

2433
static void freeSortSource(void* p) {
42,985,805✔
2434
  SSortSource** pSource = (SSortSource**)p;
42,985,805✔
2435
  if (NULL == pSource || NULL == *pSource) {
42,985,805✔
2436
    return;
×
2437
  }
2438

2439
  if ((*pSource)->pageIdList) {
42,988,173✔
2440
    taosArrayDestroy((*pSource)->pageIdList);
×
2441
  }
2442

2443
  if (!(*pSource)->onlyRef) {
42,990,684✔
2444
    if ((*pSource)->param) {
64,934✔
2445
      taosMemoryFree((*pSource)->param);
64,934✔
2446
    }
2447
    if ((*pSource)->src.pBlock) {
64,934✔
2448
      blockDataDestroy((*pSource)->src.pBlock);
×
2449
    }
2450
  }
2451

2452
  taosMemoryFreeClear(*pSource);
42,989,364✔
2453
}
2454

2455
// Build the initial sources for the single-source sort.
//
// Every block fetched from the first ordered source is appended to pHandle->pDataBlock. Each time the
// accumulated block grows past the sort buffer size it is sorted and flushed with doAddToBuf(). After the
// input ends, the remaining rows are sorted; if they fit in the buffer and nothing was spilled (pBuf is
// NULL) the handle is switched to in-memory mode, otherwise the remainder is flushed as well.
//
// The original sources are removed from pHandle->pOrderedSource before returning (except when the first
// source cannot be fetched from the array, in which case terrno is returned immediately).
static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
  int32_t code = 0;
  int32_t lino = 0;
  size_t  sortBufSize = pHandle->numOfPages * pHandle->pageSize;

  SSortSource** ppFirst = taosArrayGet(pHandle->pOrderedSource, 0);
  if (ppFirst == NULL) {
    return terrno;
  }

  SSortSource* pSource = *ppFirst;
  size_t       origSourceCount = taosArrayGetSize(pHandle->pOrderedSource);

  for (;;) {
    SSDataBlock* pBlock = NULL;
    code = pHandle->fetchfp(pSource->param, &pBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pBlock == NULL) {
      break;
    }

    // first block: derive the page size from its layout and create the accumulation block
    if (pHandle->pDataBlock == NULL) {
      uint32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
      pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock), numOfCols);

      // todo, number of pages are set according to the total available sort buffer
      pHandle->numOfPages = 1024;
      sortBufSize = pHandle->numOfPages * pHandle->pageSize;
      code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (pHandle->beforeFp != NULL) {
      pHandle->beforeFp(pBlock, pHandle->param);
    }

    code = blockDataMerge(pHandle->pDataBlock, pBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    size_t accumulated = blockDataGetSize(pHandle->pDataBlock);
    if (accumulated <= sortBufSize) {
      continue;
    }

    // Perform the in-memory sort and then flush data in the buffer into disk.
    int64_t startUs = taosGetTimestampUs();
    code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
    QUERY_CHECK_CODE(code, lino, _end);

    pHandle->sortElapsed += (taosGetTimestampUs() - startUs);

    if (pHandle->pqMaxRows > 0) {
      blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
    }
    code = doAddToBuf(pHandle->pDataBlock, pHandle);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
    // size is sampled before sorting/truncation, exactly as the spill decision below expects
    size_t remaining = blockDataGetSize(pHandle->pDataBlock);

    // Perform the in-memory sort and then flush data in the buffer into disk.
    int64_t startUs = taosGetTimestampUs();
    code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pHandle->pqMaxRows > 0) {
      blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
    }
    pHandle->sortElapsed += (taosGetTimestampUs() - startUs);

    if (remaining > sortBufSize || pHandle->pBuf != NULL) {
      code = doAddToBuf(pHandle->pDataBlock, pHandle);
    } else {
      // All sorted data can fit in memory, external memory sort is not needed. Return to directly
      pHandle->cmpParam.numOfSources = 1;
      pHandle->inMemSort = true;

      pHandle->loops = 1;
      pHandle->tupleHandle.rowIndex = -1;
      pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  taosArrayRemoveBatch(pHandle->pOrderedSource, 0, origSourceCount, freeSortSource);
  return code;
}
2540

2541
// Dispatch to the initial-source builder that matches the handle's sort type.
// Any other type creates nothing and yields 0. The resulting source count is logged at debug level.
static int32_t createInitialSources(SSortHandle* pHandle) {
  int32_t code = 0;

  switch (pHandle->type) {
    case SORT_SINGLESOURCE_SORT:
      code = createBlocksQuickSortInitialSources(pHandle);
      break;
    case SORT_BLOCK_TS_MERGE:
      code = createBlocksMergeSortInitialSources(pHandle);
      break;
    default:
      break;
  }

  qDebug("%s %zu sources created", pHandle->idStr, taosArrayGetSize(pHandle->pOrderedSource));
  return code;
}
2553

2554
// Open path for the buffer-based merge sort: create the initial sources, run the internal merge
// passes, then prepare the comparator parameters and the merge tree over the remaining sources.
// Returns 0 right away when no source is left after the internal merge.
static int32_t tsortOpenForBufMergeSort(SSortHandle* pHandle) {
  int32_t code = createInitialSources(pHandle);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // do internal sort
  code = doInternalMergeSort(pHandle);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);

  // with a paged buffer in use, the source count must not exceed its in-memory page count
  if (pHandle->pBuf != NULL && numOfSources > getNumOfInMemBufPages(pHandle->pBuf)) {
    qError("sort failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  if (numOfSources == 0) {
    return 0;
  }

  code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, 0, numOfSources - 1, pHandle);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  return tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
}
2586

2587
// Request that the sort be closed: atomically switch `closed` from 0 to 1.
// Nothing changes when the flag already holds another value.
void tsortClose(SSortHandle* pHandle) {
  (void)atomic_val_compare_exchange_8(&pHandle->closed, 0, 1);
}
123,067,759✔
2588

2589
// Report whether the handle has left the open state (closed != 0).
// Side effect: the compare-exchange also advances `closed` from 1 to 2 when it currently holds 1.
// NOTE(review): relies on atomic_val_compare_exchange_8 returning the value held before the exchange.
bool tsortIsClosed(SSortHandle* pHandle) {
  return atomic_val_compare_exchange_8(&pHandle->closed, 1, 2) != 0;
}
2,147,483,647✔
2590

2591
// Unconditionally store 2 into the handle's `closed` flag.
void tsortSetClosed(SSortHandle* pHandle) {
  atomic_store_8(&pHandle->closed, 2);
}
×
2592

2593
// Record the merge limit on the handle; block-merge source creation only applies it when it is > 0.
void tsortSetMergeLimit(SSortHandle* pHandle, int64_t mergeLimit) {
  pHandle->mergeLimit = mergeLimit;
}
41,786,756✔
2594

2595
void tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*),
122,966,366✔
2596
                            void* param) {
2597
  pHandle->fetchfp = fetchFp;
122,966,366✔
2598
  pHandle->beforeFp = fp;
123,021,150✔
2599
  pHandle->param = param;
122,965,009✔
2600
}
123,018,919✔
2601

2602
// Set the tuple comparison callback; a NULL handle is ignored.
void tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp) {
  if (!pHandle) {
    return;
  }

  pHandle->comparFn = fp;
}
133,381,186✔
2607

2608
// Store the group-id comparison flag in the comparator parameters; a NULL handle is ignored.
void tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId) {
  if (!pHandle) {
    return;
  }

  pHandle->cmpParam.cmpGroupId = compareGroupId;
}
17,588,170✔
2613

2614
// Produce the next tuple of the buffer/merge sort.
//
// *pTupleHandle is left NULL (with a 0 return) when the handle is closed or every source is completed.
// In in-memory mode the row index simply walks over pHandle->pDataBlock. Otherwise the merge tree picks
// the source; the tree is adjusted for the tuple handed out by the previous call before choosing again.
static int32_t tsortBufMergeSortNextTuple(SSortHandle* pHandle, STupleHandle** pTupleHandle) {
  int32_t       code = 0;
  STupleHandle* pTuple = &pHandle->tupleHandle;

  *pTupleHandle = NULL;

  if (tsortIsClosed(pHandle)) {
    return code;
  }

  if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) {
    return code;
  }

  // All the data are hold in the buffer, no external sort is invoked.
  if (pHandle->inMemSort) {
    pTuple->rowIndex += 1;
    if (pTuple->rowIndex == pHandle->pDataBlock->info.rows) {
      pHandle->numOfCompletedSources = 1;
    } else {
      *pTupleHandle = pTuple;
    }
    return code;
  }

  int32_t      chosen = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
  SSortSource* pChosenSrc = pHandle->cmpParam.pSources[chosen];

  if (pHandle->needAdjust) {
    code = adjustMergeTreeForNextTuple(pChosenSrc, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  // all sources are completed.
  if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) {
    return code;
  }

  // Get the adjusted value after the loser tree is updated.
  chosen = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
  pChosenSrc = pHandle->cmpParam.pSources[chosen];

  if (UNLIKELY(pChosenSrc->src.pBlock == NULL)) {
    qError("sort failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pTuple->pBlock = pChosenSrc->src.pBlock;
  pTuple->rowIndex = pChosenSrc->src.rowIndex;

  // consume the row and remember that the tree must be adjusted on the next call
  pChosenSrc->src.rowIndex += 1;
  pHandle->needAdjust = true;

  *pTupleHandle = pTuple;
  return code;
}
2671

2672
// True when the handle has been flagged to always use the priority-queue sort.
static bool tsortIsForceUsePQSort(SSortHandle* pHandle) {
  return (pHandle->forceUsePQSort == true);
}
56,393,042✔
2673

2674
// Flag the handle so that the priority-queue sort is chosen regardless of the memory estimate.
void tsortSetForceUsePQSort(SSortHandle* pHandle) {
  pHandle->forceUsePQSort = true;
}
7,206,526✔
2675

2676
// Decide whether the priority-queue sort can be used. It requires the single-source sort type;
// beyond that it is either forced, or chosen when more than pqMaxRows tuples (each counted as
// pqMaxTupleLength plus one pointer) fit into pqSortBufSize.
static bool tsortIsPQSortApplicable(SSortHandle* pHandle) {
  if (pHandle->type != SORT_SINGLESOURCE_SORT) {
    return false;
  }

  if (tsortIsForceUsePQSort(pHandle)) {
    return true;
  }

  uint64_t maxRowsFitInMemory = pHandle->pqSortBufSize / (pHandle->pqMaxTupleLength + sizeof(char*));
  return maxRowsFitInMemory > pHandle->pqMaxRows;
}
2682

2683
static bool tsortPQCompFn(void* a, void* b, void* param) {
2,147,483,647✔
2684
  SSortHandle* pHandle = param;
2,147,483,647✔
2685
  int32_t      res = pHandle->comparFn(a, b, param);
2,147,483,647✔
2686
  if (res < 0) return 1;
2,147,483,647✔
2687
  return 0;
2,147,483,647✔
2688
}
2689

2690
static bool tsortPQComFnReverse(void* a, void* b, void* param) {
1,180,955,538✔
2691
  SSortHandle* pHandle = param;
1,180,955,538✔
2692
  int32_t      res = pHandle->comparFn(a, b, param);
1,180,955,538✔
2693
  if (res > 0) return 1;
1,180,956,510✔
2694
  return 0;
428,796,456✔
2695
}
2696

2697
static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param) {
2,147,483,647✔
2698
  TupleDesc* pLeftDesc = (TupleDesc*)pLeft;
2,147,483,647✔
2699
  TupleDesc* pRightDesc = (TupleDesc*)pRight;
2,147,483,647✔
2700

2701
  SSortHandle* pHandle = (SSortHandle*)param;
2,147,483,647✔
2702
  SArray*      orderInfo = (SArray*)pHandle->pSortInfo;
2,147,483,647✔
2703
  uint32_t     colNum = blockDataGetNumOfCols(pHandle->pDataBlock);
2,147,483,647✔
2704
  for (int32_t i = 0; i < orderInfo->size; ++i) {
2,147,483,647✔
2705
    SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(orderInfo, i);
2,147,483,647✔
2706
    void *           lData = NULL, *rData = NULL;
2,147,483,647✔
2707

2708
    int32_t ret1 = tupleDescGetField(pLeftDesc, pOrder->slotId, colNum, &lData);
2,147,483,647✔
2709
    int32_t ret2 = tupleDescGetField(pRightDesc, pOrder->slotId, colNum, &rData);
2,147,483,647✔
2710
    if (ret1) {
2,147,483,647✔
2711
      return ret1;
×
2712
    }
2713

2714
    if (ret2) {
2,147,483,647✔
2715
      return ret2;
×
2716
    }
2717

2718
    if ((!lData) && (!rData)) {
2,147,483,647✔
2719
      continue;
2,147,483,647✔
2720
    }
2721

2722
    if (!lData) return pOrder->nullFirst ? -1 : 1;
2,147,483,647✔
2723
    if (!rData) return pOrder->nullFirst ? 1 : -1;
2,147,483,647✔
2724

2725
    SColumnInfoData* p = (SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId);
2,147,483,647✔
2726
    if (p == NULL) {
2,147,483,647✔
2727
      return terrno;
×
2728
    }
2729

2730
    __compar_fn_t fn = getKeyComparFunc(p->info.type, pOrder->order);
2,147,483,647✔
2731

2732
    int32_t ret = fn(lData, rData);
2,147,483,647✔
2733
    if (ret == 0) {
2,147,483,647✔
2734
      continue;
2,147,483,647✔
2735
    } else {
2736
      return ret;
2,147,483,647✔
2737
    }
2738
  }
2739

2740
  return 0;
2,147,483,647✔
2741
}
2742

2743
// Open path for the priority-queue sort.
//
// Creates a bounded queue limited to pqMaxRows, installs tupleComparFn as comparator and then
// drains the first source: every row is offered to the queue as a reference tuple, and only the rows the
// queue accepts are materialized with createAllocatedTuple(). pHandle->pDataBlock is created from the first
// fetched block and stays NULL when the source yields no block.
static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
  pHandle->pBoundedQueue = createBoundedQueue(pHandle->pqMaxRows, tsortPQCompFn, destroyTuple, pHandle);
  if (pHandle->pBoundedQueue == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsortSetComparFp(pHandle, tupleComparFn);

  SSortSource** ppSource = taosArrayGet(pHandle->pOrderedSource, 0);
  if (ppSource == NULL) {
    return terrno;
  }

  SSortSource*      source = *ppSource;
  uint32_t          tupleLen = 0;
  PriorityQueueNode pqNode;
  pHandle->pDataBlock = NULL;

  for (;;) {
    // fetch data
    SSDataBlock* pBlock = NULL;
    TAOS_CHECK_RETURN(pHandle->fetchfp(source->param, &pBlock));
    if (pBlock == NULL) {
      break;
    }

    if (pHandle->beforeFp != NULL) {
      pHandle->beforeFp(pBlock, pHandle->param);
    }

    if (pHandle->pDataBlock == NULL) {
      int32_t code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock);
      if (code) {
        return code;
      }
    }

    size_t colNum = blockDataGetNumOfCols(pBlock);

    // the tuple length is derived once from the column definitions; var-length columns add a length prefix
    if (tupleLen == 0) {
      for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
        SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
        if (pCol == NULL) {
          return terrno;
        }

        tupleLen += pCol->info.bytes;
        if (!IS_VAR_DATA_TYPE(pCol->info.type)) {
          continue;
        }

        if (IS_STR_DATA_BLOB(pCol->info.type)) {
          tupleLen += sizeof(BlobDataLenT);
        } else {
          tupleLen += sizeof(VarDataLenT);
        }
      }
    }

    ReferencedTuple refTuple = {.desc.data = (char*)pBlock, .desc.type = ReferencedTupleType, .rowIndex = 0};
    for (size_t rowIdx = 0; rowIdx < pBlock->info.rows; ++rowIdx) {
      refTuple.rowIndex = rowIdx;
      pqNode.data = &refTuple;

      PriorityQueueNode* pPushedNode = taosBQPush(pHandle->pBoundedQueue, &pqNode);
      if (pPushedNode == NULL) {
        // do nothing if push failed
        continue;
      }

      // replace the temporary reference with an allocated copy of the row
      pPushedNode->data = NULL;
      int32_t code = createAllocatedTuple(pBlock, colNum, tupleLen, rowIdx, (TupleDesc**)&pPushedNode->data);
      if (code) {
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
2819

2820
// Produce the next tuple of the priority-queue sort.
//
// pHandle->pDataBlock is reused as a one-row output block: the top tuple of the bounded queue is copied
// into row 0 and popped. On the first call (tmpRowIdx == 0) the queue's predicate is switched to the
// reversed one and the heap rebuilt. *pTupleHandle stays NULL when there is no data block or no row
// was produced.
static int32_t tsortPQSortNextTuple(SSortHandle* pHandle, STupleHandle** pTupleHandle) {
  int32_t      code = 0;
  SSDataBlock* pOut = pHandle->pDataBlock;

  *pTupleHandle = NULL;
  if (pOut == NULL) {  // when no input stream datablock
    return code;
  }

  blockDataCleanup(pOut);
  code = blockDataEnsureCapacity(pOut, 1);
  if (code) {
    return code;
  }

  // abandon the top tuple if queue size bigger than max size
  if (taosBQSize(pHandle->pBoundedQueue) == taosBQMaxSize(pHandle->pBoundedQueue) + 1) {
    taosBQPop(pHandle->pBoundedQueue);
  }

  if (pHandle->tmpRowIdx == 0) {
    // sort the results
    taosBQSetFn(pHandle->pBoundedQueue, tsortPQComFnReverse);
    taosBQBuildHeap(pHandle->pBoundedQueue);
  }

  if (taosBQSize(pHandle->pBoundedQueue) > 0) {
    uint32_t           colNum = blockDataGetNumOfCols(pOut);
    PriorityQueueNode* pTop = taosBQTop(pHandle->pBoundedQueue);
    char*              pTuple = ((TupleDesc*)pTop->data)->data;

    for (uint32_t col = 0; col < colNum; ++col) {
      void* pField = tupleGetField(pTuple, col, colNum);

      SColumnInfoData* pColInfo = NULL;
      TAOS_CHECK_RETURN(bdGetColumnInfoData(pOut, col, &pColInfo));

      if (pField) {
        TAOS_CHECK_RETURN(colDataSetVal(pColInfo, 0, pField, false));
      } else {
        colDataSetNULL(pColInfo, 0);
      }
    }

    pOut->info.rows++;
    pHandle->tmpRowIdx++;
    taosBQPop(pHandle->pBoundedQueue);
  }

  if (pOut->info.rows == 0) {
    return code;
  }

  pHandle->tupleHandle.pBlock = pOut;
  *pTupleHandle = &pHandle->tupleHandle;
  return code;
}
2873

2874
// Produce the next tuple in single-table-merge mode, where rows are handed out in the order they
// arrive from the first source.
//
// While the current block still has rows the row index is advanced. Otherwise the next block is
// fetched; a NULL or empty block marks the source as done and leaves *pTupleHandle NULL.
static int32_t tsortSingleTableMergeNextTuple(SSortHandle* pHandle, STupleHandle** pTupleHandle) {
  int32_t       code = 0;
  STupleHandle* pTuple = &pHandle->tupleHandle;

  *pTupleHandle = NULL;

  if (pHandle->numOfCompletedSources == 1) {
    return code;
  }

  // fast path: more rows are available in the current block
  if (pTuple->pBlock && pTuple->rowIndex + 1 < pTuple->pBlock->info.rows) {
    pTuple->rowIndex++;
    *pTupleHandle = pTuple;
    return code;
  }

  if (pTuple->rowIndex == -1) {
    return code;
  }

  SSortSource** ppSource = taosArrayGet(pHandle->pOrderedSource, 0);
  if (ppSource == NULL) {
    return terrno;
  }

  SSortSource* source = *ppSource;
  SSDataBlock* pNext = NULL;
  TAOS_CHECK_RETURN(pHandle->fetchfp(source->param, &pNext));

  if (pNext == NULL || pNext->info.rows == 0) {
    setCurrentSourceDone(source, pHandle);
    pTuple->pBlock = NULL;
    return code;
  }

  pTuple->pBlock = pNext;
  pTuple->rowIndex = 0;

  *pTupleHandle = pTuple;
  return code;
}
2911

2912
// Open the sort handle and run the preparatory phase of the selected algorithm.
//
// Returns TSDB_CODE_INVALID_PARA when the handle is NULL or when the fetch or compare callback is missing.
// Calling it on an already opened handle is a no-op that returns 0. Otherwise the handle is marked opened
// and the result of the priority-queue or buffer-merge open routine is returned.
int32_t tsortOpen(SSortHandle* pHandle) {
  int32_t code = 0;

  // the NULL check must come before any field of the handle is read
  if (pHandle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (pHandle->opened) {
    return code;
  }

  if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  pHandle->opened = true;
  if (tsortIsPQSortApplicable(pHandle)) {
    code = tsortOpenForPQSort(pHandle);
  } else {
    code = tsortOpenForBufMergeSort(pHandle);
  }

  return code;
}
2931

2932
// Fetch the next sorted tuple, delegating to the routine that matches the handle's mode:
// single-table merge first, then the priority-queue sort (when a bounded queue exists),
// otherwise the buffer/merge sort.
int32_t tsortNextTuple(SSortHandle* pHandle, STupleHandle** pTupleHandle) {
  if (pHandle->singleTableMerge) {
    return tsortSingleTableMergeNextTuple(pHandle, pTupleHandle);
  }

  if (pHandle->pBoundedQueue) {
    return tsortPQSortNextTuple(pHandle, pTupleHandle);
  }

  return tsortBufMergeSortNextTuple(pHandle, pTupleHandle);
}
2945

2946
// Tell whether column `colIndex` of the tuple is NULL. A column that cannot be fetched,
// or that has no data buffer, is reported as NULL.
bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) {
  SColumnInfoData* pCol = taosArrayGet(pVHandle->pBlock->pDataBlock, colIndex);
  if (pCol != NULL && pCol->pData != NULL) {
    return colDataIsNull_s(pCol, pVHandle->rowIndex);
  }

  return true;
}
2954

2955
// Store in *pVal a pointer to the value of column `colIndex` at the tuple's row,
// or NULL when the column has no data buffer.
void tsortGetValue(STupleHandle* pVHandle, int32_t colIndex, void** pVal) {
  SColumnInfoData* pCol = TARRAY_GET_ELEM(pVHandle->pBlock->pDataBlock, colIndex);
  if (pCol->pData == NULL) {
    *pVal = NULL;
    return;
  }

  *pVal = colDataGetData(pCol, pVHandle->rowIndex);
}
2,147,483,647✔
2962

2963
// Return, through *pColInfo, the column descriptor at `colIndex` of the tuple's block.
void tsortGetColumnInfo(STupleHandle* pVHandle, int32_t colIndex, SColumnInfoData** pColInfo) {
  SArray* pCols = pVHandle->pBlock->pDataBlock;
  *pColInfo = TARRAY_GET_ELEM(pCols, colIndex);
}
2,147,483,647✔
2966

2967
// Number of columns of the block the tuple belongs to.
size_t tsortGetColNum(STupleHandle* pVHandle) {
  return blockDataGetNumOfCols(pVHandle->pBlock);
}
2,147,483,647✔
2968
// Group id recorded in the info of the block the tuple belongs to.
uint64_t tsortGetGroupId(STupleHandle* pVHandle) {
  return pVHandle->pBlock->info.id.groupId;
}
2,147,483,647✔
2969
// Block id recorded in the info of the block the tuple belongs to.
uint64_t tsortGetBlockId(STupleHandle* pVHandle) {
  return pVHandle->pBlock->info.id.blockId;
}
2,147,483,647✔
2970
// Copy the info structure of the tuple's block into *pBlockInfo.
void tsortGetBlockInfo(STupleHandle* pVHandle, SDataBlockInfo* pBlockInfo) {
  *pBlockInfo = pVHandle->pBlock->info;
}
2,147,483,647✔
2971

2972
// Collect execution statistics of the sort. A NULL handle yields the defaults (quick sort, 2 MB buffer).
// Otherwise the buffer size, method and loop count are taken from the handle, and the
// flushed/loaded byte counts are added when a paged buffer exists.
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
  SSortExecInfo info = {0};

  if (pHandle == NULL) {
    info.sortMethod = SORT_QSORT_T;  // by default
    info.sortBuffer = 2 * 1048576;   // 2mb by default
    return info;
  }

  info.sortBuffer = pHandle->pageSize * pHandle->numOfPages;
  info.sortMethod = pHandle->inMemSort ? SORT_QSORT_T : SORT_SPILLED_MERGE_SORT_T;
  info.loops = pHandle->loops;

  if (pHandle->pBuf != NULL) {
    SDiskbasedBufStatis stat = getDBufStatis(pHandle->pBuf);
    info.writeBytes = stat.flushBytes;
    info.readBytes = stat.loadBytes;
  }

  return info;
}
2992

2993
int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* keyLen, const STupleHandle* pTuple) {
2,147,483,647✔
2994
  int32_t ret;
2995
  if (0 == compKeys(pSortCols, keyBuf, *keyLen, pTuple->pBlock, pTuple->rowIndex)) {
2,147,483,647✔
2996
    ret = 0;
2,147,483,647✔
2997
  } else {
2998
    *keyLen = buildKeys(keyBuf, pSortCols, pTuple->pBlock, pTuple->rowIndex);
1,635,417✔
2999
    ret = 1;
1,746,036✔
3000
  }
3001
  return ret;
2,147,483,647✔
3002
}
3003

3004
// Register the callback (and its opaque argument) stored as the handle's merge-limit-reached hook.
void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReachedCb)(uint64_t tableUid, void* param),
                                 void* param) {
  pHandle->mergeLimitReachedParam = param;
  pHandle->mergeLimitReachedFn = mergeLimitReachedCb;
}
41,757,684✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc