• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.51
/source/libs/executor/src/tsort.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "query.h"
17
#include "tcommon.h"
18

19
#include "executil.h"
20
#include "tcompare.h"
21
#include "tdatablock.h"
22
#include "tdef.h"
23
#include "theap.h"
24
#include "tlosertree.h"
25
#include "tpagedbuf.h"
26
#include "tsimplehash.h"
27
#include "tsort.h"
28
#include "tutil.h"
29

30
#define AllocatedTupleType  0
31
#define ReferencedTupleType 1  // tuple references to one row in pDataBlock
32

33
struct STupleHandle {
34
  SSDataBlock* pBlock;
35
  int32_t      rowIndex;
36
};
37

38
// Bookkeeping for one region of the sort mem-file.
// NOTE(review): field meanings below are inferred from the names only; the code that reads/writes regions is not
// part of this excerpt - confirm before relying on them.
typedef struct SSortMemFileRegion {
  int64_t fileOffset;  // presumably where the region starts inside the backing file
  int32_t regionSize;  // presumably the size of the region in bytes

  int32_t bufRegOffset;  // presumably the offset inside the region that `buf` currently mirrors
  int32_t bufLen;        // presumably the number of valid bytes in `buf`
  char*   buf;           // in-memory buffer associated with the region
} SSortMemFileRegion;
46

47
typedef struct SSortMemFile {
48
  char*   writeBuf;
49
  int32_t writeBufSize;
50
  int64_t writeFileOffset;
51

52
  int32_t currRegionId;
53
  int32_t currRegionOffset;
54
  bool    bRegionDirty;
55

56
  SArray* aFileRegions;
57
  int32_t cacheSize;
58
  int32_t blockSize;
59

60
  FILE* pTdFile;
61
  char  memFilePath[PATH_MAX];
62
} SSortMemFile;
63

64
struct SSortHandle {
65
  int32_t        type;
66
  int32_t        pageSize;
67
  int32_t        numOfPages;
68
  SDiskbasedBuf* pBuf;
69
  SArray*        pSortInfo;
70
  SArray*        pOrderedSource;
71
  int32_t        loops;
72
  uint64_t       sortElapsed;
73
  int64_t        startTs;
74
  uint64_t       totalElapsed;
75

76
  uint64_t      pqMaxRows;
77
  uint32_t      pqMaxTupleLength;
78
  uint32_t      pqSortBufSize;
79
  bool          forceUsePQSort;
80
  BoundedQueue* pBoundedQueue;
81
  uint32_t      tmpRowIdx;
82
  int64_t       mergeLimit;
83
  int64_t       currMergeLimitTs;
84

85
  int32_t           sourceId;
86
  SSDataBlock*      pDataBlock;
87
  SMsortComparParam cmpParam;
88
  int32_t           numOfCompletedSources;
89
  bool              opened;
90
  int8_t            closed;
91
  const char*       idStr;
92
  bool              inMemSort;
93
  bool              needAdjust;
94
  STupleHandle      tupleHandle;
95
  void*             param;
96
  void (*beforeFp)(SSDataBlock* pBlock, void* param);
97

98
  _sort_fetch_block_fn_t  fetchfp;
99
  _sort_merge_compar_fn_t comparFn;
100
  SMultiwayMergeTreeInfo* pMergeTree;
101

102
  bool singleTableMerge;
103

104
  bool (*abortCheckFn)(void* param);
105
  void* abortCheckParam;
106

107
  bool          bSortByRowId;
108
  SSortMemFile* pExtRowsMemFile;
109
  int32_t       extRowBytes;
110
  int32_t       extRowsPageSize;
111
  int32_t       extRowsMemSize;
112
  int32_t       srcTsSlotId;
113
  SArray*       aExtRowsOrders;
114
  bool          bSortPk;
115
  void (*mergeLimitReachedFn)(uint64_t tableUid, void* param);
116
  void* mergeLimitReachedParam;
117
};
118

119
// Forward declarations of helpers defined later in this file.
static void    destroySortMemFile(SSortHandle* pHandle);
static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, int32_t tupleOffset, int32_t rowLen,
                                       char** ppRow, bool* pFreeRow);

// Mark the handle as a single-table merge.
void tsortSetSingleTableMerge(SSortHandle* pHandle) {
  pHandle->singleTableMerge = true;
}
146,152✔
123

124
// Install the abort-check callback and the opaque argument stored alongside it.
void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void*), void* param) {
  pHandle->abortCheckParam = param;
  pHandle->abortCheckFn = checkFn;
}
213,041✔
128

129
static int32_t msortComparFn(const void* pLeft, const void* pRight, void* param);
130

131
// Tuple layout: | offset[0] | offset[1] |....| nullbitmap | data |...|
//
// Allocates a zero-filled tuple with room for `columnNum` uint32_t offsets, the null bitmap and `tupleLen` bytes of
// column data. Returns NULL when the allocation fails.
static void* createTuple(uint32_t columnNum, uint32_t tupleLen) {
  uint32_t headerLen = sizeof(uint32_t) * columnNum + BitmapLen(columnNum);
  uint32_t allocLen = headerLen + tupleLen;
  return taosMemoryCalloc(1, allocLen);
}
136

137
// Release the packed tuple buffer of an allocated tuple.
static void destoryAllocatedTuple(void* t) {
  taosMemoryFree(t);
}
559,448!
138

139
// Accessors for the packed tuple layout | offset[0] | offset[1] |....| nullbitmap | data |...|
// Every macro argument is parenthesized so that expression arguments (e.g. `idx + 1`, `*t`) expand correctly, and
// the tuple pointer is cast to char* before byte arithmetic so void* tuples work as well.

// address of the uint32_t offset slot of column `colIdx`
#define tupleOffset(tuple, colIdx) ((uint32_t*)((char*)(tuple) + sizeof(uint32_t) * (colIdx)))
// store the data offset of column `colIdx`
#define tupleSetOffset(tuple, colIdx, offset) (*tupleOffset(tuple, colIdx) = (offset))
// mark column `colIdx` as NULL in the bitmap that follows the `colNum` offset slots
#define tupleSetNull(tuple, colIdx, colNum) \
  colDataSetNull_f((char*)(tuple) + sizeof(uint32_t) * (colNum), (colIdx))
// test the NULL bit of column `colIdx`
#define tupleColIsNull(tuple, colIdx, colNum) BMIsNull((char*)(tuple) + sizeof(uint32_t) * (colNum), (colIdx))
// byte offset of the first data byte: offset slots followed by the null bitmap
#define tupleGetDataStartOffset(colNum) (sizeof(uint32_t) * (colNum) + BitmapLen(colNum))
// copy `length` bytes of column data to byte offset `offset` of the tuple
#define tupleSetData(tuple, offset, data, length) memcpy((char*)(tuple) + (offset), (data), (length))
145

146
/**
147
 * @param t the tuple pointer addr, if realloced, *t is changed to the new addr
148
 * @param offset copy data into pTuple start from offset
149
 * @param colIndex the columnIndex, for setting null bitmap
150
 * @return the next offset to add field
151
 * */
152
static inline int32_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, uint32_t colIdx, void* data,
2,153,370✔
153
                                    size_t length, bool isNull, uint32_t tupleLen, uint32_t* pOffset) {
154
  int32_t code = TSDB_CODE_SUCCESS;
2,153,370✔
155
  int32_t lino = 0;
2,153,370✔
156
  tupleSetOffset(*t, colIdx, offset);
2,153,370✔
157

158
  if (isNull) {
2,153,370✔
159
    tupleSetNull(*t, colIdx, colNum);
240,965✔
160
  } else {
161
    if (offset + length > tupleLen + tupleGetDataStartOffset(colNum)) {
1,912,405!
162
      void* px = taosMemoryRealloc(*t, offset + length);
×
163
      QUERY_CHECK_NULL(px, code, lino, _end, terrno);
×
164

165
      *t = px;
×
166
    }
167
    tupleSetData(*t, offset, data, length);
1,912,405✔
168
  }
169

170
  (*pOffset) = offset + length;
2,153,370✔
171

172
_end:
2,153,370✔
173
  if (code != TSDB_CODE_SUCCESS) {
2,153,370!
174
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
175
  }
176
  return code;
2,154,910✔
177
}
178

179
static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) {
14,271,845✔
180
  if (tupleColIsNull(t, colIdx, colNum)) {
14,271,845✔
181
    return NULL;
1,706,105✔
182
  }
183

184
  return t + *tupleOffset(t, colIdx);
12,565,740✔
185
}
186

187
// Create a new block from the handle's result block via createOneDataBlock(). *pBlock is reset to NULL first and
// stays NULL (with a success return) when the handle has no result block.
int32_t tsortGetSortedDataBlock(const SSortHandle* pSortHandle, SSDataBlock** pBlock) {
  *pBlock = NULL;

  SSDataBlock* pTemplate = pSortHandle->pDataBlock;
  if (pTemplate != NULL) {
    return createOneDataBlock(pTemplate, false, pBlock);
  }

  return TSDB_CODE_SUCCESS;
}
194

195
// Common header of a tuple descriptor.
typedef struct TupleDesc {
  uint8_t type;  // AllocatedTupleType or ReferencedTupleType
  char*   data;  // AllocatedTupleType: the packed tuple created for this row; otherwise: the source data block
} TupleDesc;
199

200
typedef struct ReferencedTuple {
201
  TupleDesc desc;
202
  size_t    rowIndex;
203
} ReferencedTuple;
204

205
static int32_t createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint32_t tupleLen, size_t rowIdx,
558,537✔
206
                                    TupleDesc** pDesc) {
207
  int32_t    code = TSDB_CODE_SUCCESS;
558,537✔
208
  TupleDesc* t = taosMemoryCalloc(1, sizeof(TupleDesc));
558,537!
209
  if (t == NULL) {
560,889!
210
    return terrno;
×
211
  }
212

213
  void* pTuple = createTuple(colNum, tupleLen);
560,889✔
214
  if (!pTuple) {
560,992✔
215
    taosMemoryFree(t);
24!
216
    return terrno;
×
217
  }
218

219
  size_t   colLen = 0;
560,968✔
220
  uint32_t offset = tupleGetDataStartOffset(colNum);
560,968✔
221
  for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
2,715,615✔
222
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
2,155,338✔
223
    if (pCol == NULL) {
2,154,747!
224
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
225
      return terrno;
×
226
    }
227

228
    if (colDataIsNull_s(pCol, rowIdx)) {
4,310,772✔
229
      code = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen, &offset);
240,985✔
230
    } else {
231
      colLen = colDataGetRowLength(pCol, rowIdx);
1,914,401✔
232
      code = tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false,
1,913,085!
233
                           tupleLen, &offset);
234
    }
235
    if (code != TSDB_CODE_SUCCESS) {
2,154,647!
236
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
237
      return code;
×
238
    }
239
  }
240

241
  t->type = AllocatedTupleType;
560,277✔
242
  t->data = pTuple;
560,277✔
243

244
  *pDesc = t;
560,277✔
245
  return code;
560,277✔
246
}
247

248
int32_t tupleDescGetField(const TupleDesc* pDesc, int32_t colIdx, uint32_t colNum, void** pResult) {
23,482,545✔
249
  *pResult = NULL;
23,482,545✔
250

251
  if (pDesc->type == ReferencedTupleType) {
23,482,545✔
252
    ReferencedTuple* pRefTuple = (ReferencedTuple*)pDesc;
9,878,394✔
253
    SColumnInfoData* pCol = taosArrayGet(((SSDataBlock*)pDesc->data)->pDataBlock, colIdx);
9,878,394✔
254
    if (pCol == NULL) {
9,878,053!
255
      return terrno;
×
256
    }
257

258
    if (colDataIsNull_s(pCol, pRefTuple->rowIndex)) {
19,756,234✔
259
      return TSDB_CODE_SUCCESS;
1,001,674✔
260
    }
261

262
    *pResult = colDataGetData(pCol, pRefTuple->rowIndex);
8,876,443!
263
  } else {
264
    *pResult = tupleGetField(pDesc->data, colIdx, colNum);
13,604,151✔
265
  }
266

267
  return 0;
22,493,549✔
268
}
269

270
void destroyTuple(void* t) {
559,457✔
271
  TupleDesc* pDesc = t;
559,457✔
272
  if (pDesc != NULL && pDesc->type == AllocatedTupleType) {
559,457!
273
    destoryAllocatedTuple(pDesc->data);
559,448✔
274
    taosMemoryFree(pDesc);
561,293✔
275
  }
276
}
561,207✔
277

278
/**
279
 *
280
 * @param type
281
 * @return
282
 */
283
int32_t tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
414,479✔
284
                              SSDataBlock* pBlock, const char* idstr, uint64_t pqMaxRows, uint32_t pqMaxTupleLength,
285
                              uint32_t pqSortBufSize, SSortHandle** pHandle) {
286
  int32_t code = 0;
414,479✔
287
  int32_t lino = 0;
414,479✔
288

289
  QRY_PARAM_CHECK(pHandle);
414,479!
290
  SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
414,479!
291
  QUERY_CHECK_NULL(pSortHandle, code, lino, _err, terrno);
414,527!
292

293
  pSortHandle->type = type;
414,527✔
294
  pSortHandle->pageSize = pageSize;
414,527✔
295
  pSortHandle->numOfPages = numOfPages;
414,527✔
296
  pSortHandle->pSortInfo = taosArrayDup(pSortInfo, NULL);
414,527✔
297
  QUERY_CHECK_NULL(pSortHandle->pSortInfo, code, lino, _err, terrno);
414,548!
298

299
  pSortHandle->loops = 0;
414,548✔
300
  pSortHandle->pqMaxTupleLength = pqMaxTupleLength;
414,548✔
301
  if (pqMaxRows != 0) {
414,548✔
302
    pSortHandle->pqSortBufSize = pqSortBufSize;
145,960✔
303
    pSortHandle->pqMaxRows = pqMaxRows;
145,960✔
304
  }
305

306
  pSortHandle->forceUsePQSort = false;
414,548✔
307
  if (pBlock != NULL) {
414,548✔
308
    code = createOneDataBlock(pBlock, false, &pSortHandle->pDataBlock);
268,471✔
309
    QUERY_CHECK_CODE(code, lino, _err);
268,496!
310
  }
311

312
  pSortHandle->mergeLimit = -1;
414,573✔
313

314
  pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
414,573✔
315
  QUERY_CHECK_NULL(pSortHandle->pOrderedSource, code, lino, _err, terrno);
414,551!
316

317
  pSortHandle->cmpParam.orderInfo = pSortInfo;
414,551✔
318
  pSortHandle->cmpParam.cmpGroupId = false;
414,551✔
319
  pSortHandle->cmpParam.sortType = type;
414,551✔
320

321
  if (type == SORT_BLOCK_TS_MERGE) {
414,551✔
322
    SBlockOrderInfo* pTsOrder = TARRAY_GET_ELEM(pSortInfo, 0);
213,145✔
323
    pSortHandle->cmpParam.tsSlotId = pTsOrder->slotId;
213,145✔
324
    pSortHandle->cmpParam.tsOrder = pTsOrder->order;
213,145✔
325
    pSortHandle->cmpParam.cmpTsFn = pTsOrder->compFn;
213,145✔
326
    if (taosArrayGetSize(pSortHandle->pSortInfo) == 2) {
213,145✔
327
      pSortHandle->cmpParam.pPkOrder = taosArrayGet(pSortHandle->pSortInfo, 1);
15,429✔
328
      pSortHandle->bSortPk = true;
15,417✔
329
    } else {
330
      pSortHandle->cmpParam.pPkOrder = NULL;
197,712✔
331
      pSortHandle->bSortPk = false;
197,712✔
332
    }
333
  }
334
  tsortSetComparFp(pSortHandle, msortComparFn);
414,535✔
335

336
  if (idstr != NULL) {
414,490!
337
    pSortHandle->idStr = taosStrdup(idstr);
414,494!
338
    QUERY_CHECK_NULL(pSortHandle->idStr, code, lino, _err, terrno);
414,491!
339
  }
340

341
  *pHandle = pSortHandle;
414,487✔
342
  return code;
414,487✔
343

344
_err:
×
345
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
346
  if (pSortHandle) {
×
347
    tsortDestroySortHandle(pSortHandle);
×
348
  }
349
  return code;
×
350
}
351

352
// Destroy every source referenced by cmpParam (its block, its page-id list and the source object itself), clear
// the slots and reset numOfSources to 0. Always returns TSDB_CODE_SUCCESS.
static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
  // NOTE(review): the original notice here read "pSource may be, if it is SORT_MULTISOURCE_MERGE" and appears to
  // be truncated; presumably it warns that an entry can be NULL, so NULL entries are skipped instead of
  // dereferenced - confirm.
  for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
    SSortSource* pSource = cmpParam->pSources[i];
    if (pSource == NULL) {
      continue;
    }

    blockDataDestroy(pSource->src.pBlock);
    if (pSource->pageIdList) {
      taosArrayDestroy(pSource->pageIdList);
    }
    taosMemoryFreeClear(pSource);
    cmpParam->pSources[i] = NULL;
  }

  cmpParam->numOfSources = 0;
  return TSDB_CODE_SUCCESS;
}
367

368
/**
 * Release every source stored in pOrderedSource and empty the array (the array itself is not destroyed).
 *
 * For each non-NULL source: its page-id list is destroyed; its `param` and its block are released unless the
 * source is flagged `onlyRef`; finally the source object is freed.
 *
 * @param pOrderedSource array of SSortSource*
 * @param fetchUs        optional accumulator; each source's fetchUs is added to it
 * @param fetchNum       optional accumulator; each source's fetchNum is added to it. It is now checked on its own,
 *                       so passing NULL here together with a non-NULL fetchUs no longer dereferences NULL.
 */
void tsortClearOrderedSource(SArray* pOrderedSource, int64_t* fetchUs, int64_t* fetchNum) {
  for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) {
    SSortSource** pSource = taosArrayGet(pOrderedSource, i);
    if (pSource == NULL || *pSource == NULL) {
      continue;
    }

    if (fetchUs) {
      *fetchUs += (*pSource)->fetchUs;
    }
    if (fetchNum) {
      *fetchNum += (*pSource)->fetchNum;
    }

    // release pageIdList
    if ((*pSource)->pageIdList) {
      taosArrayDestroy((*pSource)->pageIdList);
      (*pSource)->pageIdList = NULL;
    }

    // param and block are only owned by the source when it is not a pure reference
    if ((*pSource)->param && !(*pSource)->onlyRef) {
      taosMemoryFree((*pSource)->param);
      (*pSource)->param = NULL;
    }

    if (!(*pSource)->onlyRef && (*pSource)->src.pBlock) {
      blockDataDestroy((*pSource)->src.pBlock);
      (*pSource)->src.pBlock = NULL;
    }

    taosMemoryFreeClear(*pSource);
  }

  taosArrayClear(pOrderedSource);
}
481,515✔
400

401
/**
 * Close the handle and release everything it owns: merge tree, disk-based buffer, result block, bounded queue,
 * all registered sources, the external-rows mem file, the sort-order arrays, the id string and the handle itself.
 * A NULL handle is ignored.
 */
void tsortDestroySortHandle(SSortHandle* pSortHandle) {
  if (pSortHandle == NULL) {
    return;
  }

  tsortClose(pSortHandle);
  if (pSortHandle->pMergeTree != NULL) {
    tMergeTreeDestroy(&pSortHandle->pMergeTree);
  }

  destroyDiskbasedBuf(pSortHandle->pBuf);
  blockDataDestroy(pSortHandle->pDataBlock);

  if (pSortHandle->pBoundedQueue) destroyBoundedQueue(pSortHandle->pBoundedQueue);

  int64_t fetchUs = 0, fetchNum = 0;
  tsortClearOrderedSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum);
  qDebug("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr);

  taosArrayDestroy(pSortHandle->pOrderedSource);
  if (pSortHandle->pExtRowsMemFile != NULL) {
    destroySortMemFile(pSortHandle);
  }

  taosArrayDestroy(pSortHandle->pSortInfo);
  taosArrayDestroy(pSortHandle->aExtRowsOrders);
  pSortHandle->aExtRowsOrders = NULL;

  // idStr is released last: it used to be freed before the qDebug() above, which then logged a freed/cleared string
  taosMemoryFreeClear(pSortHandle->idStr);
  taosMemoryFreeClear(pSortHandle);
}
431

432
// Append the source pointer to the handle's ordered-source array. Returns terrno when the push fails.
int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) {
  if (taosArrayPush(pSortHandle->pOrderedSource, &pSource) == NULL) {
    return terrno;
  }
  return TSDB_CODE_SUCCESS;
}
436

437
/**
 * Register a new external-memory sort source that reads its pages from pBuf.
 *
 * All fallible preparation (row-size check, capacity reservation, allocation) now happens BEFORE the source is
 * pushed into pAllSources. Previously the source was pushed first; a later failure then returned an error while
 * the array already referenced pBlock / pPageIdList, which the caller destroys on error, leaving dangling
 * pointers that would be released a second time when the sources are cleared.
 *
 * @param pBuf        disk-based buffer whose page size bounds the number of rows per page
 * @param pAllSources array of SSortSource* that receives the new source
 * @param pBlock      block used to decode the source's pages; owned by the new source on success
 * @param sourceId    incremented by one on success only
 * @param pPageIdList ids of the pages that make up the source; owned by the new source on success
 * @return 0 on success. On failure nothing is registered and pBlock / pPageIdList remain owned by the caller.
 */
static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSources, SSDataBlock* pBlock,
                                         int32_t* sourceId, SArray* pPageIdList) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SSortSource* pSource = NULL;

  int32_t rowSize = blockDataGetSerialRowSize(pBlock);
  // guards the division below
  QUERY_CHECK_CONDITION((rowSize > 0), code, lino, _err, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  // number of rows of this block layout that fit into one buffer page; must be positive
  int32_t numOfRows =
      (getBufPageSize(pBuf) - blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock))) / rowSize;
  QUERY_CHECK_CONDITION((numOfRows > 0), code, lino, _err, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  code = blockDataEnsureCapacity(pBlock, numOfRows);
  QUERY_CHECK_CODE(code, lino, _err);

  pSource = taosMemoryCalloc(1, sizeof(SSortSource));
  QUERY_CHECK_NULL(pSource, code, lino, _err, terrno);

  pSource->src.pBlock = pBlock;
  pSource->pageIdList = pPageIdList;

  SSortSource** p = taosArrayPush(pAllSources, &pSource);
  QUERY_CHECK_NULL(p, code, lino, _err, terrno);

  (*sourceId) += 1;
  return code;

_err:
  if (pSource) taosMemoryFree(pSource);
  qError("sort failed at %s:%d since %s", __func__, lino, tstrerror(code));
  return code;
}
470

471
/**
 * Spill pDataBlock into the handle's disk-based buffer, one page-sized slice per page, and register the written
 * pages as a new external-memory source.
 *
 * The disk-based buffer is created on first use. After all rows are written, pDataBlock is cleaned up and an
 * empty block created from it is handed to the new source for decoding pages later.
 *
 * Error handling goes through a single cleanup label so that every failure path releases the page-id list, the
 * slice being written and the page that is still pinned (several paths used to leak them), and so that terrno is
 * captured before cleanup calls can overwrite it.
 *
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      start = 0;
  SArray*      pPageIdList = NULL;
  SSDataBlock* pSlice = NULL;  // rows [start, stop] of pDataBlock, alive for one loop iteration
  void*        pPage = NULL;   // page currently pinned, alive for one loop iteration
  SSDataBlock* pBlock = NULL;

  if (pHandle->pBuf == NULL) {
    if (!osTempSpaceAvailable()) {
      terrno = TSDB_CODE_NO_DISKSPACE;
      qError("Add to buf failed since %s, tempDir:%s", terrstr(), tsTempDir);
      return terrno;
    }

    code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
                              "sortExternalBuf", tsTempDir);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
    dBufSetPrintInfo(pHandle->pBuf);
  }

  pPageIdList = taosArrayInit(4, sizeof(int32_t));
  if (pPageIdList == NULL) {
    return terrno;
  }

  while (start < pDataBlock->info.rows) {
    int32_t stop = 0;

    code = blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize);
    if (code) {
      goto _err;
    }

    code = blockDataExtractBlock(pDataBlock, start, stop - start + 1, &pSlice);
    if (code) {
      goto _err;
    }

    int32_t pageId = -1;
    pPage = getNewBufPage(pHandle->pBuf, &pageId);
    if (pPage == NULL) {
      code = terrno;
      goto _err;
    }

    void* px = taosArrayPush(pPageIdList, &pageId);
    if (px == NULL) {
      code = terrno;
      goto _err;
    }

    // serialized size: block data + one int32 + one int32 per column
    int32_t size =
        blockDataGetSize(pSlice) + sizeof(int32_t) + taosArrayGetSize(pSlice->pDataBlock) * sizeof(int32_t);
    if (size > getBufPageSize(pHandle->pBuf)) {
      qError("sort failed at: %s:%d", __func__, __LINE__);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      goto _err;
    }

    code = blockDataToBuf(pPage, pSlice);
    if (code) {
      goto _err;
    }

    setBufPageDirty(pPage, true);
    releaseBufPage(pHandle->pBuf, pPage);
    pPage = NULL;

    blockDataDestroy(pSlice);
    pSlice = NULL;
    start = stop + 1;
  }

  blockDataCleanup(pDataBlock);

  code = createOneDataBlock(pDataBlock, false, &pBlock);
  if (code) {
    goto _err;
  }

  code = doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList);
  if (code) {
    blockDataDestroy(pBlock);
    taosArrayDestroy(pPageIdList);
  }
  return code;

_err:
  if (pPage != NULL) {
    releaseBufPage(pHandle->pBuf, pPage);  // unpin the page that was being filled
  }
  blockDataDestroy(pSlice);
  taosArrayDestroy(pPageIdList);
  return code;
}
558

559
// Mark a source as exhausted (rowIndex = -1) and count it among the handle's completed sources.
static void setCurrentSourceDone(SSortSource* pSource, SSortHandle* pHandle) {
  pHandle->numOfCompletedSources += 1;
  pSource->src.rowIndex = -1;
}
188,681✔
563

564
static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32_t startIndex, int32_t endIndex,
113,250✔
565
                              SSortHandle* pHandle) {
566
  pParam->pSources = taosArrayGet(pSources, startIndex);
113,250✔
567
  if (pParam->pSources == NULL) {
113,249!
568
    return terrno;
×
569
  }
570

571
  pParam->numOfSources = (endIndex - startIndex + 1);
113,249✔
572
  int32_t code = 0;
113,249✔
573

574
  // multi-pass internal merge sort is required
575
  if (pHandle->pBuf == NULL) {
113,249✔
576
    if (!osTempSpaceAvailable()) {
55,341!
577
      code = terrno = TSDB_CODE_NO_DISKSPACE;
×
578
      qError("Sort compare init failed since %s, tempDir:%s, idStr:%s", terrstr(), tsTempDir, pHandle->idStr);
×
579
      return code;
×
580
    }
581

582
    code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
55,341✔
583
                              "sortComparInit", tsTempDir);
584
    if (code != TSDB_CODE_SUCCESS) {
55,341!
585
      return code;
×
586
    } else {
587
      dBufSetPrintInfo(pHandle->pBuf);
55,341✔
588
    }
589
  }
590

591
  if (pHandle->type == SORT_SINGLESOURCE_SORT) {
113,250✔
592
    for (int32_t i = 0; i < pParam->numOfSources; ++i) {
115,913✔
593
      SSortSource* pSource = pParam->pSources[i];
58,002✔
594

595
      // set current source is done
596
      if (taosArrayGetSize(pSource->pageIdList) == 0) {
58,002!
597
        setCurrentSourceDone(pSource, pHandle);
×
598
        continue;
×
599
      }
600

601
      int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);
58,001✔
602
      if (pPgId == NULL) {
58,001!
603
        return terrno;
×
604
      }
605

606
      void* pPage = getBufPage(pHandle->pBuf, *pPgId);
58,001✔
607
      if (NULL == pPage) {
58,003!
608
        return terrno;
×
609
      }
610

611
      code = blockDataFromBuf(pSource->src.pBlock, pPage);
58,003✔
612
      if (code != TSDB_CODE_SUCCESS) {
58,005!
613
        terrno = code;
×
614
        return code;
×
615
      }
616

617
      releaseBufPage(pHandle->pBuf, pPage);
58,005✔
618
    }
619
  } else {
620
    qDebug("start init for the multiway merge sort, %s", pHandle->idStr);
55,341✔
621
    int64_t st = taosGetTimestampUs();
55,341✔
622

623
    for (int32_t i = 0; i < pParam->numOfSources; ++i) {
191,426✔
624
      SSortSource* pSource = pParam->pSources[i];
136,085✔
625
      TAOS_CHECK_RETURN(pHandle->fetchfp(pSource->param, &pSource->src.pBlock));
136,085!
626

627
      // set current source is done
628
      if (pSource->src.pBlock == NULL) {
136,085✔
629
        setCurrentSourceDone(pSource, pHandle);
43,430✔
630
      }
631
    }
632

633
    int64_t et = taosGetTimestampUs();
55,342✔
634
    qDebug("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr);
55,342✔
635
  }
636

637
  return code;
113,253✔
638
}
639

640
/**
 * Append row *rowIndex of pSource to the end of pBlock, column by column, then advance both pBlock->info.rows and
 * *rowIndex by one.
 *
 * A non-NULL value whose source column has no data buffer (pData == NULL) is skipped for that column.
 *
 * @return 0 on success; terrno when a column cannot be looked up, or the error of colDataSetVal(). On failure
 *         neither the row count nor *rowIndex is advanced.
 */
static int32_t appendOneRowToDataBlock(SSDataBlock* pBlock, const SSDataBlock* pSource, int32_t* rowIndex) {
  int32_t code = 0;

  for (int32_t colIdx = 0; colIdx < taosArrayGetSize(pBlock->pDataBlock); ++colIdx) {
    SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, colIdx);
    if (pDstCol == NULL) {
      return terrno;
    }

    SColumnInfoData* pSrcCol = taosArrayGet(pSource->pDataBlock, colIdx);
    if (pSrcCol == NULL) {
      return terrno;
    }

    if (colDataIsNull(pSrcCol, pSource->info.rows, *rowIndex, NULL)) {
      code = colDataSetVal(pDstCol, pBlock->info.rows, NULL, true);
    } else if (pSrcCol->pData) {
      char* pValue = colDataGetData(pSrcCol, *rowIndex);
      code = colDataSetVal(pDstCol, pBlock->info.rows, pValue, false);
    } else {
      continue;  // no data buffer in the source column
    }

    if (code) {
      return code;
    }
  }

  *rowIndex += 1;
  pBlock->info.rows += 1;
  return code;
}
674

675
/**
 * Advance a source after one of its rows has been consumed and re-adjust the loser tree.
 *
 * When the source's current block is exhausted, the next block is loaded: for SORT_SINGLESOURCE_SORT from the next
 * page of the disk-based buffer, otherwise through pHandle->fetchfp. A source with nothing left gets rowIndex -1
 * and *numOfCompleted is incremented.
 *
 * A page fetched from the buffer is now released even when decoding it fails (it used to stay pinned on that
 * path).
 *
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeTreeInfo* pTree, SSortHandle* pHandle,
                                           int32_t* numOfCompleted) {
  /*
   * load a new SDataBlock into memory of a given intermediate data-set source,
   * since it's last record in buffer has been chosen to be processed, as the winner of loser-tree
   */
  if (pSource->src.rowIndex >= pSource->src.pBlock->info.rows) {
    pSource->src.rowIndex = 0;

    if (pHandle->type == SORT_SINGLESOURCE_SORT) {
      pSource->pageIndex++;
      if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) {
        // no more pages: the source is finished and its block is released
        qDebug("adjust merge tree. %d source completed %d", *numOfCompleted, pSource->pageIndex);
        (*numOfCompleted) += 1;
        pSource->src.rowIndex = -1;
        pSource->pageIndex = -1;
        blockDataDestroy(pSource->src.pBlock);
        pSource->src.pBlock = NULL;
      } else {
        if (pSource->pageIndex % 512 == 0) {
          qDebug("begin source %p page %d", pSource, pSource->pageIndex);
        }

        int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);
        if (pPgId == NULL) {
          return terrno;
        }

        void* pPage = getBufPage(pHandle->pBuf, *pPgId);
        if (pPage == NULL) {
          qError("failed to get buffer, code:%s", tstrerror(terrno));
          return terrno;
        }

        int32_t code = blockDataFromBuf(pSource->src.pBlock, pPage);
        // unpin the page whether or not decoding succeeded
        releaseBufPage(pHandle->pBuf, pPage);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
    } else {
      int64_t st = taosGetTimestampUs();
      TAOS_CHECK_RETURN(pHandle->fetchfp(pSource->param, &pSource->src.pBlock));
      pSource->fetchUs += taosGetTimestampUs() - st;
      pSource->fetchNum++;
      if (pSource->src.pBlock == NULL) {
        (*numOfCompleted) += 1;
        pSource->src.rowIndex = -1;
        qDebug("adjust merge tree. %d source completed", *numOfCompleted);
      }
    }
  }

  /*
   * Adjust loser tree otherwise, according to new candidate data
   * if the loser tree is rebuild completed, we do not need to adjust
   */
  int32_t leafNodeIndex = tMergeTreeGetAdjustIndex(pTree);

#ifdef _DEBUG_VIEW
  printf("before adjust:\t");
  tMergeTreePrint(pTree);
#endif

  int32_t code = tMergeTreeAdjust(pTree, leafNodeIndex);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

#ifdef _DEBUG_VIEW
  printf("\nafter adjust:\t");
  tMergeTreePrint(pTree);
#endif
  return TSDB_CODE_SUCCESS;
}
750

751
static int32_t getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity,
×
752
                                       SSDataBlock** pRes) {
753
  *pRes = NULL;
×
754

755
  int32_t code = 0;
×
756
  blockDataCleanup(pHandle->pDataBlock);
×
757

758
  while (1) {
×
759
    if (cmpParam->numOfSources == pHandle->numOfCompletedSources) {
×
760
      break;
×
761
    }
762

763
    int32_t index = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
×
764

765
    SSortSource* pSource = (*cmpParam).pSources[index];
×
766
    code = appendOneRowToDataBlock(pHandle->pDataBlock, pSource->src.pBlock, &pSource->src.rowIndex);
×
767
    if (code) {
×
768
      return code;
×
769
    }
770

771
    code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
×
772
    if (code != TSDB_CODE_SUCCESS) {
×
773
      return code;
×
774
    }
775

776
    if (pHandle->pDataBlock->info.rows >= capacity) {
×
777
      *pRes = pHandle->pDataBlock;
×
778
      return code;
×
779
    }
780
  }
781

782
  *pRes = (pHandle->pDataBlock->info.rows > 0) ? pHandle->pDataBlock : NULL;
×
783
  return code;
×
784
}
785

786
// TODO: improve this function performance
787

788
int32_t tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, int32_t leftRowIndex,
501,927✔
789
                             int32_t rightRowIndex, void* pCompareOrder) {
790
  SBlockOrderInfo* pOrder = pCompareOrder;
501,927✔
791
  SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
501,927✔
792
  SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
501,927✔
793

794
  bool isVarType = IS_VAR_DATA_TYPE(pLeftColInfoData->info.type);
501,927!
795
  if (pLeftColInfoData->hasNull || pRightColInfoData->hasNull) {
501,927✔
796
    bool leftNull = false;
777✔
797
    if (pLeftColInfoData->hasNull) {
777!
798
      if (pLeftBlock->pBlockAgg == NULL) {
777!
799
        leftNull = colDataIsNull_t(pLeftColInfoData, leftRowIndex, isVarType);
1,554!
800
      } else {
801
        leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, leftRowIndex,
×
802
                                 &pLeftBlock->pBlockAgg[pOrder->slotId]);
×
803
      }
804
    }
805

806
    bool rightNull = false;
777✔
807
    if (pRightColInfoData->hasNull) {
777!
808
      if (pRightBlock->pBlockAgg == NULL) {
777!
809
        rightNull = colDataIsNull_t(pRightColInfoData, rightRowIndex, isVarType);
1,554!
810
      } else {
811
        rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, rightRowIndex,
×
812
                                  &pRightBlock->pBlockAgg[pOrder->slotId]);
×
813
      }
814
    }
815

816
    if (leftNull && rightNull) {
777!
817
      return 0;
×
818
    }
819

820
    if (rightNull) {
777!
821
      return pOrder->nullFirst ? 1 : -1;
×
822
    }
823

824
    if (leftNull) {
777!
825
      return pOrder->nullFirst ? -1 : 1;
×
826
    }
827
  }
828

829
  void *left1, *right1;
830
  left1 = colDataGetData(pLeftColInfoData, leftRowIndex);
501,927!
831
  right1 = colDataGetData(pRightColInfoData, rightRowIndex);
501,927!
832
  __compar_fn_t fn = pOrder->compFn;
501,927✔
833
  int32_t       ret = fn(left1, right1);
501,927✔
834
  return ret;
502,936✔
835
}
836

837
int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
267,978,802✔
838
  int32_t pLeftIdx = *(int32_t*)pLeft;
267,978,802✔
839
  int32_t pRightIdx = *(int32_t*)pRight;
267,978,802✔
840

841
  SMsortComparParam* pParam = (SMsortComparParam*)param;
267,978,802✔
842

843
  SArray* pInfo = pParam->orderInfo;
267,978,802✔
844

845
  SSortSource* pLeftSource = pParam->pSources[pLeftIdx];
267,978,802✔
846
  SSortSource* pRightSource = pParam->pSources[pRightIdx];
267,978,802✔
847

848
  // this input is exhausted, set the special value to denote this
849
  if (pLeftSource->src.rowIndex == -1) {
267,978,802✔
850
    return 1;
36,775,142✔
851
  }
852

853
  if (pRightSource->src.rowIndex == -1) {
231,203,660✔
854
    return -1;
33,505✔
855
  }
856

857
  SSDataBlock* pLeftBlock = pLeftSource->src.pBlock;
231,170,155✔
858
  SSDataBlock* pRightBlock = pRightSource->src.pBlock;
231,170,155✔
859

860
  if (pParam->cmpGroupId) {
231,170,155✔
861
    if (pLeftBlock->info.id.groupId != pRightBlock->info.id.groupId) {
37,481,233✔
862
      return pLeftBlock->info.id.groupId < pRightBlock->info.id.groupId ? -1 : 1;
11,827,078✔
863
    }
864
  }
865

866
  if (pParam->sortType == SORT_BLOCK_TS_MERGE) {
219,343,077✔
867
    SColumnInfoData* pLeftTsCol = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pParam->tsSlotId);
2,771,440✔
868
    SColumnInfoData* pRightTsCol = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pParam->tsSlotId);
2,771,440✔
869
    int64_t*         leftTs = (int64_t*)(pLeftTsCol->pData) + pLeftSource->src.rowIndex;
2,771,440✔
870
    int64_t*         rightTs = (int64_t*)(pRightTsCol->pData) + pRightSource->src.rowIndex;
2,771,440✔
871

872
    int32_t ret = pParam->cmpTsFn(leftTs, rightTs);
2,771,440✔
873
    if (ret == 0 && pParam->pPkOrder) {
2,756,058!
874
      ret = tsortComparBlockCell(pLeftBlock, pRightBlock, pLeftSource->src.rowIndex, pRightSource->src.rowIndex,
×
875
                                 (SBlockOrderInfo*)pParam->pPkOrder);
876
    }
877
    return ret;
2,761,189✔
878
  } else {
879
    bool isVarType;
880
    for (int32_t i = 0; i < pInfo->size; ++i) {
304,141,359✔
881
      SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
231,002,163✔
882
      SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
231,002,163✔
883
      SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
231,002,163✔
884
      isVarType = IS_VAR_DATA_TYPE(pLeftColInfoData->info.type);
231,002,163!
885

886
      if (pLeftColInfoData->hasNull || pRightColInfoData->hasNull) {
231,002,163!
887
        bool leftNull = false;
231,082,764✔
888
        if (pLeftColInfoData->hasNull) {
231,082,764✔
889
          if (pLeftBlock->pBlockAgg == NULL) {
231,008,082!
890
            leftNull = colDataIsNull_t(pLeftColInfoData, pLeftSource->src.rowIndex, isVarType);
462,027,626!
891
          } else {
892
            leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex,
×
893
                                     &pLeftBlock->pBlockAgg[i]);
×
894
          }
895
        }
896

897
        bool rightNull = false;
231,082,764✔
898
        if (pRightColInfoData->hasNull) {
231,082,764✔
899
          if (pRightBlock->pBlockAgg == NULL) {
231,066,464!
900
            rightNull = colDataIsNull_t(pRightColInfoData, pRightSource->src.rowIndex, isVarType);
462,167,892!
901
          } else {
902
            rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex,
×
903
                                      &pRightBlock->pBlockAgg[i]);
×
904
          }
905
        }
906

907
        if (leftNull && rightNull) {
231,082,764✔
908
          continue;  // continue to next slot
1,262,561✔
909
        }
910

911
        if (rightNull) {
229,820,203✔
912
          return pOrder->nullFirst ? 1 : -1;
627,855!
913
        }
914

915
        if (leftNull) {
229,192,348✔
916
          return pOrder->nullFirst ? -1 : 1;
447!
917
        }
918
      }
919

920
      void *left1, *right1;
921
      if (isVarType) {
229,111,300✔
922
        left1 = colDataGetVarData(pLeftColInfoData, pLeftSource->src.rowIndex);
14,830,154✔
923
        right1 = colDataGetVarData(pRightColInfoData, pRightSource->src.rowIndex);
14,830,154✔
924
      } else {
925
        left1 = colDataGetNumData(pLeftColInfoData, pLeftSource->src.rowIndex);
214,281,146✔
926
        right1 = colDataGetNumData(pRightColInfoData, pRightSource->src.rowIndex);
214,281,146✔
927
      }
928

929
      __compar_fn_t fn = pOrder->compFn;
229,111,300✔
930
      if (!fn) {
229,111,300✔
931
        fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order);
21,032✔
932
        pOrder->compFn = fn;
21,032✔
933
      }
934

935
      int32_t ret = fn(left1, right1);
229,111,300✔
936
      if (ret == 0) {
229,134,599✔
937
        continue;
86,307,161✔
938
      } else {
939
        return ret;
142,827,438✔
940
      }
941
    }
942
  }
943

944
  return 0;
73,139,196✔
945
}
946

947
/*
 * Run one internal merge pass: the first numOfSorted entries of pHandle->pOrderedSource are split
 * into sortGroup groups of at most numOfInputSources sources each. Every group is merged through
 * the merge tree into blocks of at most numOfRows rows, each block is written to a new page of
 * pHandle->pBuf, and the resulting page-id list is registered as a new source in pResList.
 *
 * sortTimes is only used for logging (index of the current pass).
 *
 * Returns 0 on success. On failure the error is logged and returned. The merge is cancelled with
 * TSDB_CODE_TSC_QUERY_CANCELLED when the handle is closed or abortCheckFn reports an abort.
 */
static int32_t doSortForEachGroup(SSortHandle* pHandle, int32_t sortTimes, int32_t numOfSorted,
                                  int32_t numOfInputSources, SArray* pResList, int32_t sortGroup, int32_t numOfRows) {
  int32_t code = 0;
  int32_t lino = 0;
  SArray* pPageIdList = NULL;

  for (int32_t i = 0; i < sortGroup; ++i) {
    qDebug("internal merge sort pass %d group %d. num input sources %d ", sortTimes, i, numOfInputSources);
    pHandle->sourceId += 1;

    // [i * numOfInputSources, end] is the slice of sources merged by this group
    int32_t end = (i + 1) * numOfInputSources - 1;
    if (end > numOfSorted - 1) {
      end = numOfSorted - 1;
    }

    pHandle->cmpParam.numOfSources = end - i * numOfInputSources + 1;

    code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle);
    QUERY_CHECK_CODE(code, lino, _err);

    code =
        tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
    QUERY_CHECK_CODE(code, lino, _err);

    int32_t nMergedRows = 0;
    pPageIdList = taosArrayInit(4, sizeof(int32_t));
    QUERY_CHECK_NULL(pPageIdList, code, lino, _err, terrno);

    while (1) {
      if (tsortIsClosed(pHandle) || (pHandle->abortCheckFn && pHandle->abortCheckFn(pHandle->abortCheckParam))) {
        code = TSDB_CODE_TSC_QUERY_CANCELLED;
        goto _err;
      }

      SSDataBlock* pDataBlock = NULL;
      code = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows, &pDataBlock);
      // A failure must abort the pass. Merely leaving the loop would let the cleanup below
      // overwrite `code` and report a truncated merge as success.
      QUERY_CHECK_CODE(code, lino, _err);
      if (pDataBlock == NULL) {
        break;  // all sources of this group are drained
      }

      int32_t pageId = -1;
      void*   pPage = getNewBufPage(pHandle->pBuf, &pageId);
      QUERY_CHECK_NULL(pPage, code, lino, _err, terrno);

      void* px = taosArrayPush(pPageIdList, &pageId);
      QUERY_CHECK_NULL(px, code, lino, _err, terrno);

      // serialized block = data + one int32_t header + one int32_t per column; it must fit in a page
      int32_t size =
          blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t);
      if (size > getBufPageSize(pHandle->pBuf)) {
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        goto _err;
      }

      code = blockDataToBuf(pPage, pDataBlock);
      QUERY_CHECK_CODE(code, lino, _err);

      setBufPageDirty(pPage, true);
      releaseBufPage(pHandle->pBuf, pPage);
      nMergedRows += pDataBlock->info.rows;

      blockDataCleanup(pDataBlock);
      if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
        break;
      }
    }

    code = sortComparCleanup(&pHandle->cmpParam);
    QUERY_CHECK_CODE(code, lino, _err);

    tMergeTreeDestroy(&pHandle->pMergeTree);
    pHandle->numOfCompletedSources = 0;

    SSDataBlock* pBlock = NULL;
    code = createOneDataBlock(pHandle->pDataBlock, false, &pBlock);
    QUERY_CHECK_CODE(code, lino, _err);

    code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList);
    if (code != TSDB_CODE_SUCCESS) {
      // NOTE(review): whether doAddNewExternalMemSource releases pPageIdList on failure is not
      // visible here; if it does, the destroy at _err is a double free - confirm.
      blockDataDestroy(pBlock);
    }
    QUERY_CHECK_CODE(code, lino, _err);

    // The list is now referenced by the new source (presumably owned by it - verify against
    // doAddNewExternalMemSource). Drop the local alias so that a failure in a later group does
    // not destroy it at _err.
    pPageIdList = NULL;
  }

  return code;

_err:
  taosArrayDestroy(pPageIdList);
  qError("%s error happens:%s line:%d, code:%s", pHandle->idStr, __func__, lino, tstrerror(code));
  return code;
}
1038

1039
/*
 * Reduce the sources in pHandle->pOrderedSource with intermediate merge passes.
 *
 * The number of passes is floor(log2(#sources) / log2(numOfPages)). Each pass merges groups of
 * numOfPages sources via doSortForEachGroup() and replaces pOrderedSource with the merged
 * results. Also updates totalElapsed, loops and cmpParam.numOfSources of the handle, and after a
 * pass switches a SORT_MULTISOURCE_MERGE handle to SORT_SINGLESOURCE_SORT with msortComparFn.
 *
 * Returns 0 on success (including the case of no sources), otherwise an error code.
 */
static int32_t doInternalMergeSort(SSortHandle* pHandle) {
  size_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
  if (numOfSources == 0) {
    return 0;
  }

  // Calculate the I/O counts to complete the data sort.
  double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages));

  pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs;

  if (sortPass > 0) {
    size_t s = 0;
    if (pHandle->pBuf) {
      s = getTotalBufSize(pHandle->pBuf);
    }
    qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%" PRIzu
           ", sort elapsed:%" PRId64 ", total elapsed:%" PRId64,
           pHandle->idStr, (int32_t)(sortPass + 1), s, pHandle->sortElapsed, pHandle->totalElapsed);
  } else {
    qDebug("%s ordered source:%" PRIzu ", available buf:%d, no need internal sort", pHandle->idStr, numOfSources,
           pHandle->numOfPages);
  }

  // number of rows of the output block that fit into one page next to the serialized meta data
  int32_t metaSize = (int32_t)blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock));
  int32_t rowsPerPage = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, metaSize);
  if (rowsPerPage < 0) {
    return terrno;
  }

  int32_t code = blockDataEnsureCapacity(pHandle->pDataBlock, rowsPerPage);
  if (code) {
    return code;
  }

  // the initial pass + sortPass + final mergePass
  pHandle->loops = sortPass + 2;

  size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);
  for (int32_t pass = 0; pass < sortPass; ++pass) {
    int64_t passStart = taosGetTimestampUs();

    SArray* pMerged = taosArrayInit(4, POINTER_BYTES);
    if (pMerged == NULL) {
      return terrno;
    }

    int32_t fanIn = pHandle->numOfPages;
    int32_t numOfGroups = (numOfSorted + fanIn - 1) / fanIn;

    // Only *fanIn* sources can be loaded into buffer to perform the external sort.
    code = doSortForEachGroup(pHandle, pass, numOfSorted, fanIn, pMerged, numOfGroups, rowsPerPage);
    if (code != 0) {
      tsortClearOrderedSource(pMerged, NULL, NULL);
      taosArrayDestroy(pMerged);
      return code;
    }

    // the merged sources of this pass become the input of the next one
    tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL);
    if (taosArrayAddAll(pHandle->pOrderedSource, pMerged) == NULL) {
      tsortClearOrderedSource(pMerged, NULL, NULL);
      taosArrayDestroy(pMerged);
      return terrno;
    }

    taosArrayDestroy(pMerged);
    numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);

    int64_t passElapsed = taosGetTimestampUs() - passStart;
    pHandle->totalElapsed += passElapsed;

    SDiskbasedBufStatis statis = getDBufStatis(pHandle->pBuf);
    qDebug("%s %d round mergesort, elapsed:%" PRId64 " readDisk:%.2f Kb, flushDisk:%.2f Kb", pHandle->idStr, pass + 1,
           passElapsed, statis.loadBytes / 1024.0, statis.flushBytes / 1024.0);

    if (pHandle->type == SORT_MULTISOURCE_MERGE) {
      pHandle->type = SORT_SINGLESOURCE_SORT;
      pHandle->comparFn = msortComparFn;
    }
  }

  pHandle->cmpParam.numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
  return 0;
}
1120

1121
// get sort page size
1122
int32_t getProperSortPageSize(size_t rowSize, uint32_t numOfCols) {
318,556✔
1123
  uint32_t pgSize = rowSize * 4 + blockDataGetSerialMetaSize(numOfCols);
318,556✔
1124
  if (pgSize < DEFAULT_PAGESIZE) {
318,725✔
1125
    return DEFAULT_PAGESIZE;
304,202✔
1126
  }
1127

1128
  return pgSize;
14,523✔
1129
}
1130

1131
/*
 * Create pHandle->pBuf through createDiskbasedBuf() unless it already exists.
 * The buffer is created in tsTempDir with a page size of pHandle->pageSize and a total size of
 * numOfPages pages.
 *
 * Returns 0 on success or if the buffer already exists, TSDB_CODE_NO_DISKSPACE (also stored in
 * terrno) when osTempSpaceAvailable() reports no space, or the error of createDiskbasedBuf().
 */
static int32_t createPageBuf(SSortHandle* pHandle) {
  if (pHandle->pBuf != NULL) {
    return 0;
  }

  if (!osTempSpaceAvailable()) {
    terrno = TSDB_CODE_NO_DISKSPACE;
    qError("create page buf failed since %s, tempDir:%s", terrstr(), tsTempDir);
    return terrno;
  }

  int32_t rc = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
                                  "tableBlocksBuf", tsTempDir);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  dBufSetPrintInfo(pHandle->pBuf);
  return 0;
}
1149

1150
/*
 * Append the row described by pTupleHandle as a new row at index pBlock->info.rows of pBlock.
 *
 * - pHandle->bSortByRowId set: columns 1, 2 and 3 of the tuple hold the region id, offset and
 *   length of the serialized row in the external rows file. The row is fetched with
 *   getRowBufFromExtMemFile() and decoded: one null-flag byte per column, followed by the values
 *   of the non-null columns, followed by an int32_t holding the number of bytes before it.
 * - otherwise: every column is copied from the tuple itself.
 *
 * On success dataLoad is set, scanFlag is taken from the tuple's block info, rows is increased
 * by one and 0 is returned. On failure an error code is returned and rows is not increased.
 */
int32_t tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
  int32_t        code = 0;
  SDataBlockInfo info = {0};

  if (pHandle->bSortByRowId) {
    int32_t *p1 = NULL, *p2 = NULL, *p3 = NULL;
    tsortGetValue(pTupleHandle, 1, (void**)&p1);
    tsortGetValue(pTupleHandle, 2, (void**)&p2);
    tsortGetValue(pTupleHandle, 3, (void**)&p3);
    // the non-row-id path below treats a NULL value pointer as possible, so guard it here as well
    if (p1 == NULL || p2 == NULL || p3 == NULL) {
      qError("sort failed at: %s:%d", __func__, __LINE__);
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    int32_t regionId = *p1;
    int32_t offset = *p2;
    int32_t length = *p3;

    char* buf = NULL;
    bool  bFreeRow = false;

    code = getRowBufFromExtMemFile(pHandle, regionId, offset, length, &buf, &bFreeRow);
    if (code) {
      return code;
    }

    int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
    int32_t storedLen = 0;
    char*   isNull = buf;
    char*   pStart = buf + sizeof(int8_t) * numOfCols;
    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
      if (pColInfo == NULL) {
        code = terrno;
        goto _free_row;  // buf may be a private copy that has to be released
      }

      if (isNull[i]) {
        colDataSetNULL(pColInfo, pBlock->info.rows);
        continue;
      }

      code = colDataSetVal(pColInfo, pBlock->info.rows, pStart, false);
      if (code) {
        goto _free_row;
      }

      // advance to the next serialized value
      if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
        int32_t dataLen = getJsonValueLen(pStart);
        pStart += dataLen;
      } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
        if (IS_STR_DATA_BLOB(pColInfo->info.type)) {
          pStart += blobDataTLen(pStart);
        } else {
          pStart += varDataTLen(pStart);
        }
      } else {
        int32_t bytes = pColInfo->info.bytes;
        pStart += bytes;
      }
    }

    // the trailer is not necessarily aligned, so read it with memcpy
    memcpy(&storedLen, pStart, sizeof(storedLen));
    if (storedLen != pStart - buf) {
      qError("table merge scan row buf deserialization. length error %d != %d ", storedLen,
             (int32_t)(pStart - buf));
    }

    pBlock->info.dataLoad = 1;

    tsortGetBlockInfo(pTupleHandle, &info);

    pBlock->info.scanFlag = info.scanFlag;
    pBlock->info.rows += 1;

  _free_row:
    if (bFreeRow) {
      taosMemoryFree(buf);
    }
    return code;
  }

  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
    if (pColInfo == NULL) {
      return terrno;
    }

    bool isNull = tsortIsNullVal(pTupleHandle, i);
    if (isNull) {
      colDataSetNULL(pColInfo, pBlock->info.rows);
    } else {
      char* pData = NULL;
      tsortGetValue(pTupleHandle, i, (void**)&pData);
      if (pData != NULL) {
        code = colDataSetVal(pColInfo, pBlock->info.rows, pData, false);
        if (code) {
          return code;
        }
      }
    }
  }

  pBlock->info.dataLoad = 1;
  tsortGetBlockInfo(pTupleHandle, &info);

  pBlock->info.scanFlag = info.scanFlag;
  pBlock->info.rows += 1;

  return code;
}
1253

1254
/*
 * Serialize row rowIdx of pBlock into buf.
 *
 * Layout: one null-flag byte per column (1 = NULL), then the values of the non-NULL columns in
 * column order, then an int32_t trailer holding the number of bytes written before the trailer.
 * A JSON or variable-length column whose pData is NULL is written as an empty value.
 *
 * Returns the total number of bytes written including the trailer. If a column cannot be fetched,
 * terrno is returned instead.
 * NOTE(review): callers can only tell the two apart if error codes are negative - confirm.
 * The caller must provide a buffer that is large enough; no bound is checked here.
 */
static int32_t blockRowToBuf(SSDataBlock* pBlock, int32_t rowIdx, char* buf) {
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

  char* isNull = buf;
  char* pStart = buf + sizeof(int8_t) * numOfCols;
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    if (pCol == NULL) {
      return terrno;
    }

    if (colDataIsNull_s(pCol, rowIdx)) {
      isNull[i] = 1;
      continue;
    }

    isNull[i] = 0;
    char* pData = colDataGetData(pCol, rowIdx);
    if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
      if (pCol->pData) {
        int32_t dataLen = getJsonValueLen(pData);
        memcpy(pStart, pData, dataLen);
        pStart += dataLen;
      } else {
        // the column that is pre-allocated has no data and has offset
        *pStart = 0;
        pStart += 1;
      }
    } else if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      if (IS_STR_DATA_BLOB(pCol->info.type)) {
        if (pCol->pData) {
          blobDataCopy(pStart, pData);
          pStart += blobDataTLen(pData);
        } else {
          // the column that is pre-allocated has no data and has offset
          *(BlobDataLenT*)(pStart) = 0;
          pStart += BLOBSTR_HEADER_SIZE;
        }
      } else {
        if (pCol->pData) {
          varDataCopy(pStart, pData);
          pStart += varDataTLen(pData);
        } else {
          // the column that is pre-allocated has no data and has offset
          *(VarDataLenT*)(pStart) = 0;
          pStart += VARSTR_HEADER_SIZE;
        }
      }
    } else {
      int32_t bytes = pCol->info.bytes;
      memcpy(pStart, pData, bytes);
      pStart += bytes;
    }
  }

  // pStart has no alignment guarantee at this point, so store the trailer with memcpy instead of
  // writing through a casted int32_t pointer
  int32_t payloadLen = (int32_t)(pStart - buf);
  memcpy(pStart, &payloadLen, sizeof(payloadLen));
  pStart += sizeof(payloadLen);
  return (int32_t)(pStart - buf);
}
1312

1313
/*
 * Locate the serialized row [tupleOffset, tupleOffset + rowLen) of region regionId of the
 * external rows file.
 *
 * Each region keeps a read window: `buf` holds bufLen bytes starting at region offset
 * bufRegOffset. The window is loaded on first use.
 *   - If the row lies completely inside the window, *ppRow points into the window and *pFreeRow
 *     is false.
 *   - If the row runs past the window, *ppRow is a newly allocated copy of rowLen bytes stitched
 *     together from the tail of the current window and the head of the next one, *pFreeRow is
 *     true (the caller must free the row), and the window is advanced.
 * A row that starts before the window is an internal error (the window only moves forward).
 *
 * Returns TSDB_CODE_SUCCESS or an error code. On failure no memory stays allocated for the row
 * and a window that could not be read is not kept.
 */
static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, int32_t tupleOffset, int32_t rowLen,
                                       char** ppRow, bool* pFreeRow) {
  SSortMemFile*       pMemFile = pHandle->pExtRowsMemFile;
  SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, regionId);
  if (pRegion == NULL) {
    return terrno;
  }

  if (pRegion->buf == NULL) {
    pRegion->bufRegOffset = 0;

    // seek first: the macro returns directly on failure and nothing is allocated yet
    TAOS_CHECK_RETURN(taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset, SEEK_SET));

    char* pWindow = taosMemoryMalloc(pMemFile->blockSize);
    if (pWindow == NULL) {
      return terrno;
    }

    int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize);
    int32_t ret = taosReadFromCFile(pWindow, readBytes, 1, pMemFile->pTdFile);
    if (ret != 1) {
      // capture the system error before any other call can disturb it
      int32_t code = TAOS_SYSTEM_ERROR(ERRNO);
      taosMemoryFree(pWindow);
      terrno = code;
      return code;
    }

    // publish the window only once it really holds data
    pRegion->buf = pWindow;
    pRegion->bufLen = readBytes;
  }

  if (pRegion->bufRegOffset > tupleOffset) {
    qError("sort failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  if (pRegion->bufRegOffset + pRegion->bufLen >= tupleOffset + rowLen) {
    *pFreeRow = false;
    *ppRow = pRegion->buf + tupleOffset - pRegion->bufRegOffset;
    return TSDB_CODE_SUCCESS;
  }

  // the row straddles the end of the window: copy its head, load the next window, copy its tail
  *ppRow = taosMemoryMalloc(rowLen);
  if (*ppRow == NULL) {
    return terrno;
  }
  int32_t szThisBlock = pRegion->bufLen - (tupleOffset - pRegion->bufRegOffset);
  memcpy(*ppRow, pRegion->buf + tupleOffset - pRegion->bufRegOffset, szThisBlock);

  // todo
  if (taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset + pRegion->bufRegOffset + pRegion->bufLen, SEEK_SET) < 0) {
    int32_t code = terrno;
    taosMemoryFreeClear(*ppRow);
    return code;
  }

  int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize - (pRegion->bufRegOffset + pRegion->bufLen));
  int32_t ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
  if (ret != 1) {
    int32_t code = TAOS_SYSTEM_ERROR(ERRNO);
    taosMemoryFreeClear(*ppRow);
    terrno = code;
    return code;
  }

  memcpy(*ppRow + szThisBlock, pRegion->buf, rowLen - szThisBlock);
  *pFreeRow = true;
  pRegion->bufRegOffset += pRegion->bufLen;
  pRegion->bufLen = readBytes;
  return TSDB_CODE_SUCCESS;
}
1371

1372
/*
 * Create the external rows file of the handle (pHandle->pExtRowsMemFile) unless it already
 * exists: a temporary file "sort-ext-mem" under tsTempDir opened with "w+b" and marked
 * auto-delete, a 4 MiB write buffer and an empty region array. cacheSize is taken from
 * pHandle->extRowsMemSize.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. On failure everything created so far is released
 * (including the temporary file) and pHandle->pExtRowsMemFile is left untouched.
 */
static int32_t createSortMemFile(SSortHandle* pHandle) {
  if (pHandle->pExtRowsMemFile != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t       code = TSDB_CODE_SUCCESS;
  SSortMemFile* pMemFile = taosMemoryCalloc(1, sizeof(SSortMemFile));
  if (pMemFile == NULL) {
    return terrno;
  }

  taosGetTmpfilePath(tsTempDir, "sort-ext-mem", pMemFile->memFilePath);
  pMemFile->pTdFile = taosOpenCFile(pMemFile->memFilePath, "w+b");
  if (pMemFile->pTdFile == NULL) {
    code = terrno = TAOS_SYSTEM_ERROR(ERRNO);
    goto _err;
  }

  code = taosSetAutoDelFile(pMemFile->memFilePath);
  if (code) {
    qError("failed to set the auto-delete file attribute");
    goto _err;  // do not return directly: pMemFile and the open file must be released
  }

  // -1 marks "no region opened yet"
  pMemFile->currRegionId = -1;
  pMemFile->currRegionOffset = -1;

  pMemFile->writeBufSize = 4 * 1024 * 1024;
  pMemFile->writeFileOffset = -1;
  pMemFile->bRegionDirty = false;

  pMemFile->writeBuf = taosMemoryMalloc(pMemFile->writeBufSize);
  if (pMemFile->writeBuf == NULL) {
    code = terrno;
    goto _err;
  }

  pMemFile->cacheSize = pHandle->extRowsMemSize;
  pMemFile->aFileRegions = taosArrayInit(64, sizeof(SSortMemFileRegion));
  if (pMemFile->aFileRegions == NULL) {
    code = terrno;
    goto _err;
  }

  pHandle->pExtRowsMemFile = pMemFile;
  return TSDB_CODE_SUCCESS;

_err:
  // aFileRegions is an SArray and needs its own destructor
  taosArrayDestroy(pMemFile->aFileRegions);
  taosMemoryFree(pMemFile->writeBuf);
  if (pMemFile->pTdFile) {
    (void)taosCloseCFile(pMemFile->pTdFile);
    // same order as destroySortMemFile: close, then remove the temporary file
    (void)taosRemoveFile(pMemFile->memFilePath);
  }
  taosMemoryFree(pMemFile);
  return code;
}
1431

1432
/*
 * Release the external rows file of the handle, if any: the read buffer of every region, the
 * region array, the write buffer, the file handle, the file on disk and the SSortMemFile itself.
 * pHandle->pExtRowsMemFile is NULL afterwards.
 */
static void destroySortMemFile(SSortHandle* pHandle) {
  SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
  if (pMemFile == NULL) {
    return;
  }

  // free the per-region read windows before the array that holds them
  for (int32_t idx = 0; idx < taosArrayGetSize(pMemFile->aFileRegions); ++idx) {
    SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, idx);
    if (pRegion != NULL) {
      taosMemoryFree(pRegion->buf);
    }
  }

  taosArrayDestroy(pMemFile->aFileRegions);
  pMemFile->aFileRegions = NULL;

  taosMemoryFree(pMemFile->writeBuf);
  pMemFile->writeBuf = NULL;

  // close first, then delete the file from disk
  (void)taosCloseCFile(pMemFile->pTdFile);
  pMemFile->pTdFile = NULL;
  (void)taosRemoveFile(pMemFile->memFilePath);

  taosMemoryFree(pMemFile);
  pHandle->pExtRowsMemFile = NULL;
}
1459

1460
/*
 * Open a new region in the external rows file and make it the current one.
 *
 * The first region (currRegionId == -1) starts at file offset 0; every later region starts right
 * behind the current one (fileOffset + regionSize). On success currRegionId is advanced,
 * currRegionOffset is reset to 0 and writeFileOffset is set to the start of the new region.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno if the current region cannot be fetched or the new region
 * cannot be appended. On failure the bookkeeping of pMemFile is left unchanged, so currRegionId
 * never refers to a region that does not exist.
 */
static int32_t tsortOpenRegion(SSortHandle* pHandle) {
  SSortMemFile*      pMemFile = pHandle->pExtRowsMemFile;
  SSortMemFileRegion region = {0};  // fileOffset and bufRegOffset start at 0

  if (pMemFile->currRegionId != -1) {
    SSortMemFileRegion* pPrev = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId);
    if (pPrev == NULL) {
      return terrno;
    }

    // copy the value out before pushing: the push may move the array storage
    region.fileOffset = pPrev->fileOffset + pPrev->regionSize;
  }

  void* px = taosArrayPush(pMemFile->aFileRegions, &region);
  if (px == NULL) {
    return terrno;
  }

  // -1 -> 0 for the first region, +1 for every following one
  pMemFile->currRegionId += 1;
  pMemFile->currRegionOffset = 0;
  pMemFile->writeFileOffset = region.fileOffset;
  return TSDB_CODE_SUCCESS;
}
1496

1497
/*
 * Close the current region of the external rows file: its size becomes the current region
 * offset, and the bytes of the write buffer that were not written yet are written to the file.
 *
 * Returns TSDB_CODE_SUCCESS, terrno if the current region cannot be fetched, or the system error
 * (also stored in terrno) if the write fails. bRegionDirty is cleared only after a write.
 */
static int32_t tsortCloseRegion(SSortHandle* pHandle) {
  SSortMemFile*       pMemFile = pHandle->pExtRowsMemFile;
  SSortMemFileRegion* pCurr = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId);
  if (pCurr == NULL) {
    return terrno;
  }

  pCurr->regionSize = pMemFile->currRegionOffset;

  // region bytes that are still only in the write buffer
  int32_t pending = pCurr->regionSize - (pMemFile->writeFileOffset - pCurr->fileOffset);
  if (pending <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t nItems = fwrite(pMemFile->writeBuf, pending, 1, pMemFile->pTdFile);
  if (nItems != 1) {
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
    return terrno;
  }

  pMemFile->bRegionDirty = false;
  return TSDB_CODE_SUCCESS;
}
1516

1517
/*
 * Finish the write phase of the external rows file.
 *
 * Checks that the number of regions matches currRegionId + 1. If there is at least one region,
 * blockSize is set to cacheSize / numRegions rounded up to a multiple of 4096, bufRegOffset of
 * every region is reset to 0 and the write buffer is released.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR on a region count mismatch,
 * or terrno if a region cannot be fetched.
 */
static int32_t tsortFinalizeRegions(SSortHandle* pHandle) {
  SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
  size_t        numRegions = taosArrayGetSize(pMemFile->aFileRegions);

  if (numRegions != (pMemFile->currRegionId + 1)) {
    qError("sort failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  // nothing was written: keep the write buffer, there is no block size to derive
  if (numRegions == 0) {
    return TSDB_CODE_SUCCESS;
  }

  // share the cache between the regions, rounded up to a multiple of 4096
  int32_t perRegionBytes = (pMemFile->cacheSize / numRegions + 4095) & ~4095;
  pMemFile->blockSize = perRegionBytes;

  for (int32_t idx = 0; idx < numRegions; ++idx) {
    SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, idx);
    if (pRegion == NULL) {
      return terrno;
    }

    pRegion->bufRegOffset = 0;
  }

  taosMemoryFree(pMemFile->writeBuf);
  pMemFile->writeBuf = NULL;
  return TSDB_CODE_SUCCESS;
}
1544

1545
/*
 * Serialize one row of pBlock into the write buffer of the current region of
 * the external-rows memory file.
 *
 * pRegionId, pOffset and pLength receive the region id, the offset of the row
 * inside the region and the serialized length of the row.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the region lookup or a buffer flush fails.
 */
static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx,
                                            int32_t* pRegionId, int32_t* pOffset, int32_t* pLength) {
  SSortMemFile*       pFile = pHandle->pExtRowsMemFile;
  SSortMemFileRegion* pCurr = taosArrayGet(pFile->aFileRegions, pFile->currRegionId);
  if (pCurr == NULL) {
    return terrno;
  }

  // NOTE(review): the flush test uses the region offset rather than the offset inside
  // the write buffer -- confirm this is intended once a region has been flushed.
  if (pFile->currRegionOffset + pHandle->extRowBytes >= pFile->writeBufSize) {
    int32_t pendingBytes = pFile->currRegionOffset - (pFile->writeFileOffset - pCurr->fileOffset);
    int32_t nItems = fwrite(pFile->writeBuf, pendingBytes, 1, pFile->pTdFile);
    if (nItems != 1) {
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
      return terrno;
    }

    // everything up to the current region offset is now in the file
    pFile->writeFileOffset = pCurr->fileOffset + pFile->currRegionOffset;
  }

  // position in the write buffer = region offset minus the part already written to the file
  int32_t bufPos = pFile->currRegionOffset - (pFile->writeFileOffset - pCurr->fileOffset);
  int32_t rowLen = blockRowToBuf(pBlock, rowIdx, pFile->writeBuf + bufPos);

  *pRegionId = pFile->currRegionId;
  *pOffset = pFile->currRegionOffset;
  *pLength = rowLen;

  pFile->currRegionOffset += rowLen;
  pFile->bRegionDirty = true;
  return TSDB_CODE_SUCCESS;
}
1575

1576
/*
 * Append one "row id" row to pHandle->pDataBlock for row *rowIndex of pSource.
 *
 * The source row is first saved to the external-rows memory file. The row
 * appended to the handle block then holds: column 0 the timestamp, columns 1-3
 * the region id, offset and length returned by the save, and (only when
 * pHandle->bSortPk is set) column 4 the primary key value or NULL.
 *
 * On success the row count of the handle block and *rowIndex are both
 * incremented. Returns 0 on success, otherwise the failing call's error code.
 */
static int32_t appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSource, int32_t* rowIndex) {
  int32_t regionId = -1;
  int32_t rowOffset = -1;
  int32_t rowLength = -1;

  int32_t code = saveBlockRowToExtRowsMemFile(pHandle, pSource, *rowIndex, &regionId, &rowOffset, &rowLength);
  if (code) {
    return code;
  }

  SSDataBlock* pDst = pHandle->pDataBlock;

  // column 0: timestamp copied from the source block
  SBlockOrderInfo* pTsOrder = taosArrayGet(pHandle->aExtRowsOrders, 0);
  if (pTsOrder == NULL) {
    return terrno;
  }

  SColumnInfoData* pSrcTs = taosArrayGet(pSource->pDataBlock, pTsOrder->slotId);
  if (pSrcTs == NULL) {
    return terrno;
  }

  SColumnInfoData* pDstTs = taosArrayGet(pDst->pDataBlock, 0);
  if (pDstTs == NULL) {
    return terrno;
  }

  char* pTsVal = colDataGetData(pSrcTs, *rowIndex);
  code = colDataSetVal(pDstTs, pDst->info.rows, pTsVal, false);
  if (code) {
    return code;
  }

  // columns 1..3: region id, offset and length of the saved row
  int32_t* aLocator[3] = {&regionId, &rowOffset, &rowLength};
  for (int32_t k = 0; k < 3; ++k) {
    SColumnInfoData* pLocCol = taosArrayGet(pDst->pDataBlock, k + 1);
    if (pLocCol == NULL) {
      return terrno;
    }

    colDataSetInt32(pLocCol, pDst->info.rows, aLocator[k]);
  }

  // column 4: primary key, present only when sorting by (ts, pk)
  if (pHandle->bSortPk) {
    SBlockOrderInfo* pPkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1);
    if (pPkOrder == NULL) {
      return terrno;
    }

    SColumnInfoData* pSrcPk = taosArrayGet(pSource->pDataBlock, pPkOrder->slotId);
    if (pSrcPk == NULL) {
      return terrno;
    }

    SColumnInfoData* pDstPk = taosArrayGet(pDst->pDataBlock, 4);
    if (pDstPk == NULL) {
      return terrno;
    }

    if (!colDataIsNull_s(pSrcPk, *rowIndex)) {
      char* pPkVal = colDataGetData(pSrcPk, *rowIndex);
      code = colDataSetVal(pDstPk, pDst->info.rows, pPkVal, false);
      if (code) {
        return code;
      }
    } else {
      colDataSetNULL(pDstPk, pDst->info.rows);
    }
  }

  pDst->info.rows += 1;
  *rowIndex += 1;
  return code;
}
1661

1662
/*
 * Switch the sort handle to "sort by row id" mode.
 *
 * Replaces pHandle->pDataBlock with a block of four fixed columns (timestamp,
 * region id, offset, length) plus, when pHandle->bSortPk is set, a fifth column
 * with the type and width of the original primary key column. pHandle->pSortInfo
 * is replaced by order info for slot 0 (timestamp) and, with a primary key,
 * slot 4. Page size and page count of the handle are reset to 256KB / 256.
 *
 * Every lookup is validated and both new objects are fully built before the
 * handle is modified, so on failure the handle is left untouched.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of the failing step.
 */
static int32_t initRowIdSort(SSortHandle* pHandle) {
  int32_t          code = TSDB_CODE_SUCCESS;
  SBlockOrderInfo* pkOrder = NULL;
  SColumnInfoData* extPkCol = NULL;
  SColumnInfoData  pkCol = {0};
  SSDataBlock*     pSortInput = NULL;
  SArray*          pOrderInfoList = NULL;

  if (pHandle->bSortPk) {
    pkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1);
    if (pkOrder == NULL) {
      return terrno;
    }

    extPkCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pkOrder->slotId);
    if (extPkCol == NULL) {
      return terrno;
    }
  }

  SBlockOrderInfo* pOrigTsOrder = taosArrayGet(pHandle->pSortInfo, 0);
  if (pOrigTsOrder == NULL) {
    return terrno;
  }
  int32_t tsOrder = pOrigTsOrder->order;

  // build the row-id block: ts | region id | offset | length [| pk]
  code = createDataBlock(&pSortInput);
  if (code) {
    return code;
  }

  SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1);
  code = blockDataAppendColInfo(pSortInput, &tsCol);
  if (code) {
    goto _fail;
  }

  SColumnInfoData regionIdCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2);
  code = blockDataAppendColInfo(pSortInput, &regionIdCol);
  if (code) {
    goto _fail;
  }

  SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3);
  code = blockDataAppendColInfo(pSortInput, &offsetCol);
  if (code) {
    goto _fail;
  }

  SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4);
  code = blockDataAppendColInfo(pSortInput, &lengthCol);
  if (code) {
    goto _fail;
  }

  if (pHandle->bSortPk) {
    pkCol = createColumnInfoData(extPkCol->info.type, extPkCol->info.bytes, 5);
    code = blockDataAppendColInfo(pSortInput, &pkCol);
    if (code) {
      goto _fail;
    }
  }

  // build the order info matching the row-id block layout
  pOrderInfoList = taosArrayInit(1, sizeof(SBlockOrderInfo));
  if (pOrderInfoList == NULL) {
    code = terrno;
    goto _fail;
  }

  SBlockOrderInfo biTs = {0};
  biTs.order = tsOrder;
  biTs.slotId = 0;
  biTs.nullFirst = (biTs.order == TSDB_ORDER_ASC);
  biTs.compFn = getKeyComparFunc(TSDB_DATA_TYPE_TIMESTAMP, biTs.order);
  if (taosArrayPush(pOrderInfoList, &biTs) == NULL) {
    code = terrno;
    goto _fail;
  }

  if (pHandle->bSortPk) {
    SBlockOrderInfo biPk = {0};
    biPk.order = pkOrder->order;
    biPk.slotId = 4;
    biPk.nullFirst = (biPk.order == TSDB_ORDER_ASC);
    biPk.compFn = getKeyComparFunc(pkCol.info.type, biPk.order);
    if (taosArrayPush(pOrderInfoList, &biPk) == NULL) {
      code = terrno;
      goto _fail;
    }
  }

  // everything is ready: commit the new block and order info to the handle
  blockDataDestroy(pHandle->pDataBlock);
  pHandle->pDataBlock = pSortInput;

  pHandle->pageSize = 256 * 1024;  // 256k
  pHandle->numOfPages = 256;

  taosArrayDestroy(pHandle->pSortInfo);
  pHandle->pSortInfo = pOrderInfoList;
  pHandle->cmpParam.pPkOrder = (pHandle->bSortPk) ? taosArrayGet(pHandle->pSortInfo, 1) : NULL;
  return TSDB_CODE_SUCCESS;

_fail:
  if (pOrderInfoList != NULL) {
    taosArrayDestroy(pOrderInfoList);
  }
  if (pSortInput != NULL) {
    blockDataDestroy(pSortInput);
  }
  return code;
}
1756

1757
/*
 * Configure the sort handle to sort by row id, with extRowsMemSize as the
 * memory size used for the external rows.
 *
 * Keeps a copy of the current sort info in aExtRowsOrders, rebuilds the handle's
 * data block and sort info through initRowIdSort(), and creates the memory file
 * for the rows. bSortByRowId is set after the memory file creation is attempted,
 * whatever its result.
 *
 * Returns 0 on success; terrno, the initRowIdSort()/createSortMemFile() error
 * code, or TSDB_CODE_NO_DISKSPACE otherwise.
 */
int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsMemSize) {
  SSDataBlock* pOrigBlock = pHandle->pDataBlock;

  // row size + number of columns + sizeof(int32_t)
  pHandle->extRowBytes =
      blockDataGetRowSize(pOrigBlock) + taosArrayGetSize(pOrigBlock->pDataBlock) + sizeof(int32_t);
  pHandle->extRowsMemSize = extRowsMemSize;

  SArray* pOrigOrders = taosArrayDup(pHandle->pSortInfo, NULL);
  pHandle->aExtRowsOrders = pOrigOrders;
  if (pOrigOrders == NULL) {
    return terrno;
  }

  int32_t code = initRowIdSort(pHandle);
  if (code) {
    return code;
  }

  bool hasTempSpace = osTempSpaceAvailable();
  if (!hasTempSpace) {
    qError("create sort mem file failed since %s, tempDir:%s", terrstr(), tsTempDir);
    return TSDB_CODE_NO_DISKSPACE;
  }

  code = createSortMemFile(pHandle);
  pHandle->bSortByRowId = true;
  return code;
}
1780

1781
typedef struct SBlkMergeSupport {
1782
  int64_t** aTs;
1783
  int32_t*  aRowIdx;
1784
  int32_t   tsOrder;
1785

1786
  SBlockOrderInfo* pPkOrder;
1787
  SSDataBlock**    aBlks;
1788
} SBlkMergeSupport;
1789

1790
static int32_t blockCompareTsFn(const void* pLeft, const void* pRight, void* param) {
122,312,033✔
1791
  int32_t left = *(int32_t*)pLeft;
122,312,033✔
1792
  int32_t right = *(int32_t*)pRight;
122,312,033✔
1793

1794
  SBlkMergeSupport* pSup = (SBlkMergeSupport*)param;
122,312,033✔
1795
  if (pSup->aRowIdx[left] == -1) {
122,312,033✔
1796
    return 1;
9,219,407✔
1797
  } else if (pSup->aRowIdx[right] == -1) {
113,092,626✔
1798
    return -1;
339,868✔
1799
  }
1800

1801
  int64_t leftTs = pSup->aTs[left][pSup->aRowIdx[left]];
112,752,758✔
1802
  int64_t rightTs = pSup->aTs[right][pSup->aRowIdx[right]];
112,752,758✔
1803

1804
  int32_t ret = leftTs > rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0);
112,752,758✔
1805
  if (pSup->tsOrder == TSDB_ORDER_DESC) {
112,752,758✔
1806
    ret = -1 * ret;
37,518,761✔
1807
  }
1808
  return ret;
112,752,758✔
1809
}
1810

1811
static int32_t blockCompareTsPkFn(const void* pLeft, const void* pRight, void* param) {
636,798✔
1812
  int32_t left = *(int32_t*)pLeft;
636,798✔
1813
  int32_t right = *(int32_t*)pRight;
636,798✔
1814

1815
  SBlkMergeSupport* pSup = (SBlkMergeSupport*)param;
636,798✔
1816
  if (pSup->aRowIdx[left] == -1) {
636,798✔
1817
    return 1;
13,737✔
1818
  } else if (pSup->aRowIdx[right] == -1) {
623,061✔
1819
    return -1;
11,349✔
1820
  }
1821

1822
  int64_t leftTs = pSup->aTs[left][pSup->aRowIdx[left]];
611,712✔
1823
  int64_t rightTs = pSup->aTs[right][pSup->aRowIdx[right]];
611,712✔
1824

1825
  int32_t ret = leftTs > rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0);
611,712✔
1826
  if (pSup->tsOrder == TSDB_ORDER_DESC) {
611,712✔
1827
    ret = -1 * ret;
330,369✔
1828
  }
1829
  if (ret == 0 && pSup->pPkOrder) {
611,712!
1830
    ret = tsortComparBlockCell(pSup->aBlks[left], pSup->aBlks[right], pSup->aRowIdx[left], pSup->aRowIdx[right],
501,888✔
1831
                               pSup->pPkOrder);
501,888✔
1832
  }
1833
  return ret;
614,205✔
1834
}
1835

1836
/*
 * Serialize blk into a new page of the handle's paged buffer and record the id
 * of that page in aPgId.
 *
 * The page obtained from getNewBufPage() is released on every path, including
 * the error paths, so a failed append does not leave the page held.
 *
 * Returns the result of blockDataToBuf() on the normal path, terrno when the
 * page cannot be obtained or its id cannot be recorded, and
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the block does not fit in a page.
 */
static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, SArray* aPgId) {
  int32_t pageId = -1;
  void*   pPage = getNewBufPage(pHandle->pBuf, &pageId);
  if (pPage == NULL) {
    return terrno;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  void* px = taosArrayPush(aPgId, &pageId);
  if (px == NULL) {
    code = terrno;  // capture before releasing the page
    releaseBufPage(pHandle->pBuf, pPage);
    return code;
  }

  // block payload + int32 header + one int32 per column
  int32_t size = blockDataGetSize(blk) + sizeof(int32_t) + taosArrayGetSize(blk->pDataBlock) * sizeof(int32_t);
  if (size > getBufPageSize(pHandle->pBuf)) {
    qError("sort failed at: %s:%d", __func__, __LINE__);
    releaseBufPage(pHandle->pBuf, pPage);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  code = blockDataToBuf(pPage, blk);

  setBufPageDirty(pPage, true);
  releaseBufPage(pHandle->pBuf, pPage);

  return code;
}
1861

1862
/*
 * Compute by how many bytes the page size of the destination block grows when
 * row srcRowIndex of pSrcBlock is appended as destination row dstRowIndex.
 *
 * Fixed-width columns contribute their byte width, plus one bitmap byte
 * whenever dstRowIndex is a multiple of 8. Variable-length columns contribute
 * one offset entry plus, when the source cell has data, the byte size of the value.
 */
static int32_t getPageBufIncForRow(SSDataBlock* pSrcBlock, int32_t srcRowIndex, int32_t dstRowIndex) {
  int32_t nCols = taosArrayGetSize(pSrcBlock->pDataBlock);
  bool    needBitmapByte = ((dstRowIndex & 0x7) == 0);
  int32_t inc = 0;

  if (!pSrcBlock->info.hasVarCol) {
    // fixed-width rows only: bitmap bytes (if any) plus the row size
    if (needBitmapByte) {
      inc += nCols;
    }
    inc += blockDataGetRowSize(pSrcBlock);
    return inc;
  }

  for (int32_t c = 0; c < nCols; ++c) {
    SColumnInfoData* pCol = TARRAY_GET_ELEM(pSrcBlock->pDataBlock, c);

    if (!IS_VAR_DATA_TYPE(pCol->info.type)) {
      inc += pCol->info.bytes;
      if (needBitmapByte) {
        inc += 1;  // bitmap
      }
      continue;
    }

    // variable-length column: value bytes only when the cell has data
    if ((pCol->varmeta.offset[srcRowIndex] != -1) && (pCol->pData)) {
      char* pVal = colDataGetData(pCol, srcRowIndex);
      inc += calcStrBytesByType(pCol->info.type, pVal);
    }

    inc += sizeof(pCol->varmeta.offset[0]);
  }

  return inc;
}
1897

1898
static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowIndex, int32_t dstRowIndex,
3,047,816✔
1899
                                         SColumnInfoData* pPkCol) {
1900
  int32_t size = 0;
3,047,816✔
1901
  int32_t numOfCols = blockDataGetNumOfCols(pDstBlock);
3,047,816✔
1902

1903
  if (pPkCol == NULL) {  // no var column
3,047,881✔
1904
    if (!((numOfCols == 4) && (!pDstBlock->info.hasVarCol))) {
2,736,244!
1905
      qError("sort failed at: %s:%d", __func__, __LINE__);
16!
1906
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1907
    }
1908

1909
    size += numOfCols * ((dstRowIndex & 0x7) == 0 ? 1 : 0);
2,736,228✔
1910
    size += blockDataGetRowSize(pDstBlock);
2,736,228✔
1911
  } else {
1912
    if (numOfCols != 5) {
311,637!
1913
      qError("sort failed at: %s:%d", __func__, __LINE__);
×
1914
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1915
    }
1916

1917
    size += (numOfCols - 1) * (((dstRowIndex & 0x7) == 0) ? 1 : 0);
311,637✔
1918
    for (int32_t i = 0; i < numOfCols - 1; ++i) {
1,557,020✔
1919
      SColumnInfoData* pColInfo = TARRAY_GET_ELEM(pDstBlock->pDataBlock, i);
1,245,383✔
1920
      size += pColInfo->info.bytes;
1,245,383✔
1921
    }
1922

1923
    // handle the pk column, the last column, may be the var char column
1924
    if (IS_VAR_DATA_TYPE(pPkCol->info.type)) {
311,637!
1925
      if ((pPkCol->varmeta.offset[srcRowIndex] != -1) && (pPkCol->pData)) {
105,774!
1926
        char* p = colDataGetData(pPkCol, srcRowIndex);
106,513!
1927
        if (IS_STR_DATA_BLOB(pPkCol->info.type)) {
106,513!
1928
          size += blobDataTLen(p);
×
1929
        } else {
1930
          size += varDataTLen(p);
106,575✔
1931
        }
1932
      }
1933

1934
      size += sizeof(pPkCol->varmeta.offset[0]);
105,774✔
1935
    } else {
1936
      size += pPkCol->info.bytes;
205,863✔
1937
      if (((dstRowIndex)&0x07) == 0) {
205,863✔
1938
        size += 1;  // bitmap
26,665✔
1939
      }
1940
    }
1941
  }
1942

1943
  return size;
3,047,886✔
1944
}
1945

1946
/*
 * Page size increment for appending row srcRowIndex of pSrcBlock as row
 * dstRowIndex of the handle's data block. Dispatches to the row-id variant when
 * the handle sorts by row id, and to the plain variant otherwise.
 */
static int32_t getBufIncForNewRow(SSortHandle* pHandle, int32_t dstRowIndex, SSDataBlock* pSrcBlock,
                                  int32_t srcRowIndex) {
  if (!pHandle->bSortByRowId) {
    return getPageBufIncForRow(pSrcBlock, srcRowIndex, dstRowIndex);
  }

  // there may be varchar column exists, so we need to get the pk info, and then calculate the row length
  SColumnInfoData* pSrcPkCol = NULL;
  if (pHandle->bSortPk) {
    SBlockOrderInfo* pPkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1);
    pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, pPkOrder->slotId);
  }

  return getPageBufIncForRowIdSort(pHandle->pDataBlock, srcRowIndex, dstRowIndex, pSrcPkCol);
}
1966

1967
static int32_t initMergeSup(SBlkMergeSupport* pSup, SArray* pBlockList, int32_t tsOrder, int32_t tsSlotId,
57,825✔
1968
                            SBlockOrderInfo* pPkOrderInfo) {
1969
  int32_t code = TSDB_CODE_SUCCESS;
57,825✔
1970
  int32_t lino = 0;
57,825✔
1971
  memset(pSup, 0, sizeof(SBlkMergeSupport));
57,825✔
1972

1973
  int32_t numOfBlocks = taosArrayGetSize(pBlockList);
57,825✔
1974

1975
  pSup->aRowIdx = taosMemoryCalloc(numOfBlocks, sizeof(int32_t));
57,835!
1976
  QUERY_CHECK_NULL(pSup->aRowIdx, code, lino, _end, terrno);
57,838!
1977

1978
  pSup->aTs = taosMemoryCalloc(numOfBlocks, sizeof(int64_t*));
57,838!
1979
  QUERY_CHECK_NULL(pSup->aTs, code, lino, _end, terrno);
57,839!
1980

1981
  pSup->tsOrder = tsOrder;
57,839✔
1982
  pSup->aBlks = taosMemoryCalloc(numOfBlocks, sizeof(SSDataBlock*));
57,839!
1983
  QUERY_CHECK_NULL(pSup->aBlks, code, lino, _end, terrno);
57,839!
1984

1985
  for (int32_t i = 0; i < numOfBlocks; ++i) {
504,392✔
1986
    SSDataBlock*     pBlock = taosArrayGetP(pBlockList, i);
446,564✔
1987
    SColumnInfoData* col = taosArrayGet(pBlock->pDataBlock, tsSlotId);
446,630✔
1988
    pSup->aTs[i] = (int64_t*)col->pData;
446,553✔
1989
    pSup->aRowIdx[i] = 0;
446,553✔
1990
    pSup->aBlks[i] = pBlock;
446,553✔
1991
  }
1992

1993
  pSup->pPkOrder = pPkOrderInfo;
57,828✔
1994

1995
_end:
57,828✔
1996
  if (code != TSDB_CODE_SUCCESS) {
57,828!
1997
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1998
  }
1999
  return code;
57,827✔
2000
}
2001

2002
/*
 * Free the three arrays owned by pSup. The blocks referenced by aBlks and the
 * timestamp data referenced by aTs are not freed here.
 *
 * The pointers are reset to NULL after freeing so that no dangling pointer is
 * left in the caller's struct.
 */
static void cleanupMergeSup(SBlkMergeSupport* pSup) {
  taosMemoryFree(pSup->aRowIdx);
  pSup->aRowIdx = NULL;

  taosMemoryFree(pSup->aTs);
  pSup->aTs = NULL;

  taosMemoryFree(pSup->aBlks);
  pSup->aBlks = NULL;
}
57,837✔
2007

2008
/* Sum the row counts (info.rows) of all blocks stored in pBlockList. */
static int32_t getTotalRows(SArray* pBlockList) {
  size_t  nBlocks = taosArrayGetSize(pBlockList);
  int32_t sum = 0;

  for (int32_t idx = 0; idx < nBlocks; ++idx) {
    SSDataBlock* pBlk = taosArrayGetP(pBlockList, idx);
    sum += pBlk->info.rows;
  }

  return sum;
}
2018

2019
/*
 * Merge the blocks in aBlk into page-sized chunks and register the resulting
 * pages as one new external source in aExtSrc.
 *
 * Rows are taken from the input blocks in merge-tree order and appended to
 * pHandle->pDataBlock (as row-id rows when the handle sorts by row id). When
 * the next row would make the block exceed pHandle->pageSize, the block is
 * written to a page of the paged buffer and emptied. When pHandle->mergeLimit
 * is not -1, merging stops once that many rows have been written, and
 * pHandle->currMergeLimitTs is tightened to the last timestamp written.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. Every failure path returns a
 * non-success code; in particular a failed page-id list allocation and a page
 * size accounting mismatch are reported as errors.
 */
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t pageHeaderSize = sizeof(int32_t) + sizeof(int32_t) * blockDataGetNumOfCols(pHandle->pDataBlock);
  int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pageHeaderSize);
  if (rowCap < 0) {
    return terrno;
  }

  code = blockDataEnsureCapacity(pHandle->pDataBlock, rowCap);
  if (code) {
    return code;
  }

  blockDataCleanup(pHandle->pDataBlock);
  SBlkMergeSupport sup = {0};

  // order info describing the INPUT blocks: the saved original orders in row-id mode
  SBlockOrderInfo* pOrigBlockTsOrder =
      (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0);
  if (pOrigBlockTsOrder == NULL) {
    return terrno;
  }

  // order info describing the handle's own data block
  SBlockOrderInfo* pHandleBlockTsOrder = taosArrayGet(pHandle->pSortInfo, 0);
  if (pHandleBlockTsOrder == NULL) {
    return terrno;
  }

  SBlockOrderInfo* pOrigBlockPkOrder = NULL;
  if (pHandle->bSortPk) {
    pOrigBlockPkOrder =
        (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 1) : taosArrayGet(pHandle->aExtRowsOrders, 1);
    if (pOrigBlockPkOrder == NULL) {
      return terrno;
    }
  }

  code = initMergeSup(&sup, aBlk, pOrigBlockTsOrder->order, pOrigBlockTsOrder->slotId, pOrigBlockPkOrder);
  if (code) {
    return code;
  }

  int32_t totalRows = getTotalRows(aBlk);

  SMultiwayMergeTreeInfo* pTree = NULL;
  __merge_compare_fn_t    mergeCompareFn = (!pHandle->bSortPk) ? blockCompareTsFn : blockCompareTsPkFn;

  code = tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, mergeCompareFn);
  if (TSDB_CODE_SUCCESS != code) {
    cleanupMergeSup(&sup);
    return code;
  }

  SArray* aPgId = taosArrayInit(8, sizeof(int32_t));
  if (aPgId == NULL) {
    code = terrno;  // without this the failure would be returned as success
    goto _error;
  }

  int32_t nRows = 0;
  int32_t nMergedRows = 0;
  bool    mergeLimitReached = false;
  size_t  blkPgSz = pageHeaderSize;
  int64_t lastPageBufTs = (pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;

  while (nRows < totalRows) {
    int32_t      minIdx = tMergeTreeGetChosenIndex(pTree);
    SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
    int32_t      minRow = sup.aRowIdx[minIdx];

    int32_t bufInc = getBufIncForNewRow(pHandle, pHandle->pDataBlock->info.rows, minBlk, minRow);

    // the next row does not fit any more: write the current block to a page first
    if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
      SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId);
      if (tsCol == NULL) {
        code = terrno;
        goto _error;
      }

      lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
      code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
      if (code != TSDB_CODE_SUCCESS) {
        goto _error;
      }

      nMergedRows += pHandle->pDataBlock->info.rows;
      blockDataCleanup(pHandle->pDataBlock);
      blkPgSz = pageHeaderSize;

      // the row now becomes row 0 of an empty block, so its increment changes
      bufInc = getBufIncForNewRow(pHandle, 0, minBlk, minRow);

      if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
        mergeLimitReached = true;
        if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ||
            (lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) {
          pHandle->currMergeLimitTs = lastPageBufTs;
        }

        break;
      }
    }

    code = blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1);
    if (code) {
      goto _error;
    }

    if (pHandle->bSortByRowId) {
      code = appendToRowIndexDataBlock(pHandle, minBlk, &minRow);
    } else {
      code = appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
    }

    if (code) {
      goto _error;
    }

    // the predicted size must match the real size of the block
    blkPgSz += bufInc;
    if (blkPgSz != blockDataGetSize(pHandle->pDataBlock) + pageHeaderSize) {
      qError("sort failed at: %s:%d", __func__, __LINE__);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;  // code is still success at this point
      goto _error;
    }

    ++nRows;

    // advance the chosen block; -1 marks it as used up for the comparators
    if (sup.aRowIdx[minIdx] == minBlk->info.rows - 1) {
      sup.aRowIdx[minIdx] = -1;
    } else {
      ++sup.aRowIdx[minIdx];
    }
    code = tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
    if (TSDB_CODE_SUCCESS != code) {
      goto _error;
    }
  }

  // write the rows left in the block after the loop
  if (pHandle->pDataBlock->info.rows > 0) {
    if (!mergeLimitReached) {
      SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId);
      if (tsCol == NULL) {
        code = terrno;
        goto _error;
      }

      lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
      code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
      if (code != TSDB_CODE_SUCCESS) {
        goto _error;
      }
      nMergedRows += pHandle->pDataBlock->info.rows;
      if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
        mergeLimitReached = true;
        if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ||
            (lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) {
          pHandle->currMergeLimitTs = lastPageBufTs;
        }
      }
    }
    blockDataCleanup(pHandle->pDataBlock);
  }

  SSDataBlock* pMemSrcBlk = NULL;
  code = createOneDataBlock(pHandle->pDataBlock, false, &pMemSrcBlk);
  if (code) goto _error;

  code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pMemSrcBlk);
    goto _error;
  }

  cleanupMergeSup(&sup);
  tMergeTreeDestroy(&pTree);

  return code;

_error:
  tMergeTreeDestroy(&pTree);
  cleanupMergeSup(&sup);
  if (aPgId) taosArrayDestroy(aPgId);
  return code;
}
2189

2190
/*
 * Limit the rows taken from pOrigBlk so that the table it belongs to
 * (pOrigBlk->info.id.uid) contributes at most pHandle->mergeLimit rows in total.
 *
 * mTableNumRows maps a table uid to the number of rows seen so far and is
 * updated with the rows of pOrigBlk. When the limit is reached for the table,
 * pHandle->mergeLimitReachedFn (if set) is invoked.
 *
 * Outputs:
 *   *pSkipBlock      true when no row of the block may be kept; *pRes is then
 *                    pOrigBlk itself and nothing is extracted
 *   *pExtractedBlock true when *pRes is a newly extracted block holding the
 *                    leading rows to keep, false when *pRes is pOrigBlk
 *   *pRes            the block to use, NULL on failure
 *
 * Returns 0 on success, otherwise the error code of the failing call.
 */
static int32_t getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk,
                                            bool* pExtractedBlock, bool* pSkipBlock, SSDataBlock** pRes) {
  int64_t nRows = 0;
  int64_t prevRows = 0;
  int32_t code = 0;

  *pRes = NULL;

  void* pNum = tSimpleHashGet(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid));
  if (pNum == NULL) {
    prevRows = 0;
    nRows = pOrigBlk->info.rows;
    code = tSimpleHashPut(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid), &nRows, sizeof(nRows));
    if (code) {
      return code;
    }
  } else {
    prevRows = *(int64_t*)pNum;
    *(int64_t*)pNum = *(int64_t*)pNum + pOrigBlk->info.rows;
    nRows = *(int64_t*)pNum;
  }

  int64_t keepRows = pOrigBlk->info.rows;
  if (nRows >= pHandle->mergeLimit) {
    if (pHandle->mergeLimitReachedFn) {
      pHandle->mergeLimitReachedFn(pOrigBlk->info.id.uid, pHandle->mergeLimitReachedParam);
    }
    // only the rows still missing to reach the limit are kept
    keepRows = pHandle->mergeLimit > prevRows ? (pHandle->mergeLimit - prevRows) : 0;
  }

  if (keepRows == 0) {
    // the limit was already reached before this block: report "skip" and stop here,
    // instead of falling through and extracting an empty block
    *pSkipBlock = true;
    *pExtractedBlock = false;
    *pRes = pOrigBlk;
    return code;
  }

  *pSkipBlock = false;
  SSDataBlock* pBlock = NULL;
  if (keepRows != pOrigBlk->info.rows) {
    code = blockDataExtractBlock(pOrigBlk, 0, keepRows, &pBlock);
    if (code) {
      return code;
    }

    *pExtractedBlock = true;
  } else {
    *pExtractedBlock = false;
    pBlock = pOrigBlk;
  }

  *pRes = pBlock;
  return code;
}
2242

2243
static void freeHelp(void* param) {
447,203✔
2244
  SSDataBlock** ptr = param;
447,203✔
2245
  if (*ptr != NULL) {
447,203!
2246
    blockDataDestroy(*ptr);
447,204✔
2247
  }
2248
}
447,294✔
2249

2250
/**
 * Build the initial sorted sources for a SORT_BLOCK_TS_MERGE sort.
 *
 * Blocks are fetched from the first ordered source through pHandle->fetchfp.
 * When a merge limit is set, each block is first passed through
 * getRowsBlockWithinMergeLimit(). Blocks with the same table uid are merged
 * into one accumulated block. Once the accumulated size exceeds the buffer
 * budget, or the input ends with data still pending, the batch is handed to
 * sortBlocksToExtSource() and the accumulated blocks are released.
 *
 * On completion the handle's ordered sources are replaced by the generated
 * sources (unless the handle was closed meanwhile) and the handle type is
 * switched to SORT_SINGLESOURCE_SORT.
 *
 * @param pHandle sort handle
 * @return 0 on success, otherwise an error code (also logged via qError)
 */
static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
  int32_t          szSort = 0;
  int32_t          code = 0;
  int32_t          lino = 0;
  size_t           nSrc = taosArrayGetSize(pHandle->pOrderedSource);
  SArray*          aExtSrc = NULL;
  SArray*          aBlkSort = NULL;
  SSHashObj*       mTableNumRows = NULL;
  SSHashObj*       mUidBlk = NULL;
  SBlockOrderInfo* pOrigTsOrder = NULL;

  aExtSrc = taosArrayInit(nSrc, POINTER_BYTES);
  QUERY_CHECK_NULL(aExtSrc, code, lino, _err, terrno);

  // uid -> number of rows seen so far (used for the merge limit)
  mTableNumRows = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
  QUERY_CHECK_NULL(mTableNumRows, code, lino, _err, terrno);

  // blocks accumulated for the current batch; owns the blocks it holds
  aBlkSort = taosArrayInit(8, POINTER_BYTES);
  QUERY_CHECK_NULL(aBlkSort, code, lino, _err, terrno);

  // uid -> accumulated block of the current batch (non-owning view of aBlkSort)
  mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
  QUERY_CHECK_NULL(mUidBlk, code, lino, _err, terrno);

  size_t maxBufSize = (pHandle->bSortByRowId) ? pHandle->extRowsMemSize : (pHandle->numOfPages * pHandle->pageSize);
  code = createPageBuf(pHandle);
  QUERY_CHECK_CODE(code, lino, _err);

  SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0);
  QUERY_CHECK_NULL(pSrc, code, lino, _err, terrno);

  pOrigTsOrder =
      (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0);
  QUERY_CHECK_NULL(pOrigTsOrder, code, lino, _err, terrno);

  pHandle->currMergeLimitTs = (pOrigTsOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;

  while (1) {
    bool         bExtractedBlock = false;
    bool         bSkipBlock = false;
    SSDataBlock* pBlk = NULL;

    code = pHandle->fetchfp(pSrc->param, &pBlk);
    QUERY_CHECK_CODE(code, lino, _err);

    if (pBlk != NULL && pHandle->mergeLimit > 0) {
      SSDataBlock* p = NULL;
      code = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock, &bSkipBlock, &p);
      // A failure here must abort the sort; silently moving on to the next
      // block would drop rows and lose the error code.
      QUERY_CHECK_CODE(code, lino, _err);
      if (bSkipBlock) {
        continue;
      }

      pBlk = p;
    }

    if (pBlk != NULL) {
      SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigTsOrder->slotId);
      QUERY_CHECK_NULL(tsCol, code, lino, _err, terrno);

      // drop blocks whose first timestamp is already beyond the current merge-limit timestamp
      int64_t firstRowTs = *(int64_t*)tsCol->pData;
      if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
          (pOrigTsOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) {
        if (bExtractedBlock) {
          blockDataDestroy(pBlk);
        }
        continue;
      }
    }

    if (pBlk != NULL) {
      szSort += blockDataGetSize(pBlk);
      void* ppBlk = tSimpleHashGet(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid));
      if (ppBlk != NULL) {
        // this uid already has an accumulated block in the batch: append to it
        SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk);
        code = blockDataMerge(tBlk, pBlk);

        // The extracted copy is owned here; release it before checking the
        // merge result so that it is not leaked on the error path.
        if (bExtractedBlock) {
          blockDataDestroy(pBlk);
        }
        QUERY_CHECK_CODE(code, lino, _err);
      } else {
        // first block of this uid in the batch: take the extracted copy, or duplicate the fetched block
        SSDataBlock* tBlk = NULL;
        if (bExtractedBlock) {
          tBlk = pBlk;
        } else {
          code = createOneDataBlock(pBlk, true, &tBlk);
          QUERY_CHECK_CODE(code, lino, _err);
        }

        code = tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES);
        if (code != TSDB_CODE_SUCCESS) {
          blockDataDestroy(tBlk);
        }
        QUERY_CHECK_CODE(code, lino, _err);

        void* px = taosArrayPush(aBlkSort, &tBlk);
        if (px == NULL) {
          blockDataDestroy(tBlk);
        }
        QUERY_CHECK_NULL(px, code, lino, _err, terrno);
      }
    }

    // flush the batch when the budget is exceeded, or when the input ended with data pending
    if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) {
      tSimpleHashClear(mUidBlk);

      int64_t p = taosGetTimestampUs();
      if (pHandle->bSortByRowId) {
        code = tsortOpenRegion(pHandle);
        QUERY_CHECK_CODE(code, lino, _err);
      }

      code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc);
      QUERY_CHECK_CODE(code, lino, _err);

      if (pHandle->bSortByRowId) {
        code = tsortCloseRegion(pHandle);  // ignore this error code
      }

      int64_t el = taosGetTimestampUs() - p;
      pHandle->sortElapsed += el;
      taosArrayClearEx(aBlkSort, freeHelp);

      szSort = 0;
      qDebug("%s source %zu created", pHandle->idStr, taosArrayGetSize(aExtSrc));
    }

    if (pBlk == NULL) {
      break;
    }

    if (tsortIsClosed(pHandle)) {
      break;
    }
  }

  tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL);
  if (!tsortIsClosed(pHandle)) {
    void* px = taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
    QUERY_CHECK_NULL(px, code, lino, _err, terrno);
  }

  if (pHandle->bSortByRowId) {
    code = tsortFinalizeRegions(pHandle);
  }

  pHandle->type = SORT_SINGLESOURCE_SORT;

_err:
  // shared exit path for success and failure
  if (code) {
    qError("%s %s failed at line %d since %s", pHandle->idStr, __func__, lino, tstrerror(code));
  }

  if (aExtSrc) {
    taosArrayDestroy(aExtSrc);
  }
  if (aBlkSort) {
    taosArrayDestroyEx(aBlkSort, freeHelp);
  }
  if (mTableNumRows) {
    tSimpleHashCleanup(mTableNumRows);
  }
  if (mUidBlk) {
    tSimpleHashCleanup(mUidBlk);
  }
  return code;
}
2416

2417
static void freeSortSource(void* p) {
126,891✔
2418
  SSortSource** pSource = (SSortSource**)p;
126,891✔
2419
  if (NULL == pSource || NULL == *pSource) {
126,891!
2420
    return;
×
2421
  }
2422

2423
  if ((*pSource)->pageIdList) {
126,897!
2424
    taosArrayDestroy((*pSource)->pageIdList);
×
2425
  }
2426

2427
  if (!(*pSource)->onlyRef) {
126,897✔
2428
    if ((*pSource)->param) {
90!
2429
      taosMemoryFree((*pSource)->param);
90!
2430
    }
2431
    if ((*pSource)->src.pBlock) {
90!
2432
      blockDataDestroy((*pSource)->src.pBlock);
×
2433
    }
2434
  }
2435

2436
  taosMemoryFreeClear(*pSource);
126,897!
2437
}
2438

2439
/**
 * Build the initial sources for a SORT_SINGLESOURCE_SORT sort.
 *
 * Every block fetched from the first ordered source is merged into
 * pHandle->pDataBlock. Whenever that block grows past the sort buffer size it
 * is sorted and passed to doAddToBuf(). After the input is exhausted the
 * remaining rows are sorted; they are either kept for an in-memory sort (when
 * they fit and no paged buffer exists) or passed to doAddToBuf() as well.
 * The original sources are removed from pHandle->pOrderedSource on exit.
 *
 * @param pHandle sort handle
 * @return 0 on success, otherwise an error code
 */
static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
  int32_t code = 0;
  int32_t lino = 0;
  size_t  sortBufSize = pHandle->numOfPages * pHandle->pageSize;

  SSortSource** ppFirst = taosArrayGet(pHandle->pOrderedSource, 0);
  if (ppFirst == NULL) {
    return terrno;
  }

  SSortSource* pSource = *ppFirst;
  size_t       origSourceCount = taosArrayGetSize(pHandle->pOrderedSource);

  for (;;) {
    SSDataBlock* pBlock = NULL;
    code = pHandle->fetchfp(pSource->param, &pBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pBlock == NULL) {
      break;  // input exhausted
    }

    if (pHandle->pDataBlock == NULL) {
      // first block: derive the page size and create the accumulation block
      uint32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
      pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock), numOfCols);

      // todo, number of pages are set according to the total available sort buffer
      pHandle->numOfPages = 1024;
      sortBufSize = pHandle->numOfPages * pHandle->pageSize;

      code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (pHandle->beforeFp != NULL) {
      pHandle->beforeFp(pBlock, pHandle->param);
    }

    code = blockDataMerge(pHandle->pDataBlock, pBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    size_t curSize = blockDataGetSize(pHandle->pDataBlock);
    if (curSize > sortBufSize) {
      // buffer budget exceeded: sort what has been accumulated and hand it to doAddToBuf()
      int64_t startUs = taosGetTimestampUs();
      code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
      QUERY_CHECK_CODE(code, lino, _end);

      pHandle->sortElapsed += (taosGetTimestampUs() - startUs);

      if (pHandle->pqMaxRows > 0) {
        blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
      }

      code = doAddToBuf(pHandle->pDataBlock, pHandle);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

  if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
    // size is sampled before sorting/truncation, as the fit test below relies on it
    size_t remainSize = blockDataGetSize(pHandle->pDataBlock);

    int64_t startUs = taosGetTimestampUs();
    code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pHandle->pqMaxRows > 0) {
      blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
    }
    pHandle->sortElapsed += (taosGetTimestampUs() - startUs);

    if (remainSize > sortBufSize || pHandle->pBuf != NULL) {
      code = doAddToBuf(pHandle->pDataBlock, pHandle);
    } else {
      // everything fits in memory and no paged buffer exists: serve tuples directly from pDataBlock
      pHandle->cmpParam.numOfSources = 1;
      pHandle->inMemSort = true;

      pHandle->loops = 1;
      pHandle->tupleHandle.rowIndex = -1;
      pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  taosArrayRemoveBatch(pHandle->pOrderedSource, 0, origSourceCount, freeSortSource);
  return code;
}
2524

2525
// Dispatch initial-source creation on the handle's sort type; any other type
// creates nothing and yields 0. The resulting number of ordered sources is
// logged at debug level.
static int32_t createInitialSources(SSortHandle* pHandle) {
  int32_t code = 0;

  switch (pHandle->type) {
    case SORT_SINGLESOURCE_SORT:
      code = createBlocksQuickSortInitialSources(pHandle);
      break;
    case SORT_BLOCK_TS_MERGE:
      code = createBlocksMergeSortInitialSources(pHandle);
      break;
    default:
      break;
  }

  qDebug("%s %zu sources created", pHandle->idStr, taosArrayGetSize(pHandle->pOrderedSource));
  return code;
}
2537

2538
/**
 * Open the handle for the buffered merge-sort path: create the initial
 * sources, run the internal merge sort, then build the comparison state and
 * the merge tree over the remaining sources.
 *
 * @return 0 on success (including the case of no remaining sources),
 *         otherwise an error code
 */
static int32_t tsortOpenForBufMergeSort(SSortHandle* pHandle) {
  int32_t code = createInitialSources(pHandle);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // do internal sort
  code = doInternalMergeSort(pHandle);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);

  // with a paged buffer, the source count must not exceed its in-memory page count
  if (pHandle->pBuf != NULL && numOfSources > getNumOfInMemBufPages(pHandle->pBuf)) {
    qError("sort failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  if (numOfSources == 0) {
    return 0;
  }

  code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, 0, numOfSources - 1, pHandle);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  return tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam,
                          pHandle->comparFn);
}
2570

2571
// Request close: compare-exchange `closed` from 0 to 1; the result is discarded.
void tsortClose(SSortHandle* pHandle) {
  (void)atomic_val_compare_exchange_8(&pHandle->closed, 0, 1);
}
414,555✔
2572

2573
// Compare-exchange `closed` from 1 to 2 and report a non-zero result as true.
// NOTE(review): presumably the call yields the value held before the
// exchange, so this is true once a close was requested — confirm.
bool tsortIsClosed(SSortHandle* pHandle) {
  return atomic_val_compare_exchange_8(&pHandle->closed, 1, 2) != 0;
}
213,483,825✔
2574

2575
// Unconditionally store 2 into the handle's `closed` flag.
void tsortSetClosed(SSortHandle* pHandle) {
  atomic_store_8(&pHandle->closed, 2);
}
×
2576

2577
// Record the merge limit on the handle; a value > 0 enables the per-table
// row limiting in createBlocksMergeSortInitialSources().
void tsortSetMergeLimit(SSortHandle* pHandle, int64_t mergeLimit) {
  pHandle->mergeLimit = mergeLimit;
}
213,092✔
2578

2579
void tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*),
414,457✔
2580
                            void* param) {
2581
  pHandle->fetchfp = fetchFp;
414,457✔
2582
  pHandle->beforeFp = fp;
414,457✔
2583
  pHandle->param = param;
414,457✔
2584
}
414,457✔
2585

2586
// Set the tuple comparison callback; a NULL handle is ignored.
void tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp) {
  if (!pHandle) {
    return;
  }

  pHandle->comparFn = fp;
}
433,588✔
2591

2592
// Store the compare-group-id flag in the comparison parameters; a NULL handle is ignored.
void tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId) {
  if (!pHandle) {
    return;
  }

  pHandle->cmpParam.cmpGroupId = compareGroupId;
}
52,887✔
2597

2598
/**
 * Produce the next tuple on the buffered merge-sort path.
 *
 * *pTupleHandle is set to the handle's tuple handle when a row is available
 * and left NULL when the handle is closed or all sources are completed.
 *
 * @return 0 on success (with or without a tuple), otherwise an error code
 */
static int32_t tsortBufMergeSortNextTuple(SSortHandle* pHandle, STupleHandle** pTupleHandle) {
  int32_t code = 0;
  *pTupleHandle = NULL;

  if (tsortIsClosed(pHandle) || pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) {
    return code;
  }

  // All the data are hold in the buffer, no external sort is invoked.
  if (pHandle->inMemSort) {
    pHandle->tupleHandle.rowIndex += 1;
    if (pHandle->tupleHandle.rowIndex != pHandle->pDataBlock->info.rows) {
      *pTupleHandle = &pHandle->tupleHandle;
    } else {
      pHandle->numOfCompletedSources = 1;  // walked past the last row
    }
    return code;
  }

  int32_t      chosen = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
  SSortSource* pChosenSrc = pHandle->cmpParam.pSources[chosen];

  // the previously returned row must be consumed before a new winner is picked
  if (pHandle->needAdjust) {
    code = adjustMergeTreeForNextTuple(pChosenSrc, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  // all sources are completed.
  if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) {
    return code;
  }

  // Get the adjusted value after the loser tree is updated.
  chosen = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
  pChosenSrc = pHandle->cmpParam.pSources[chosen];

  if (pChosenSrc->src.pBlock == NULL) {
    qError("sort failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  // expose the winning row, then advance the source past it
  pHandle->tupleHandle.pBlock = pChosenSrc->src.pBlock;
  pHandle->tupleHandle.rowIndex = pChosenSrc->src.rowIndex;

  pChosenSrc->src.rowIndex += 1;
  pHandle->needAdjust = true;

  *pTupleHandle = &pHandle->tupleHandle;
  return code;
}
2655

2656
// True when the handle was flagged via tsortSetForceUsePQSort().
static bool tsortIsForceUsePQSort(SSortHandle* pHandle) {
  return pHandle->forceUsePQSort == true;
}
146,041✔
2657

2658
// Flag the handle so that tsortIsPQSortApplicable() skips its memory-fit estimate.
void tsortSetForceUsePQSort(SSortHandle* pHandle) {
  pHandle->forceUsePQSort = true;
}
2,468✔
2659

2660
// Decide whether the bounded-priority-queue sort can be used: only for
// SORT_SINGLESOURCE_SORT, and then either when forced or when the estimated
// number of rows fitting in pqSortBufSize exceeds pqMaxRows.
static bool tsortIsPQSortApplicable(SSortHandle* pHandle) {
  if (pHandle->type != SORT_SINGLESOURCE_SORT) {
    return false;
  }

  if (tsortIsForceUsePQSort(pHandle)) {
    return true;
  }

  // each row costs its tuple length plus one pointer
  uint64_t rowsInBudget = pHandle->pqSortBufSize / (pHandle->pqMaxTupleLength + sizeof(char*));
  return rowsInBudget > pHandle->pqMaxRows;
}
2666

2667
static bool tsortPQCompFn(void* a, void* b, void* param) {
9,562,200✔
2668
  SSortHandle* pHandle = param;
9,562,200✔
2669
  int32_t      res = pHandle->comparFn(a, b, param);
9,562,200✔
2670
  if (res < 0) return 1;
9,559,210✔
2671
  return 0;
1,534,576✔
2672
}
2673

2674
static bool tsortPQComFnReverse(void* a, void* b, void* param) {
734,658✔
2675
  SSortHandle* pHandle = param;
734,658✔
2676
  int32_t      res = pHandle->comparFn(a, b, param);
734,658✔
2677
  if (res > 0) return 1;
734,643✔
2678
  return 0;
315,315✔
2679
}
2680

2681
static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param) {
10,296,787✔
2682
  TupleDesc* pLeftDesc = (TupleDesc*)pLeft;
10,296,787✔
2683
  TupleDesc* pRightDesc = (TupleDesc*)pRight;
10,296,787✔
2684

2685
  SSortHandle* pHandle = (SSortHandle*)param;
10,296,787✔
2686
  SArray*      orderInfo = (SArray*)pHandle->pSortInfo;
10,296,787✔
2687
  uint32_t     colNum = blockDataGetNumOfCols(pHandle->pDataBlock);
10,296,787✔
2688
  for (int32_t i = 0; i < orderInfo->size; ++i) {
12,562,419✔
2689
    SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(orderInfo, i);
11,750,488✔
2690
    void *           lData = NULL, *rData = NULL;
11,750,488✔
2691

2692
    int32_t ret1 = tupleDescGetField(pLeftDesc, pOrder->slotId, colNum, &lData);
11,750,488✔
2693
    int32_t ret2 = tupleDescGetField(pRightDesc, pOrder->slotId, colNum, &rData);
11,748,677✔
2694
    if (ret1) {
11,747,226!
2695
      return ret1;
9,481,259✔
2696
    }
2697

2698
    if (ret2) {
11,747,226!
2699
      return ret2;
×
2700
    }
2701

2702
    if ((!lData) && (!rData)) {
11,747,226✔
2703
      continue;
2,265,340✔
2704
    }
2705

2706
    if (!lData) return pOrder->nullFirst ? -1 : 1;
10,837,365✔
2707
    if (!rData) return pOrder->nullFirst ? 1 : -1;
10,096,954✔
2708

2709
    SColumnInfoData* p = (SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId);
9,971,212✔
2710
    if (p == NULL) {
9,970,765!
2711
      return terrno;
×
2712
    }
2713

2714
    __compar_fn_t fn = getKeyComparFunc(p->info.type, pOrder->order);
9,970,765✔
2715

2716
    int32_t ret = fn(lData, rData);
9,970,839✔
2717
    if (ret == 0) {
9,970,585✔
2718
      continue;
1,355,479✔
2719
    } else {
2720
      return ret;
8,615,106✔
2721
    }
2722
  }
2723

2724
  return 0;
811,931✔
2725
}
2726

2727
/**
 * Open the handle for the bounded-priority-queue sort.
 *
 * Creates the bounded queue (capacity pqMaxRows), switches the comparison
 * callback to tupleComparFn, then reads every block from the first ordered
 * source and pushes each row as a referenced tuple. Rows accepted by the
 * queue are replaced with an allocated copy of the row.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise an error code
 */
static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
  pHandle->pBoundedQueue = createBoundedQueue(pHandle->pqMaxRows, tsortPQCompFn, destroyTuple, pHandle);
  if (pHandle->pBoundedQueue == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsortSetComparFp(pHandle, tupleComparFn);

  SSortSource** ppSource = taosArrayGet(pHandle->pOrderedSource, 0);
  if (ppSource == NULL) {
    return terrno;
  }

  SSortSource*      source = *ppSource;
  uint32_t          tupleLen = 0;
  PriorityQueueNode pqNode;
  pHandle->pDataBlock = NULL;

  for (;;) {
    // fetch data
    SSDataBlock* pBlock = NULL;
    TAOS_CHECK_RETURN(pHandle->fetchfp(source->param, &pBlock));
    if (pBlock == NULL) {
      break;
    }

    if (pHandle->beforeFp != NULL) {
      pHandle->beforeFp(pBlock, pHandle->param);
    }

    if (pHandle->pDataBlock == NULL) {
      int32_t code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock);
      if (code) {
        return code;
      }
    }

    size_t colNum = blockDataGetNumOfCols(pBlock);

    // the tuple length is computed once: column bytes plus a length prefix for var-length columns
    if (tupleLen == 0) {
      for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
        SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
        if (pCol == NULL) {
          return terrno;
        }

        tupleLen += pCol->info.bytes;
        if (!IS_VAR_DATA_TYPE(pCol->info.type)) {
          continue;
        }

        if (IS_STR_DATA_BLOB(pCol->info.type)) {
          tupleLen += sizeof(BlobDataLenT);
        } else {
          tupleLen += sizeof(VarDataLenT);
        }
      }
    }

    ReferencedTuple refTuple = {.desc.data = (char*)pBlock, .desc.type = ReferencedTupleType, .rowIndex = 0};
    for (size_t rowIdx = 0; rowIdx < pBlock->info.rows; ++rowIdx) {
      refTuple.rowIndex = rowIdx;
      pqNode.data = &refTuple;

      PriorityQueueNode* pPushedNode = taosBQPush(pHandle->pBoundedQueue, &pqNode);
      if (pPushedNode == NULL) {
        continue;  // do nothing if push failed
      }

      // the accepted node still points at the stack tuple: replace it with an allocated copy
      pPushedNode->data = NULL;
      int32_t code = createAllocatedTuple(pBlock, colNum, tupleLen, rowIdx, (TupleDesc**)&pPushedNode->data);
      if (code) {
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
2803

2804
/**
 * Produce the next tuple on the bounded-priority-queue path.
 *
 * pHandle->pDataBlock is reused as a one-row output block: it is cleaned, the
 * queue's top tuple is copied into row 0 and popped. On the first call
 * (tmpRowIdx == 0) the queue is re-heapified with the reversed predicate.
 * *pTupleHandle stays NULL when there is no input block or the queue is empty.
 *
 * @return 0 on success (with or without a tuple), otherwise an error code
 */
static int32_t tsortPQSortNextTuple(SSortHandle* pHandle, STupleHandle** pTupleHandle) {
  int32_t code = 0;

  *pTupleHandle = NULL;
  if (pHandle->pDataBlock == NULL) {  // when no input stream datablock
    return code;
  }

  blockDataCleanup(pHandle->pDataBlock);
  code = blockDataEnsureCapacity(pHandle->pDataBlock, 1);
  if (code) {
    return code;
  }

  // abandon the top tuple if queue size bigger than max size
  if (taosBQSize(pHandle->pBoundedQueue) == taosBQMaxSize(pHandle->pBoundedQueue) + 1) {
    taosBQPop(pHandle->pBoundedQueue);
  }

  if (pHandle->tmpRowIdx == 0) {
    // sort the results
    taosBQSetFn(pHandle->pBoundedQueue, tsortPQComFnReverse);
    taosBQBuildHeap(pHandle->pBoundedQueue);
  }

  if (taosBQSize(pHandle->pBoundedQueue) > 0) {
    uint32_t           colNum = blockDataGetNumOfCols(pHandle->pDataBlock);
    PriorityQueueNode* pTop = taosBQTop(pHandle->pBoundedQueue);
    char*              pTuple = ((TupleDesc*)pTop->data)->data;

    // copy every field of the top tuple into row 0 of the output block
    for (uint32_t colIdx = 0; colIdx < colNum; ++colIdx) {
      void* pField = tupleGetField(pTuple, colIdx, colNum);

      SColumnInfoData* pCol = NULL;
      TAOS_CHECK_RETURN(bdGetColumnInfoData(pHandle->pDataBlock, colIdx, &pCol));

      if (pField) {
        TAOS_CHECK_RETURN(colDataSetVal(pCol, 0, pField, false));
      } else {
        colDataSetNULL(pCol, 0);
      }
    }

    pHandle->pDataBlock->info.rows++;
    pHandle->tmpRowIdx++;
    taosBQPop(pHandle->pBoundedQueue);
  }

  if (pHandle->pDataBlock->info.rows != 0) {
    pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
    *pTupleHandle = &pHandle->tupleHandle;
  }

  return code;
}
2857

2858
/**
 * Produce the next tuple on the single-table merge path, where rows are
 * served straight from the blocks returned by fetchfp.
 *
 * The current block is walked row by row; when it is exhausted the next block
 * is fetched from the first ordered source. An empty or missing block marks
 * the source as done. *pTupleHandle stays NULL when no row is available.
 *
 * @return 0 on success (with or without a tuple), otherwise an error code
 */
static int32_t tsortSingleTableMergeNextTuple(SSortHandle* pHandle, STupleHandle** pTupleHandle) {
  int32_t       code = 0;
  STupleHandle* pTuple = &pHandle->tupleHandle;

  *pTupleHandle = NULL;

  if (1 == pHandle->numOfCompletedSources) {
    return code;
  }

  // fast path: the current block still has a following row
  if (pTuple->pBlock && pTuple->rowIndex + 1 < pTuple->pBlock->info.rows) {
    pTuple->rowIndex++;
    *pTupleHandle = pTuple;
    return code;
  }

  if (pTuple->rowIndex == -1) {
    return code;
  }

  SSortSource** ppSource = taosArrayGet(pHandle->pOrderedSource, 0);
  if (ppSource == NULL) {
    return terrno;
  }

  SSortSource* source = *ppSource;
  SSDataBlock* pNext = NULL;
  TAOS_CHECK_RETURN(pHandle->fetchfp(source->param, &pNext));

  if (!pNext || pNext->info.rows == 0) {
    setCurrentSourceDone(source, pHandle);
    pTuple->pBlock = NULL;
    return code;
  }

  // start over at the first row of the freshly fetched block
  pTuple->pBlock = pNext;
  pTuple->rowIndex = 0;

  *pTupleHandle = pTuple;
  return code;
}
2895

2896
/**
 * Open a sort handle: consume the input and prepare it for tsortNextTuple().
 *
 * The bounded-priority-queue path is taken when tsortIsPQSortApplicable()
 * allows it; otherwise the buffered merge-sort path is used. Calling this on
 * an already opened handle is a no-op.
 *
 * @param pHandle sort handle; must have fetchfp and comparFn installed
 * @return 0 on success or when already opened, TSDB_CODE_INVALID_PARA when
 *         the handle is NULL or a required callback is missing, otherwise
 *         the error code of the chosen open path
 */
int32_t tsortOpen(SSortHandle* pHandle) {
  int32_t code = 0;

  // The NULL check must come before any field access; checking `opened`
  // first dereferenced a NULL handle and made the guard unreachable.
  if (pHandle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (pHandle->opened) {
    return code;
  }

  if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  pHandle->opened = true;
  if (tsortIsPQSortApplicable(pHandle)) {
    code = tsortOpenForPQSort(pHandle);
  } else {
    code = tsortOpenForBufMergeSort(pHandle);
  }

  return code;
}
2915

2916
/**
 * Fetch the next sorted tuple, dispatching on how the handle was set up:
 * single-table merge first, then the bounded-queue sort (when a queue
 * exists), otherwise the buffered merge sort.
 *
 * @return the status code of the selected implementation
 */
int32_t tsortNextTuple(SSortHandle* pHandle, STupleHandle** pTupleHandle) {
  if (pHandle->singleTableMerge) {
    return tsortSingleTableMergeNextTuple(pHandle, pTupleHandle);
  }

  if (pHandle->pBoundedQueue) {
    return tsortPQSortNextTuple(pHandle, pTupleHandle);
  }

  return tsortBufMergeSortNextTuple(pHandle, pTupleHandle);
}
2929

2930
// Report whether column `colIndex` of the tuple's current row is NULL. A
// missing column or a column without a data buffer is treated as NULL.
bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) {
  SColumnInfoData* pCol = taosArrayGet(pVHandle->pBlock->pDataBlock, colIndex);
  if (pCol != NULL && pCol->pData != NULL) {
    return colDataIsNull_s(pCol, pVHandle->rowIndex);
  }

  return true;
}
2938

2939
// Store in *pVal the address of the value at column `colIndex` of the
// tuple's current row, or NULL when the column has no data buffer.
void tsortGetValue(STupleHandle* pVHandle, int32_t colIndex, void** pVal) {
  *pVal = NULL;

  SColumnInfoData* pCol = TARRAY_GET_ELEM(pVHandle->pBlock->pDataBlock, colIndex);
  if (pCol->pData == NULL) {
    return;
  }

  *pVal = colDataGetData(pCol, pVHandle->rowIndex);
}
572,532,172✔
2946

2947
// Store in *pColInfo the column descriptor at `colIndex` of the tuple's block.
void tsortGetColumnInfo(STupleHandle* pVHandle, int32_t colIndex, SColumnInfoData** pColInfo) {
  SSDataBlock* pBlock = pVHandle->pBlock;
  *pColInfo = TARRAY_GET_ELEM(pBlock->pDataBlock, colIndex);
}
2,851,228✔
2950

2951
// Number of columns of the block the tuple refers to.
size_t tsortGetColNum(STupleHandle* pVHandle) {
  return blockDataGetNumOfCols(pVHandle->pBlock);
}
143,253,166✔
2952
// Group id recorded in the info of the block the tuple refers to.
uint64_t tsortGetGroupId(STupleHandle* pVHandle) {
  return pVHandle->pBlock->info.id.groupId;
}
43,970,690✔
2953
// Copy the info of the block the tuple refers to into *pBlockInfo.
void tsortGetBlockInfo(STupleHandle* pVHandle, SDataBlockInfo* pBlockInfo) {
  *pBlockInfo = pVHandle->pBlock->info;
}
229,591,862✔
2954

2955
/**
 * Collect execution statistics of a sort.
 *
 * A NULL handle yields defaults (SORT_QSORT_T with a 2 MB buffer). Otherwise
 * the buffer size, sort method and loop count come from the handle, and the
 * read/write byte counters from the paged buffer when one exists.
 */
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
  SSortExecInfo info = {0};

  if (pHandle == NULL) {
    info.sortMethod = SORT_QSORT_T;  // by default
    info.sortBuffer = 2 * 1048576;   // 2mb by default
    return info;
  }

  info.sortBuffer = pHandle->pageSize * pHandle->numOfPages;
  info.sortMethod = pHandle->inMemSort ? SORT_QSORT_T : SORT_SPILLED_MERGE_SORT_T;
  info.loops = pHandle->loops;

  if (pHandle->pBuf == NULL) {
    return info;
  }

  SDiskbasedBufStatis st = getDBufStatis(pHandle->pBuf);
  info.writeBytes = st.flushBytes;
  info.readBytes = st.loadBytes;

  return info;
}
2975

2976
int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* keyLen, const STupleHandle* pTuple) {
19,485,046✔
2977
  int32_t ret;
2978
  if (0 == compKeys(pSortCols, keyBuf, *keyLen, pTuple->pBlock, pTuple->rowIndex)) {
19,485,046✔
2979
    ret = 0;
19,410,531✔
2980
  } else {
2981
    *keyLen = buildKeys(keyBuf, pSortCols, pTuple->pBlock, pTuple->rowIndex);
11,520✔
2982
    ret = 1;
12,955✔
2983
  }
2984
  return ret;
19,423,486✔
2985
}
2986

2987
/**
 * Install the callback invoked with (tableUid, param) when a table's row
 * count reaches the merge limit.
 *
 * @param mergeLimitReachedCb callback; may be NULL
 * @param param               opaque argument handed back to the callback
 */
void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReachedCb)(uint64_t tableUid, void* param),
                                 void*        param) {
  pHandle->mergeLimitReachedParam = param;
  pHandle->mergeLimitReachedFn = mergeLimitReachedCb;
}
213,043✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc