• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3531

19 Nov 2024 10:42AM UTC coverage: 60.213% (-0.006%) from 60.219%
#3531

push

travis-ci

web-flow
Merge pull request #28777 from taosdata/fix/3.0/TD-32366

fix:TD-32366/stmt add geometry datatype check

118529 of 252344 branches covered (46.97%)

Branch coverage included in aggregate %.

7 of 48 new or added lines in 3 files covered. (14.58%)

2282 existing lines in 115 files now uncovered.

199096 of 275161 relevant lines covered (72.36%)

6067577.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.54
/source/libs/executor/src/tsort.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "query.h"
17
#include "tcommon.h"
18

19
#include "tcompare.h"
20
#include "tdatablock.h"
21
#include "tdef.h"
22
#include "theap.h"
23
#include "tlosertree.h"
24
#include "tpagedbuf.h"
25
#include "tsort.h"
26
#include "tutil.h"
27
#include "tsimplehash.h"
28
#include "executil.h"
29

30
/* Values stored in TupleDesc.type. */
#define AllocatedTupleType  0  /* data is a heap-allocated tuple built by createAllocatedTuple() */
#define ReferencedTupleType 1  /* tuple references one row in an existing SSDataBlock (pDataBlock) */
32

33
struct STupleHandle {
34
  SSDataBlock* pBlock;
35
  int32_t      rowIndex;
36
};
37

38
/* One region of the sort's external-memory file (see SSortMemFile.aFileRegions — presumably; confirm). */
typedef struct SSortMemFileRegion {
  /* placement of the region inside the backing file */
  int64_t fileOffset;
  int32_t regionSize;

  /* buffered part of the region */
  int32_t bufRegOffset;
  int32_t bufLen;
  char*   buf;
} SSortMemFileRegion;
46

47
typedef struct SSortMemFile {
48
  char*   writeBuf;
49
  int32_t writeBufSize;
50
  int64_t writeFileOffset;
51

52
  int32_t currRegionId;
53
  int32_t currRegionOffset;
54
  bool    bRegionDirty;
55

56
  SArray* aFileRegions;
57
  int32_t cacheSize;
58
  int32_t blockSize;
59

60
  FILE* pTdFile;
61
  char  memFilePath[PATH_MAX];
62
} SSortMemFile;
63

64
struct SSortHandle {
65
  int32_t        type;
66
  int32_t        pageSize;
67
  int32_t        numOfPages;
68
  SDiskbasedBuf* pBuf;
69
  SArray*        pSortInfo;
70
  SArray*        pOrderedSource;
71
  int32_t        loops;
72
  uint64_t       sortElapsed;
73
  int64_t        startTs;
74
  uint64_t       totalElapsed;
75

76
  uint64_t      pqMaxRows;
77
  uint32_t      pqMaxTupleLength;
78
  uint32_t      pqSortBufSize;
79
  bool          forceUsePQSort;
80
  BoundedQueue* pBoundedQueue;
81
  uint32_t      tmpRowIdx;
82
  int64_t       mergeLimit;
83
  int64_t       currMergeLimitTs;
84

85
  int32_t           sourceId;
86
  SSDataBlock*      pDataBlock;
87
  SMsortComparParam cmpParam;
88
  int32_t           numOfCompletedSources;
89
  bool              opened;
90
  int8_t            closed;
91
  const char*       idStr;
92
  bool              inMemSort;
93
  bool              needAdjust;
94
  STupleHandle      tupleHandle;
95
  void*             param;
96
  void (*beforeFp)(SSDataBlock* pBlock, void* param);
97

98
  _sort_fetch_block_fn_t  fetchfp;
99
  _sort_merge_compar_fn_t comparFn;
100
  SMultiwayMergeTreeInfo* pMergeTree;
101

102
  bool singleTableMerge;
103

104
  bool (*abortCheckFn)(void* param);
105
  void* abortCheckParam;
106

107
  bool          bSortByRowId;
108
  SSortMemFile* pExtRowsMemFile;
109
  int32_t       extRowBytes;
110
  int32_t       extRowsPageSize;
111
  int32_t       extRowsMemSize;
112
  int32_t       srcTsSlotId;
113
  SArray*       aExtRowsOrders;
114
  bool          bSortPk;
115
  void (*mergeLimitReachedFn)(uint64_t tableUid, void* param);
116
  void* mergeLimitReachedParam;
117
};
118

119
/* Forward declarations of file-local helpers whose definitions appear later in this file. */
static void destroySortMemFile(SSortHandle* pHandle);
static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, int32_t tupleOffset,
                                       int32_t rowLen, char** ppRow, bool* pFreeRow);
122
/* Flags the handle as merging a single table (sets singleTableMerge). */
void tsortSetSingleTableMerge(SSortHandle* pHandle) { pHandle->singleTableMerge = true; }
162,186✔
125

126
/*
 * Stores an abort-check callback on the handle.
 * @param checkFn callback saved in abortCheckFn
 * @param param   argument saved in abortCheckParam
 */
void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void*), void* param) {
  pHandle->abortCheckParam = param;
  pHandle->abortCheckFn = checkFn;
}
236,727✔
130

131
/* Default comparator installed by tsortCreateSortHandle(); its definition appears later in this file. */
static int32_t msortComparFn(const void* pLeft, const void* pRight, void* param);
132

133
/*
 * Tuple layout: | offset[0] | offset[1] | ... | null bitmap | data ... |
 * One uint32_t offset per column, then BitmapLen(columnNum) bytes of null bitmap, then `tupleLen` bytes of data.
 * Returns a zero-filled tuple, or NULL if the allocation fails.
 */
static void* createTuple(uint32_t columnNum, uint32_t tupleLen) {
  uint32_t headerLen = sizeof(uint32_t) * columnNum + BitmapLen(columnNum);
  uint32_t totalLen = headerLen + tupleLen;
  return taosMemoryCalloc(1, totalLen);
}
138

139
/* Releases a tuple buffer obtained from createTuple(). */
static void destoryAllocatedTuple(void* t) {
  taosMemoryFree(t);
}
865,063✔
140

141
/*
 * Accessors for the tuple layout built by createTuple():
 *   | offset[0] | ... | offset[colNum-1] | null bitmap | data |
 * Every parameter is parenthesised so that expression arguments (e.g. `colIdx + 1`) expand correctly.
 */
#define tupleOffset(tuple, colIdx)            ((uint32_t*)((tuple) + sizeof(uint32_t) * (colIdx)))
#define tupleSetOffset(tuple, colIdx, offset) (*tupleOffset(tuple, colIdx) = (offset))
#define tupleSetNull(tuple, colIdx, colNum)   colDataSetNull_f((char*)(tuple) + sizeof(uint32_t) * (colNum), (colIdx))
#define tupleColIsNull(tuple, colIdx, colNum) colDataIsNull_f((char*)(tuple) + sizeof(uint32_t) * (colNum), (colIdx))
#define tupleGetDataStartOffset(colNum)       (sizeof(uint32_t) * (colNum) + BitmapLen(colNum))
#define tupleSetData(tuple, offset, data, length) memcpy((tuple) + (offset), (data), (length))
147

148
/**
149
 * @param t the tuple pointer addr, if realloced, *t is changed to the new addr
150
 * @param offset copy data into pTuple start from offset
151
 * @param colIndex the columnIndex, for setting null bitmap
152
 * @return the next offset to add field
153
 * */
154
static inline int32_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, uint32_t colIdx, void* data,
5,019,618✔
155
                                   size_t length, bool isNull, uint32_t tupleLen, uint32_t* pOffset) {
156
  int32_t code = TSDB_CODE_SUCCESS;
5,019,618✔
157
  int32_t lino = 0;
5,019,618✔
158
  tupleSetOffset(*t, colIdx, offset);
5,019,618✔
159

160
  if (isNull) {
5,019,618✔
161
    tupleSetNull(*t, colIdx, colNum);
211,346✔
162
  } else {
163
    if (offset + length > tupleLen + tupleGetDataStartOffset(colNum)) {
4,808,272!
164
      void* px = taosMemoryRealloc(*t, offset + length);
×
165
      QUERY_CHECK_NULL(px, code, lino, _end, terrno);
×
166

167
      *t = px;
×
168
    }
169
    tupleSetData(*t, offset, data, length);
4,808,272✔
170
  }
171

172
  (*pOffset) = offset + length;
5,019,618✔
173

174
_end:
5,019,618✔
175
  if (code != TSDB_CODE_SUCCESS) {
5,019,618!
176
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
177
  }
178
  return code;
5,021,181✔
179
}
180

181
static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) {
39,168,595✔
182
  if (tupleColIsNull(t, colIdx, colNum)) {
39,168,595✔
183
    return NULL;
18,449✔
184
  }
185

186
  return t + *tupleOffset(t, colIdx);
39,150,146✔
187
}
188

189
/*
 * Creates a new block modelled on the handle's result block via createOneDataBlock(..., false, ...).
 * *pBlock is NULL (and TSDB_CODE_SUCCESS returned) when the handle has no result block.
 */
int32_t tsortGetSortedDataBlock(const SSortHandle* pSortHandle, SSDataBlock** pBlock) {
  *pBlock = NULL;
  if (pSortHandle->pDataBlock != NULL) {
    return createOneDataBlock(pSortHandle->pDataBlock, false, pBlock);
  }
  return TSDB_CODE_SUCCESS;
}
196

197
/* Common header of every tuple representation; `type` selects how `data` is interpreted. */
typedef struct TupleDesc {
  uint8_t type;  // AllocatedTupleType or ReferencedTupleType
  char*   data;  // Allocated: tuple buffer from createTuple(); Referenced: the source SSDataBlock
} TupleDesc;
201

202
typedef struct ReferencedTuple {
203
  TupleDesc desc;
204
  size_t    rowIndex;
205
} ReferencedTuple;
206

207
/*
 * Copies row `rowIdx` of the first `colNum` columns of pBlock into a newly allocated tuple and wraps it in a
 * TupleDesc of type AllocatedTupleType.
 *
 * @param pBlock   source block
 * @param colNum   number of leading columns to copy
 * @param tupleLen initial data capacity of the tuple (tupleAddField grows it when a row is larger)
 * @param rowIdx   row to copy
 * @param pDesc    receives the descriptor on success; release it with destroyTuple()
 * @return TSDB_CODE_SUCCESS or an error code; on failure all allocations are freed and *pDesc is untouched
 */
static int32_t createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint32_t tupleLen, size_t rowIdx, TupleDesc** pDesc) {
  int32_t    code = TSDB_CODE_SUCCESS;
  TupleDesc* t = taosMemoryCalloc(1, sizeof(TupleDesc));
  if (t == NULL) {
    return terrno;
  }

  // char* rather than void*, so &pTuple can be passed to tupleAddField without an aliasing cast
  char* pTuple = createTuple(colNum, tupleLen);
  if (pTuple == NULL) {
    taosMemoryFree(t);
    return terrno;
  }

  uint32_t offset = tupleGetDataStartOffset(colNum);
  for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
    if (pCol == NULL) {
      // never report success for a missing column, even if terrno was not set
      code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      break;
    }

    if (colDataIsNull_s(pCol, rowIdx)) {
      code = tupleAddField(&pTuple, colNum, offset, colIdx, NULL, 0, true, tupleLen, &offset);
    } else {
      size_t colLen = colDataGetRowLength(pCol, rowIdx);
      code = tupleAddField(&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false, tupleLen,
                           &offset);
    }
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }
  }

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    // tupleAddField leaves pTuple valid when it fails, so both allocations must be released here
    taosMemoryFree(pTuple);
    taosMemoryFree(t);
    return code;
  }

  t->type = AllocatedTupleType;
  t->data = pTuple;

  *pDesc = t;
  return code;
}
248

249
int32_t tupleDescGetField(const TupleDesc* pDesc, int32_t colIdx, uint32_t colNum, void** pResult) {
49,969,745✔
250
  *pResult = NULL;
49,969,745✔
251

252
  if (pDesc->type == ReferencedTupleType) {
49,969,745✔
253
    ReferencedTuple* pRefTuple = (ReferencedTuple*)pDesc;
12,427,508✔
254
    SColumnInfoData* pCol = taosArrayGet(((SSDataBlock*)pDesc->data)->pDataBlock, colIdx);
12,427,508✔
255
    if (pCol == NULL) {
12,427,106!
256
      return terrno;
×
257
    }
258

259
    if (colDataIsNull_s(pCol, pRefTuple->rowIndex)) {
24,854,348✔
260
      return TSDB_CODE_SUCCESS;
1,014✔
261
    }
262

263
    *pResult = colDataGetData(pCol, pRefTuple->rowIndex);
12,426,160!
264
  } else {
265
    *pResult = tupleGetField(pDesc->data, colIdx, colNum);
37,542,237✔
266
  }
267

268
  return 0;
49,976,836✔
269
}
270

271
void destroyTuple(void* t) {
865,063✔
272
  TupleDesc* pDesc = t;
865,063✔
273
  if (pDesc != NULL && pDesc->type == AllocatedTupleType) {
865,063!
274
    destoryAllocatedTuple(pDesc->data);
865,063✔
275
    taosMemoryFree(pDesc);
866,216✔
276
  }
277
}
865,976✔
278

279
/**
280
 *
281
 * @param type
282
 * @return
283
 */
284
int32_t tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
533,816✔
285
                              SSDataBlock* pBlock, const char* idstr, uint64_t pqMaxRows, uint32_t pqMaxTupleLength,
286
                              uint32_t pqSortBufSize, SSortHandle** pHandle) {
287
  int32_t code = 0;
533,816✔
288
  int32_t lino = 0;
533,816✔
289

290
  QRY_PARAM_CHECK(pHandle);
533,816!
291
  SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
533,816✔
292
  QUERY_CHECK_NULL(pSortHandle, code, lino, _err, terrno);
534,137!
293

294
  pSortHandle->type = type;
534,137✔
295
  pSortHandle->pageSize = pageSize;
534,137✔
296
  pSortHandle->numOfPages = numOfPages;
534,137✔
297
  pSortHandle->pSortInfo = taosArrayDup(pSortInfo, NULL);
534,137✔
298
  QUERY_CHECK_NULL(pSortHandle->pSortInfo, code, lino, _err, terrno);
534,296!
299

300
  pSortHandle->loops = 0;
534,296✔
301
  pSortHandle->pqMaxTupleLength = pqMaxTupleLength;
534,296✔
302
  if (pqMaxRows != 0) {
534,296✔
303
    pSortHandle->pqSortBufSize = pqSortBufSize;
207,003✔
304
    pSortHandle->pqMaxRows = pqMaxRows;
207,003✔
305
  }
306

307
  pSortHandle->forceUsePQSort = false;
534,296✔
308
  if (pBlock != NULL) {
534,296✔
309
    code = createOneDataBlock(pBlock, false, &pSortHandle->pDataBlock);
327,210✔
310
    QUERY_CHECK_CODE(code, lino, _err);
327,207!
311
  }
312

313
  pSortHandle->mergeLimit = -1;
534,293✔
314

315
  pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
534,293✔
316
  QUERY_CHECK_NULL(pSortHandle->pOrderedSource, code, lino, _err, terrno);
534,459!
317

318
  pSortHandle->cmpParam.orderInfo = pSortInfo;
534,459✔
319
  pSortHandle->cmpParam.cmpGroupId = false;
534,459✔
320
  pSortHandle->cmpParam.sortType = type;
534,459✔
321

322
  if (type == SORT_BLOCK_TS_MERGE) {
534,459✔
323
    SBlockOrderInfo* pTsOrder = TARRAY_GET_ELEM(pSortInfo, 0);
237,301✔
324
    pSortHandle->cmpParam.tsSlotId = pTsOrder->slotId;
237,301✔
325
    pSortHandle->cmpParam.tsOrder = pTsOrder->order;
237,301✔
326
    pSortHandle->cmpParam.cmpTsFn = pTsOrder->compFn;
237,301✔
327
    if (taosArrayGetSize(pSortHandle->pSortInfo) == 2) {
237,301✔
328
      pSortHandle->cmpParam.pPkOrder = taosArrayGet(pSortHandle->pSortInfo, 1);
33,330✔
329
      pSortHandle->bSortPk = true;
33,351✔
330
    } else {
331
      pSortHandle->cmpParam.pPkOrder = NULL;
203,909✔
332
      pSortHandle->bSortPk = false;
203,909✔
333
    }
334
  }
335
  tsortSetComparFp(pSortHandle, msortComparFn);
534,418✔
336

337
  if (idstr != NULL) {
534,205✔
338
    pSortHandle->idStr = taosStrdup(idstr);
534,199✔
339
    QUERY_CHECK_NULL(pSortHandle->idStr, code, lino, _err, terrno);
534,288!
340
  }
341

342
  *pHandle = pSortHandle;
534,344✔
343
  return code;
534,344✔
344

345
_err:
×
346
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
347
  if (pSortHandle) {
×
348
    tsortDestroySortHandle(pSortHandle);
×
349
  }
350
  return code;
×
351
}
352

353
/*
 * Releases every source referenced by cmpParam: its block, its page-id list and the SSortSource itself.
 * Each slot is set to NULL, and numOfSources is reset to 0.
 * NOTE(review): source slots may apparently be NULL for SORT_MULTISOURCE_MERGE; such slots are skipped rather
 * than dereferenced. Confirm against callers.
 */
static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
  for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
    SSortSource* pSource = cmpParam->pSources[i];
    if (pSource == NULL) {
      continue;
    }

    blockDataDestroy(pSource->src.pBlock);
    if (pSource->pageIdList) {
      taosArrayDestroy(pSource->pageIdList);
    }
    taosMemoryFreeClear(pSource);
    cmpParam->pSources[i] = NULL;
  }

  cmpParam->numOfSources = 0;
  return TSDB_CODE_SUCCESS;
}
368

369
/*
 * Frees every SSortSource stored in pOrderedSource and empties the array; the array itself is kept.
 * For each source:
 *   - pageIdList is destroyed;
 *   - param and src.pBlock are freed unless the source is onlyRef.
 * When fetchUs / fetchNum are non-NULL, each source's fetchUs / fetchNum is added to them. The two are checked
 * independently, so a caller may request only one.
 */
void tsortClearOrderedSource(SArray* pOrderedSource, int64_t* fetchUs, int64_t* fetchNum) {
  size_t numOfSources = taosArrayGetSize(pOrderedSource);
  for (size_t i = 0; i < numOfSources; i++) {
    SSortSource** pSource = taosArrayGet(pOrderedSource, i);
    if (pSource == NULL || NULL == *pSource) {
      continue;
    }

    if (fetchUs != NULL) {
      *fetchUs += (*pSource)->fetchUs;
    }
    if (fetchNum != NULL) {
      *fetchNum += (*pSource)->fetchNum;
    }

    // release pageIdList
    if ((*pSource)->pageIdList) {
      taosArrayDestroy((*pSource)->pageIdList);
      (*pSource)->pageIdList = NULL;
    }
    if ((*pSource)->param && !(*pSource)->onlyRef) {
      taosMemoryFree((*pSource)->param);
      (*pSource)->param = NULL;
    }

    if (!(*pSource)->onlyRef && (*pSource)->src.pBlock) {
      blockDataDestroy((*pSource)->src.pBlock);
      (*pSource)->src.pBlock = NULL;
    }

    taosMemoryFreeClear(*pSource);
  }

  taosArrayClear(pOrderedSource);
}
609,358✔
401

402
/*
 * Closes the handle and releases everything it owns: the merge tree, the disk-based buffer, the result block, the
 * bounded queue, all ordered sources, the external-rows memory file, the order-info arrays, the id string and the
 * handle itself. A NULL handle is ignored.
 */
void tsortDestroySortHandle(SSortHandle* pSortHandle) {
  if (pSortHandle == NULL) {
    return;
  }

  tsortClose(pSortHandle);
  if (pSortHandle->pMergeTree != NULL) {
    tMergeTreeDestroy(&pSortHandle->pMergeTree);
  }

  destroyDiskbasedBuf(pSortHandle->pBuf);
  blockDataDestroy(pSortHandle->pDataBlock);

  if (pSortHandle->pBoundedQueue) destroyBoundedQueue(pSortHandle->pBoundedQueue);

  int64_t fetchUs = 0, fetchNum = 0;
  tsortClearOrderedSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum);
  // idStr is still valid here: it is freed only at the very end, so this log line never formats a NULL %s.
  qDebug("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr);

  taosArrayDestroy(pSortHandle->pOrderedSource);
  if (pSortHandle->pExtRowsMemFile != NULL) {
    destroySortMemFile(pSortHandle);
  }

  taosArrayDestroy(pSortHandle->pSortInfo);
  taosArrayDestroy(pSortHandle->aExtRowsOrders);
  pSortHandle->aExtRowsOrders = NULL;
  taosMemoryFreeClear(pSortHandle->idStr);
  taosMemoryFreeClear(pSortHandle);
}
432

433
/* Appends pSource (stored by pointer; treated as SSortSource* elsewhere) to the handle's ordered-source list. */
int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) {
  if (taosArrayPush(pSortHandle->pOrderedSource, &pSource) == NULL) {
    return terrno;
  }
  return TSDB_CODE_SUCCESS;
}
437

438
/*
 * Registers a new source backed by pages of the disk-based buffer.
 *
 * On success:
 *   - a new SSortSource owning pBlock and pPageIdList is appended to pAllSources;
 *   - *sourceId is incremented;
 *   - pBlock is sized to hold one buffer page worth of rows.
 * On failure nothing is appended, and pBlock/pPageIdList remain owned by the caller.
 */
static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSources, SSDataBlock* pBlock,
                                         int32_t* sourceId, SArray* pPageIdList) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SSortSource* pSource = NULL;

  // Size the block before publishing the source. Otherwise a failure here would leave a source in pAllSources
  // referencing a block and page list that the caller then frees (a later double free).
  int32_t rowSize = blockDataGetSerialRowSize(pBlock);
  QUERY_CHECK_CONDITION((rowSize > 0), code, lino, _err, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  // The value of numOfRows must be greater than 0, which is guaranteed by the previous memory allocation
  int32_t numOfRows =
      (getBufPageSize(pBuf) - blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock))) / rowSize;
  QUERY_CHECK_CONDITION((numOfRows > 0), code, lino, _err, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  code = blockDataEnsureCapacity(pBlock, numOfRows);
  QUERY_CHECK_CODE(code, lino, _err);

  pSource = taosMemoryCalloc(1, sizeof(SSortSource));
  QUERY_CHECK_NULL(pSource, code, lino, _err, terrno);

  pSource->src.pBlock = pBlock;
  pSource->pageIdList = pPageIdList;

  void* p = taosArrayPush(pAllSources, &pSource);
  QUERY_CHECK_NULL(p, code, lino, _err, terrno);

  (*sourceId) += 1;
  return code;

_err:
  if (pSource) taosMemoryFree(pSource);
  qError("sort failed at %s:%d since %s", __func__, lino, tstrerror(code));
  return code;
}
471

472
/*
 * Spills pDataBlock into the handle's disk-based buffer and registers the result as a new source in
 * pHandle->pOrderedSource.
 *   - The buffer is created on first use.
 *   - Rows are written one page-sized chunk at a time.
 *   - pDataBlock is cleaned up afterwards.
 * Returns TSDB_CODE_SUCCESS or an error code; on failure every temporary resource (page-id list, chunk block,
 * buffer page, new source block) is released.
 */
static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
  int32_t      code = TSDB_CODE_SUCCESS;
  SArray*      pPageIdList = NULL;
  SSDataBlock* pChunk = NULL;
  void*        pPage = NULL;
  SSDataBlock* pBlock = NULL;

  if (pHandle->pBuf == NULL) {
    if (!osTempSpaceAvailable()) {
      terrno = TSDB_CODE_NO_DISKSPACE;
      qError("Add to buf failed since %s, tempDir:%s", terrstr(), tsTempDir);
      return terrno;
    }

    code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
                              "sortExternalBuf", tsTempDir);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
    dBufSetPrintInfo(pHandle->pBuf);
  }

  pPageIdList = taosArrayInit(4, sizeof(int32_t));
  if (pPageIdList == NULL) {
    return terrno;
  }

  int32_t start = 0;
  while (start < pDataBlock->info.rows) {
    int32_t stop = 0;
    code = blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize);
    if (code != TSDB_CODE_SUCCESS) goto _error;

    code = blockDataExtractBlock(pDataBlock, start, stop - start + 1, &pChunk);
    if (code != TSDB_CODE_SUCCESS) goto _error;

    int32_t pageId = -1;
    pPage = getNewBufPage(pHandle->pBuf, &pageId);
    if (pPage == NULL) {
      code = terrno;
      goto _error;
    }

    if (taosArrayPush(pPageIdList, &pageId) == NULL) {
      code = terrno;
      goto _error;
    }

    int32_t size = blockDataGetSize(pChunk) + sizeof(int32_t) + taosArrayGetSize(pChunk->pDataBlock) * sizeof(int32_t);
    if (size > getBufPageSize(pHandle->pBuf)) {
      qError("sort failed at: %s:%d", __func__, __LINE__);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      goto _error;
    }

    code = blockDataToBuf(pPage, pChunk);
    if (code != TSDB_CODE_SUCCESS) goto _error;

    setBufPageDirty(pPage, true);
    releaseBufPage(pHandle->pBuf, pPage);
    pPage = NULL;

    blockDataDestroy(pChunk);
    pChunk = NULL;
    start = stop + 1;
  }

  blockDataCleanup(pDataBlock);

  code = createOneDataBlock(pDataBlock, false, &pBlock);
  if (code != TSDB_CODE_SUCCESS) goto _error;

  code = doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList);
  if (code != TSDB_CODE_SUCCESS) goto _error;

  // pBlock and pPageIdList are now owned by the new source
  return code;

_error:
  if (pPage != NULL) {
    releaseBufPage(pHandle->pBuf, pPage);
  }
  blockDataDestroy(pChunk);
  blockDataDestroy(pBlock);
  taosArrayDestroy(pPageIdList);
  return code;
}
559

560
/* Marks pSource as exhausted (rowIndex = -1) and counts it in pHandle->numOfCompletedSources. */
static void setCurrentSourceDone(SSortSource* pSource, SSortHandle* pHandle) {
  pHandle->numOfCompletedSources += 1;
  pSource->src.rowIndex = -1;
}
258,258✔
564

565
static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32_t startIndex, int32_t endIndex,
155,005✔
566
                              SSortHandle* pHandle) {
567
  pParam->pSources = taosArrayGet(pSources, startIndex);
155,005✔
568
  if (pParam->pSources == NULL) {
155,012!
569
    return terrno;
×
570
  }
571

572
  pParam->numOfSources = (endIndex - startIndex + 1);
155,012✔
573
  int32_t code = 0;
155,012✔
574

575
  // multi-pass internal merge sort is required
576
  if (pHandle->pBuf == NULL) {
155,012✔
577
    if (!osTempSpaceAvailable()) {
89,992!
578
      code = terrno = TSDB_CODE_NO_DISKSPACE;
×
579
      qError("Sort compare init failed since %s, tempDir:%s, idStr:%s", terrstr(), tsTempDir, pHandle->idStr);
×
580
      return code;
×
581
    }
582

583
    code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
89,991✔
584
                              "sortComparInit", tsTempDir);
585
    if (code != TSDB_CODE_SUCCESS) {
89,994!
586
      return code;
×
587
    } else {
588
      dBufSetPrintInfo(pHandle->pBuf);
89,994✔
589
    }
590
  }
591

592
  if (pHandle->type == SORT_SINGLESOURCE_SORT) {
155,008✔
593
    for (int32_t i = 0; i < pParam->numOfSources; ++i) {
130,128✔
594
      SSortSource* pSource = pParam->pSources[i];
65,094✔
595

596
      // set current source is done
597
      if (taosArrayGetSize(pSource->pageIdList) == 0) {
65,094!
598
        setCurrentSourceDone(pSource, pHandle);
×
599
        continue;
×
600
      }
601

602
      int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);
65,101✔
603
      if (pPgId == NULL) {
65,105!
604
        return terrno;
×
605
      }
606

607
      void* pPage = getBufPage(pHandle->pBuf, *pPgId);
65,105✔
608
      if (NULL == pPage) {
65,117!
609
        return terrno;
×
610
      }
611
      
612
      code = blockDataFromBuf(pSource->src.pBlock, pPage);
65,117✔
613
      if (code != TSDB_CODE_SUCCESS) {
65,104!
614
        terrno = code;
×
615
        return code;
×
616
      }
617

618
      releaseBufPage(pHandle->pBuf, pPage);
65,104✔
619
    }
620
  } else {
621
    qDebug("start init for the multiway merge sort, %s", pHandle->idStr);
89,992✔
622
    int64_t st = taosGetTimestampUs();
89,989✔
623

624
    for (int32_t i = 0; i < pParam->numOfSources; ++i) {
324,925✔
625
      SSortSource* pSource = pParam->pSources[i];
234,933✔
626
      TAOS_CHECK_RETURN(pHandle->fetchfp(pSource->param, &pSource->src.pBlock));
234,933!
627

628
      // set current source is done
629
      if (pSource->src.pBlock == NULL) {
234,934✔
630
        setCurrentSourceDone(pSource, pHandle);
95,985✔
631
      }
632
    }
633

634
    int64_t et = taosGetTimestampUs();
89,995✔
635
    qDebug("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr);
89,995✔
636
  }
637

638
  return code;
155,012✔
639
}
640

641
/*
 * Appends row *rowIndex of pSource to the end of pBlock, matching columns by position.
 * On success it advances both pBlock->info.rows and *rowIndex by one.
 * NOTE(review): a non-null source value whose column has no pData is skipped, so that destination cell is left
 * unset for the new row. Confirm this is intended.
 */
static int32_t appendOneRowToDataBlock(SSDataBlock* pBlock, const SSDataBlock* pSource, int32_t* rowIndex) {
  int32_t code = 0;
  size_t  numOfCols = taosArrayGetSize(pBlock->pDataBlock);

  for (int32_t col = 0; col < numOfCols; ++col) {
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, col);
    if (pDst == NULL) {
      return terrno;
    }

    SColumnInfoData* pSrc = taosArrayGet(pSource->pDataBlock, col);
    if (pSrc == NULL) {
      return terrno;
    }

    bool isNull = colDataIsNull(pSrc, pSource->info.rows, *rowIndex, NULL);
    if (!isNull && pSrc->pData == NULL) {
      continue;
    }

    char* pVal = isNull ? NULL : colDataGetData(pSrc, *rowIndex);
    code = colDataSetVal(pDst, pBlock->info.rows, pVal, isNull);
    if (code) {
      return code;
    }
  }

  pBlock->info.rows += 1;
  *rowIndex += 1;
  return code;
}
675

676
/*
 * Called after the winner row of pSource has been consumed.
 * When pSource's block is exhausted, it loads the next block:
 *   - SORT_SINGLESOURCE_SORT: the next buffer page;
 *   - other types: the next block from pHandle->fetchfp.
 * If there is nothing more to load, the source is marked completed (rowIndex = -1, *numOfCompleted incremented).
 * Finally the loser tree pTree is re-adjusted.
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeTreeInfo* pTree, SSortHandle* pHandle,
                                           int32_t* numOfCompleted) {
  /*
   * load a new SDataBlock into memory of a given intermediate data-set source,
   * since its last record in buffer has been chosen to be processed, as the winner of loser-tree
   */
  if (pSource->src.rowIndex >= pSource->src.pBlock->info.rows) {
    pSource->src.rowIndex = 0;

    if (pHandle->type == SORT_SINGLESOURCE_SORT) {
      pSource->pageIndex++;
      if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) {
        qDebug("adjust merge tree. %d source completed %d", *numOfCompleted, pSource->pageIndex);
        (*numOfCompleted) += 1;
        pSource->src.rowIndex = -1;
        pSource->pageIndex = -1;
        blockDataDestroy(pSource->src.pBlock);
        pSource->src.pBlock = NULL;
      } else {
        if (pSource->pageIndex % 512 == 0) {
          qDebug("begin source %p page %d", pSource, pSource->pageIndex);
        }

        int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);
        if (pPgId == NULL) {
          return terrno;
        }

        void* pPage = getBufPage(pHandle->pBuf, *pPgId);
        if (pPage == NULL) {
          qError("failed to get buffer, code:%s", tstrerror(terrno));
          return terrno;
        }

        int32_t code = blockDataFromBuf(pSource->src.pBlock, pPage);
        // release the page on both paths so a decoding failure does not leave it unreleased
        releaseBufPage(pHandle->pBuf, pPage);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
    } else {
      int64_t st = taosGetTimestampUs();
      TAOS_CHECK_RETURN(pHandle->fetchfp(((SSortSource*)pSource)->param, &pSource->src.pBlock));
      pSource->fetchUs += taosGetTimestampUs() - st;
      pSource->fetchNum++;
      if (pSource->src.pBlock == NULL) {
        (*numOfCompleted) += 1;
        pSource->src.rowIndex = -1;
        qDebug("adjust merge tree. %d source completed", *numOfCompleted);
      }
    }
  }

  /*
   * Adjust loser tree otherwise, according to new candidate data
   * if the loser tree is rebuild completed, we do not need to adjust
   */
  int32_t leafNodeIndex = tMergeTreeGetAdjustIndex(pTree);

#ifdef _DEBUG_VIEW
  printf("before adjust:\t");
  tMergeTreePrint(pTree);
#endif

  int32_t code = tMergeTreeAdjust(pTree, leafNodeIndex);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

#ifdef _DEBUG_VIEW
  printf("\nafter adjust:\t");
  tMergeTreePrint(pTree);
#endif
  return TSDB_CODE_SUCCESS;
}
751

752
static int32_t getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity,
×
753
                                       SSDataBlock** pRes) {
754
  *pRes = NULL;
×
755

756
  int32_t code = 0;
×
757
  blockDataCleanup(pHandle->pDataBlock);
×
758

759
  while (1) {
×
760
    if (cmpParam->numOfSources == pHandle->numOfCompletedSources) {
×
761
      break;
×
762
    }
763

764
    int32_t index = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
×
765

766
    SSortSource* pSource = (*cmpParam).pSources[index];
×
767
    code = appendOneRowToDataBlock(pHandle->pDataBlock, pSource->src.pBlock, &pSource->src.rowIndex);
×
768
    if (code) {
×
769
      return code;
×
770
    }
771

772
    code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
×
773
    if (code != TSDB_CODE_SUCCESS) {
×
774
      return code;
×
775
    }
776

777
    if (pHandle->pDataBlock->info.rows >= capacity) {
×
778
      *pRes = pHandle->pDataBlock;
×
779
      return code;
×
780
    }
781
  }
782

783
  *pRes = (pHandle->pDataBlock->info.rows > 0) ? pHandle->pDataBlock : NULL;
×
784
  return code;
×
785
}
786

787
// TODO: improve this function performance
788

789
/*
 * Compare a single sort-column cell between two rows of (possibly different) blocks.
 *
 * pCompareOrder is an SBlockOrderInfo*; its slotId selects the column in both blocks, and its compFn
 * compares non-NULL values.
 * NULL rules: two NULLs are equal; a lone NULL sorts first when pOrder->nullFirst is set, last otherwise.
 * When a block carries pBlockAgg, the null test uses the aggregate entry at pOrder->slotId.
 *
 * Returns <0, 0 or >0 like a standard comparator.
 */
int32_t tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock,
                      int32_t leftRowIndex, int32_t rightRowIndex, void* pCompareOrder) {
  SBlockOrderInfo* pOrder = pCompareOrder;
  SColumnInfoData* pLeftCol = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
  SColumnInfoData* pRightCol = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
  bool             varLen = IS_VAR_DATA_TYPE(pLeftCol->info.type);

  if (pLeftCol->hasNull || pRightCol->hasNull) {
    bool lNull = false;
    bool rNull = false;

    if (pLeftCol->hasNull) {
      if (pLeftBlock->pBlockAgg != NULL) {
        lNull = colDataIsNull(pLeftCol, pLeftBlock->info.rows, leftRowIndex, &pLeftBlock->pBlockAgg[pOrder->slotId]);
      } else {
        lNull = colDataIsNull_t(pLeftCol, leftRowIndex, varLen);
      }
    }

    if (pRightCol->hasNull) {
      if (pRightBlock->pBlockAgg != NULL) {
        rNull = colDataIsNull(pRightCol, pRightBlock->info.rows, rightRowIndex,
                              &pRightBlock->pBlockAgg[pOrder->slotId]);
      } else {
        rNull = colDataIsNull_t(pRightCol, rightRowIndex, varLen);
      }
    }

    if (lNull && rNull) {
      return 0;
    }
    if (rNull) {
      return pOrder->nullFirst ? 1 : -1;
    }
    if (lNull) {
      return pOrder->nullFirst ? -1 : 1;
    }
  }

  void* pLeftVal = colDataGetData(pLeftCol, leftRowIndex);
  void* pRightVal = colDataGetData(pRightCol, rightRowIndex);
  return pOrder->compFn(pLeftVal, pRightVal);
}
837

838
int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
173,856,083✔
839
  int32_t pLeftIdx = *(int32_t*)pLeft;
173,856,083✔
840
  int32_t pRightIdx = *(int32_t*)pRight;
173,856,083✔
841

842
  SMsortComparParam* pParam = (SMsortComparParam*)param;
173,856,083✔
843

844
  SArray* pInfo = pParam->orderInfo;
173,856,083✔
845

846
  SSortSource* pLeftSource = pParam->pSources[pLeftIdx];
173,856,083✔
847
  SSortSource* pRightSource = pParam->pSources[pRightIdx];
173,856,083✔
848

849
  // this input is exhausted, set the special value to denote this
850
  if (pLeftSource->src.rowIndex == -1) {
173,856,083✔
851
    return 1;
31,048,553✔
852
  }
853

854
  if (pRightSource->src.rowIndex == -1) {
142,807,530✔
855
    return -1;
52,673✔
856
  }
857

858
  SSDataBlock* pLeftBlock = pLeftSource->src.pBlock;
142,754,857✔
859
  SSDataBlock* pRightBlock = pRightSource->src.pBlock;
142,754,857✔
860

861
  if (pParam->cmpGroupId) {
142,754,857✔
862
    if (pLeftBlock->info.id.groupId != pRightBlock->info.id.groupId) {
89,645,830✔
863
      return pLeftBlock->info.id.groupId < pRightBlock->info.id.groupId ? -1 : 1;
18,612,499✔
864
    }
865
  }
866

867
  if (pParam->sortType == SORT_BLOCK_TS_MERGE) {
124,142,358✔
868
    SColumnInfoData* pLeftTsCol = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pParam->tsSlotId);
4,014,315✔
869
    SColumnInfoData* pRightTsCol = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pParam->tsSlotId);
4,014,315✔
870
    int64_t*            leftTs = (int64_t*)(pLeftTsCol->pData) + pLeftSource->src.rowIndex;
4,014,315✔
871
    int64_t*            rightTs =  (int64_t*)(pRightTsCol->pData) + pRightSource->src.rowIndex;
4,014,315✔
872

873
    int32_t ret = pParam->cmpTsFn(leftTs, rightTs);
4,014,315✔
874
    if (ret == 0 && pParam->pPkOrder) {
4,014,315!
875
      ret = tsortComparBlockCell(pLeftBlock, pRightBlock, 
×
876
                              pLeftSource->src.rowIndex, pRightSource->src.rowIndex, (SBlockOrderInfo*)pParam->pPkOrder);
877
    }
878
    return ret;
4,014,315✔
879
  } else {
880
    bool isVarType;
881
    for (int32_t i = 0; i < pInfo->size; ++i) {
195,185,737✔
882
      SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
141,529,206✔
883
      SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
141,529,206✔
884
      SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
141,529,206✔
885
      isVarType = IS_VAR_DATA_TYPE(pLeftColInfoData->info.type);
141,529,206!
886

887
      if (pLeftColInfoData->hasNull || pRightColInfoData->hasNull) {
141,529,206!
888
        bool leftNull = false;
141,677,866✔
889
        if (pLeftColInfoData->hasNull) {
141,677,866✔
890
          if (pLeftBlock->pBlockAgg == NULL) {
141,577,013!
891
            leftNull = colDataIsNull_t(pLeftColInfoData, pLeftSource->src.rowIndex, isVarType);
283,283,898!
892
          } else {
893
            leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex,
×
894
                                     &pLeftBlock->pBlockAgg[i]);
×
895
          }
896
        }
897

898
        bool rightNull = false;
141,677,866✔
899
        if (pRightColInfoData->hasNull) {
141,677,866✔
900
          if (pRightBlock->pBlockAgg == NULL) {
141,582,384!
901
            rightNull = colDataIsNull_t(pRightColInfoData, pRightSource->src.rowIndex, isVarType);
283,217,342!
902
          } else {
UNCOV
903
            rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex,
×
UNCOV
904
                                      &pRightBlock->pBlockAgg[i]);
×
905
          }
906
        }
907

908
        if (leftNull && rightNull) {
141,677,866✔
909
          continue;  // continue to next slot
2,159,582✔
910
        }
911

912
        if (rightNull) {
139,518,284✔
913
          return pOrder->nullFirst ? 1 : -1;
1,067,125!
914
        }
915

916
        if (leftNull) {
138,451,159✔
917
          return pOrder->nullFirst ? -1 : 1;
7,810!
918
        }
919
      }
920

921
      void* left1, *right1;
922
      if (isVarType) {
138,294,689✔
923
        left1 = colDataGetVarData(pLeftColInfoData, pLeftSource->src.rowIndex);
18,871,352✔
924
        right1 = colDataGetVarData(pRightColInfoData, pRightSource->src.rowIndex);
18,871,352✔
925
      } else {
926
        left1 = colDataGetNumData(pLeftColInfoData, pLeftSource->src.rowIndex);
119,423,337✔
927
        right1 = colDataGetNumData(pRightColInfoData, pRightSource->src.rowIndex);
119,423,337✔
928
      }
929

930
      __compar_fn_t fn = pOrder->compFn;
138,294,689✔
931
      if (!fn) {
138,294,689✔
932
        fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order);
27,473✔
933
        pOrder->compFn = fn;
27,473✔
934
      }
935

936
      int32_t ret = fn(left1, right1);
138,294,689✔
937
      if (ret == 0) {
138,224,875✔
938
        continue;
72,898,112✔
939
      } else {
940
        return ret;
65,326,763✔
941
      }
942
    }
943
  }
944

945
  return 0;
53,656,531✔
946
}
947

948
/*
 * Run one intermediate pass of the external merge sort.
 *
 * The first numOfSorted sources of pHandle->pOrderedSource are split into sortGroup consecutive groups of
 * at most numOfInputSources sources each. Every group is merged through a loser tree. The merged rows are
 * written page by page (numOfRows rows per block) into pHandle->pBuf, and the resulting run is appended to
 * pResList as a new external-memory source. Merging of a group stops early once pHandle->mergeLimit rows
 * have been produced (when mergeLimit != -1). sortTimes is only used for logging.
 *
 * Returns 0 on success, TSDB_CODE_TSC_QUERY_CANCELLED when the sort is closed or aborted, or another
 * error code. Sources already appended to pResList remain owned by pResList on error.
 */
static int32_t doSortForEachGroup(SSortHandle* pHandle, int32_t sortTimes, int32_t numOfSorted,
                                  int32_t numOfInputSources, SArray* pResList, int32_t sortGroup, int32_t numOfRows) {
  int32_t code = 0;
  int32_t lino = 0;
  SArray* pPageIdList = NULL;

  for (int32_t i = 0; i < sortGroup; ++i) {
    qDebug("internal merge sort pass %d group %d. num input sources %d ", sortTimes, i, numOfInputSources);
    pHandle->sourceId += 1;

    int32_t start = i * numOfInputSources;
    int32_t end = (i + 1) * numOfInputSources - 1;
    if (end > numOfSorted - 1) {
      end = numOfSorted - 1;
    }

    pHandle->cmpParam.numOfSources = end - start + 1;

    code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, start, end, pHandle);
    QUERY_CHECK_CODE(code, lino, _err);

    code =
        tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
    QUERY_CHECK_CODE(code, lino, _err);

    int32_t nMergedRows = 0;
    pPageIdList = taosArrayInit(4, sizeof(int32_t));
    QUERY_CHECK_NULL(pPageIdList, code, lino, _err, terrno);

    while (1) {
      if (tsortIsClosed(pHandle) || (pHandle->abortCheckFn && pHandle->abortCheckFn(pHandle->abortCheckParam))) {
        code = TSDB_CODE_TSC_QUERY_CANCELLED;
        goto _err;
      }

      SSDataBlock* pDataBlock = NULL;
      code = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows, &pDataBlock);
      // Propagate merge failures. Merely breaking out would let sortComparCleanup() below overwrite the
      // code, so a truncated run would be reported as a success.
      QUERY_CHECK_CODE(code, lino, _err);
      if (pDataBlock == NULL) {
        break;  // every source of this group is exhausted
      }

      int32_t pageId = -1;
      void*   pPage = getNewBufPage(pHandle->pBuf, &pageId);
      QUERY_CHECK_NULL(pPage, code, lino, _err, terrno);

      void* px = taosArrayPush(pPageIdList, &pageId);
      QUERY_CHECK_NULL(px, code, lino, _err, terrno);

      // Serialized size: block payload + leading int32 + one int32 per column.
      int32_t size =
          blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t);
      if (size > getBufPageSize(pHandle->pBuf)) {
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        lino = __LINE__;
        goto _err;
      }

      code = blockDataToBuf(pPage, pDataBlock);
      QUERY_CHECK_CODE(code, lino, _err);

      setBufPageDirty(pPage, true);
      releaseBufPage(pHandle->pBuf, pPage);
      nMergedRows += pDataBlock->info.rows;

      blockDataCleanup(pDataBlock);
      if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
        break;
      }
    }

    code = sortComparCleanup(&pHandle->cmpParam);
    QUERY_CHECK_CODE(code, lino, _err);

    tMergeTreeDestroy(&pHandle->pMergeTree);
    pHandle->numOfCompletedSources = 0;

    SSDataBlock* pBlock = NULL;
    code = createOneDataBlock(pHandle->pDataBlock, false, &pBlock);
    QUERY_CHECK_CODE(code, lino, _err);

    code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList);
    if (code != TSDB_CODE_SUCCESS) {
      blockDataDestroy(pBlock);
    }
    QUERY_CHECK_CODE(code, lino, _err);

    // pPageIdList now belongs to the source just appended to pResList. Drop our reference so that a
    // failure in a later group does not destroy it a second time at _err.
    pPageIdList = NULL;
  }

  return code;

_err:
  taosArrayDestroy(pPageIdList);
  qError("%s error happens:%s line:%d, code:%s", pHandle->idStr, __func__, lino, tstrerror(code));
  return code;
}
1039

1040
/*
 * Drive the intermediate passes of the external merge sort.
 *
 * With N ordered sources and a merge fan-in of pHandle->numOfPages, floor(log2(N) / log2(numOfPages))
 * intermediate passes are run. Each pass merges groups of up to numOfPages sources into new runs
 * (doSortForEachGroup) and replaces pHandle->pOrderedSource with those runs. pHandle->loops records the
 * initial pass + intermediate passes + final merge pass. cmpParam.numOfSources is left at the number of
 * sources remaining for the final merge.
 *
 * NOTE(review): log2(numOfPages) is 0 when numOfPages == 1, which would make sortPass inf/NaN;
 * presumably callers guarantee numOfPages >= 2 -- confirm.
 *
 * Returns 0 on success (including when there are no sources at all), otherwise an error code.
 */
static int32_t doInternalMergeSort(SSortHandle* pHandle) {
  size_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
  if (numOfSources == 0) {
    return 0;
  }

  // Calculate the I/O counts to complete the data sort.
  double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages));
  pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs;

  if (sortPass > 0) {
    size_t bufSize = (pHandle->pBuf != NULL) ? getTotalBufSize(pHandle->pBuf) : 0;
    qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%" PRIzu
           ", sort elapsed:%" PRId64 ", total elapsed:%" PRId64,
           pHandle->idStr, (int32_t)(sortPass + 1), bufSize, pHandle->sortElapsed, pHandle->totalElapsed);
  } else {
    qDebug("%s ordered source:%" PRIzu ", available buf:%d, no need internal sort", pHandle->idStr, numOfSources,
           pHandle->numOfPages);
  }

  // Size the output block so that one block fits into a single buffer page.
  int32_t metaSize = (int32_t)blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock));
  int32_t rowsPerPage = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, metaSize);
  if (rowsPerPage < 0) {
    return terrno;
  }

  int32_t code = blockDataEnsureCapacity(pHandle->pDataBlock, rowsPerPage);
  if (code != 0) {
    return code;
  }

  // the initial pass + sortPass + final mergePass
  pHandle->loops = sortPass + 2;

  size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);
  for (int32_t pass = 0; pass < sortPass; ++pass) {
    int64_t startUs = taosGetTimestampUs();

    SArray* pMergedRuns = taosArrayInit(4, POINTER_BYTES);
    if (pMergedRuns == NULL) {
      return terrno;
    }

    // Only *fanIn* sources can be loaded into the buffer at once to perform the external sort.
    int32_t fanIn = pHandle->numOfPages;
    int32_t numOfGroups = (numOfSorted + fanIn - 1) / fanIn;

    code = doSortForEachGroup(pHandle, pass, numOfSorted, fanIn, pMergedRuns, numOfGroups, rowsPerPage);
    if (code != 0) {
      tsortClearOrderedSource(pMergedRuns, NULL, NULL);
      taosArrayDestroy(pMergedRuns);
      return code;
    }

    // The merged runs of this pass become the input sources of the next one.
    tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL);
    if (taosArrayAddAll(pHandle->pOrderedSource, pMergedRuns) == NULL) {
      tsortClearOrderedSource(pMergedRuns, NULL, NULL);
      taosArrayDestroy(pMergedRuns);
      return terrno;
    }
    taosArrayDestroy(pMergedRuns);

    numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);

    int64_t elapsedUs = taosGetTimestampUs() - startUs;
    pHandle->totalElapsed += elapsedUs;

    SDiskbasedBufStatis statis = getDBufStatis(pHandle->pBuf);
    qDebug("%s %d round mergesort, elapsed:%" PRId64 " readDisk:%.2f Kb, flushDisk:%.2f Kb", pHandle->idStr,
           pass + 1, elapsedUs, statis.loadBytes / 1024.0, statis.flushBytes / 1024.0);

    // Later passes run as SORT_SINGLESOURCE_SORT and compare with msortComparFn.
    if (pHandle->type == SORT_MULTISOURCE_MERGE) {
      pHandle->type = SORT_SINGLESOURCE_SORT;
      pHandle->comparFn = msortComparFn;
    }
  }

  pHandle->cmpParam.numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
  return 0;
}
1121

1122
// get sort page size
1123
int32_t getProperSortPageSize(size_t rowSize, uint32_t numOfCols) {
409,908✔
1124
  uint32_t pgSize = rowSize * 4 + blockDataGetSerialMetaSize(numOfCols);
409,908✔
1125
  if (pgSize < DEFAULT_PAGESIZE) {
410,233✔
1126
    return DEFAULT_PAGESIZE;
392,160✔
1127
  }
1128

1129
  return pgSize;
18,073✔
1130
}
1131

1132
/*
 * Lazily create pHandle->pBuf, the disk-based page buffer of numOfPages pages of pageSize bytes under
 * tsTempDir. Does nothing if the buffer already exists.
 * Returns 0 on success, TSDB_CODE_NO_DISKSPACE (also stored in terrno) when no temp space is available,
 * or the error from createDiskbasedBuf.
 */
static int32_t createPageBuf(SSortHandle* pHandle) {
  if (pHandle->pBuf != NULL) {
    return 0;
  }

  if (!osTempSpaceAvailable()) {
    terrno = TSDB_CODE_NO_DISKSPACE;
    qError("create page buf failed since %s, tempDir:%s", terrstr(), tsTempDir);
    return terrno;
  }

  int32_t totalSize = pHandle->numOfPages * pHandle->pageSize;
  int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, totalSize, "tableBlocksBuf", tsTempDir);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  dBufSetPrintInfo(pHandle->pBuf);
  return 0;
}
1150

1151
/*
 * Decode one row serialized by blockRowToBuf() and store it at row index pBlock->info.rows of pBlock.
 * Layout: one int8 NULL flag per column, the packed non-NULL values in column order, then an int32
 * holding the byte length that precedes it. pBlock->info.rows is NOT incremented here.
 * Returns 0 or the first column-lookup / colDataSetVal error.
 */
static int32_t tsortAppendExtRowBufToBlock(SSDataBlock* pBlock, char* buf) {
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  char*   isNull = buf;
  char*   pStart = buf + sizeof(int8_t) * numOfCols;

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
    if (pColInfo == NULL) {
      return terrno;
    }

    if (isNull[i]) {
      colDataSetNULL(pColInfo, pBlock->info.rows);
      continue;
    }

    int32_t code = colDataSetVal(pColInfo, pBlock->info.rows, pStart, false);
    if (code) {
      return code;
    }

    // Advance past the value just consumed; its width depends on the column type.
    if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
      pStart += getJsonValueLen(pStart);
    } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
      pStart += varDataTLen(pStart);
    } else {
      pStart += pColInfo->info.bytes;
    }
  }

  // The trailer is not necessarily 4-byte aligned, so read it with memcpy instead of an int32_t* load.
  int32_t storedLen = 0;
  memcpy(&storedLen, pStart, sizeof(storedLen));
  if (storedLen != pStart - buf) {
    qError("table merge scan row buf deserialization. length error %d != %d ", storedLen, (int32_t)(pStart - buf));
  }
  return TSDB_CODE_SUCCESS;
}

/*
 * Append the row referenced by pTupleHandle to the end of pBlock and bump pBlock->info.rows.
 *
 * When pHandle->bSortByRowId is set, tuple columns 1..3 hold (region id, offset, length) of a row stored in
 * the ext-rows mem file. The row is fetched with getRowBufFromExtMemFile and decoded column by column.
 * Otherwise each column value (or NULL) is copied directly from the tuple.
 * In both cases dataLoad is set and scanFlag is taken from the tuple's block info.
 * Returns 0 or an error code; on error pBlock->info.rows is not incremented.
 */
int32_t tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
  int32_t code = 0;

  if (pHandle->bSortByRowId) {
    int32_t *p1, *p2, *p3;
    tsortGetValue(pTupleHandle, 1, (void**)&p1);
    tsortGetValue(pTupleHandle, 2, (void**)&p2);
    tsortGetValue(pTupleHandle, 3, (void**)&p3);

    int32_t regionId = *p1;
    int32_t offset = *p2;
    int32_t length = *p3;

    char* buf = NULL;
    bool  bFreeRow = false;

    code = getRowBufFromExtMemFile(pHandle, regionId, offset, length, &buf, &bFreeRow);
    if (code) {
      return code;
    }

    code = tsortAppendExtRowBufToBlock(pBlock, buf);
    // A heap copy of the row must be released on every path, including decode errors.
    if (bFreeRow) {
      taosMemoryFree(buf);
    }
    if (code) {
      return code;
    }

    pBlock->info.dataLoad = 1;

    SDataBlockInfo info = {0};
    tsortGetBlockInfo(pTupleHandle, &info);

    pBlock->info.scanFlag = info.scanFlag;
    pBlock->info.rows += 1;
  } else {
    for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
      SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
      if (pColInfo == NULL) {
        return terrno;
      }

      bool isNull = tsortIsNullVal(pTupleHandle, i);
      if (isNull) {
        colDataSetNULL(pColInfo, pBlock->info.rows);
      } else {
        char* pData = NULL;
        tsortGetValue(pTupleHandle, i, (void**)&pData);
        if (pData != NULL) {
          code = colDataSetVal(pColInfo, pBlock->info.rows, pData, false);
          if (code) {
            return code;
          }
        }
      }
    }

    pBlock->info.dataLoad = 1;
    SDataBlockInfo info = {0};
    tsortGetBlockInfo(pTupleHandle, &info);

    pBlock->info.scanFlag = info.scanFlag;
    pBlock->info.rows += 1;
  }

  return code;
}
1250

1251
/*
 * Serialize row rowIdx of pBlock into buf.
 * Layout: one int8 NULL flag per column, the packed non-NULL values in column order, then an int32
 * holding the number of bytes that precede it.
 * Returns the total number of bytes written (trailer included), or terrno if a column lookup fails.
 * NOTE(review): buf must be large enough for the serialized row; callers presumably size it from
 * pHandle->extRowBytes -- confirm.
 */
static int32_t blockRowToBuf(SSDataBlock* pBlock, int32_t rowIdx, char* buf) {
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

  char* isNull = (char*)buf;
  char* pStart = (char*)buf + sizeof(int8_t) * numOfCols;
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    if (pCol == NULL) {
      return terrno;
    }

    if (colDataIsNull_s(pCol, rowIdx)) {
      isNull[i] = 1;
      continue;
    }

    isNull[i] = 0;
    char* pData = colDataGetData(pCol, rowIdx);
    if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
      if (pCol->pData) {
        int32_t dataLen = getJsonValueLen(pData);
        memcpy(pStart, pData, dataLen);
        pStart += dataLen;
      } else {
        // the column that is pre-allocated has no data and has offset
        *pStart = 0;
        pStart += 1;
      }
    } else if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      if (pCol->pData) {
        varDataCopy(pStart, pData);
        pStart += varDataTLen(pData);
      } else {
        // the column that is pre-allocated has no data and has offset; write an empty var-length header.
        // memcpy avoids an unaligned VarDataLenT store into the packed buffer.
        VarDataLenT emptyLen = 0;
        memcpy(pStart, &emptyLen, sizeof(emptyLen));
        pStart += VARSTR_HEADER_SIZE;
      }
    } else {
      int32_t bytes = pCol->info.bytes;
      memcpy(pStart, pData, bytes);
      pStart += bytes;
    }
  }

  // Trailer: byte count of everything before it. pStart is generally not 4-byte aligned, so write it
  // with memcpy rather than through an int32_t* (misaligned access is undefined behavior).
  int32_t payloadLen = (int32_t)(pStart - (char*)buf);
  memcpy(pStart, &payloadLen, sizeof(payloadLen));
  pStart += sizeof(int32_t);
  return (int32_t)(pStart - (char*)buf);
}
1298

1299
/*
 * Return a pointer to the serialized row stored at [tupleOffset, tupleOffset + rowLen) of region regionId
 * in the ext-rows mem file.
 *
 * Each region keeps a read window (pRegion->buf, bufLen bytes starting at region offset bufRegOffset)
 * that only moves forward.
 * - If the row lies entirely inside the window, *ppRow points into the window and *pFreeRow is false.
 * - If the row straddles the window end, a heap copy is assembled, the window advances to the next
 *   block of the region, and *pFreeRow is true. The caller must taosMemoryFree(*ppRow) in that case.
 * Returns 0, a file/allocation error, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the requested row
 * can not be served from the window (behind it, or not fully covered after advancing).
 */
static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, int32_t tupleOffset, int32_t rowLen,
                                       char** ppRow, bool* pFreeRow) {
  SSortMemFile*       pMemFile = pHandle->pExtRowsMemFile;
  SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, regionId);
  if (pRegion == NULL) {
    return terrno;
  }

  // First access to this region: allocate its read window and load the first block of the region.
  if (pRegion->buf == NULL) {
    pRegion->bufRegOffset = 0;
    pRegion->buf = taosMemoryMalloc(pMemFile->blockSize);
    if (pRegion->buf == NULL) {
      return terrno;
    }

    TAOS_CHECK_RETURN(taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset, SEEK_SET));

    int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize);
    int32_t ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
    if (ret != 1) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return terrno;
    }
    pRegion->bufLen = readBytes;
  }

  // The window never moves backwards, so a row that starts before it can not be served.
  if (pRegion->bufRegOffset > tupleOffset) {
    qError("sort failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  // Fast path: the whole row is inside the current window.
  if (pRegion->bufRegOffset + pRegion->bufLen >= tupleOffset + rowLen) {
    *pFreeRow = false;
    *ppRow = pRegion->buf + tupleOffset - pRegion->bufRegOffset;
    return TSDB_CODE_SUCCESS;
  }

  // Slow path: the row starts in the current window and continues in the next block of the region.
  int32_t szThisBlock = pRegion->bufLen - (tupleOffset - pRegion->bufRegOffset);
  int32_t nextWinOffset = pRegion->bufRegOffset + pRegion->bufLen;
  int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize - nextWinOffset);
  if (szThisBlock < 0 || rowLen - szThisBlock > readBytes) {
    // Either the row does not start inside the window, or its tail is larger than the next block;
    // copying anyway would read stale or out-of-range buffer bytes.
    qError("sort failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  *ppRow = taosMemoryMalloc(rowLen);
  if (*ppRow == NULL) {
    return terrno;
  }
  memcpy(*ppRow, pRegion->buf + tupleOffset - pRegion->bufRegOffset, szThisBlock);

  int32_t code = taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset + nextWinOffset, SEEK_SET);
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFreeClear(*ppRow);
    terrno = code;
    return code;
  }

  int32_t ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
  if (ret != 1) {
    taosMemoryFreeClear(*ppRow);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return terrno;
  }
  memcpy(*ppRow + szThisBlock, pRegion->buf, rowLen - szThisBlock);
  *pFreeRow = true;
  pRegion->bufRegOffset = nextWinOffset;
  pRegion->bufLen = readBytes;
  return TSDB_CODE_SUCCESS;
}
1355

1356
/*
 * Lazily create pHandle->pExtRowsMemFile. This opens a temporary "sort-ext-mem" file under tsTempDir
 * (marked auto-delete), a 4 MiB write staging buffer, and an empty region array. The read cache size is
 * taken from pHandle->extRowsMemSize. A no-op if the mem file already exists.
 *
 * On any failure everything created so far (array, buffer, open file, temp file) is released, and
 * pHandle->pExtRowsMemFile stays NULL.
 */
static int32_t createSortMemFile(SSortHandle* pHandle) {
  if (pHandle->pExtRowsMemFile != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t       code = TSDB_CODE_SUCCESS;
  SSortMemFile* pMemFile = taosMemoryCalloc(1, sizeof(SSortMemFile));
  if (pMemFile == NULL) {
    return terrno;
  }

  taosGetTmpfilePath(tsTempDir, "sort-ext-mem", pMemFile->memFilePath);
  pMemFile->pTdFile = taosOpenCFile(pMemFile->memFilePath, "w+b");
  if (pMemFile->pTdFile == NULL) {
    code = terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  code = taosSetAutoDelFile(pMemFile->memFilePath);
  if (code) {
    // Previously returned here directly, leaking pMemFile and the open file handle.
    qError("failed to set the auto-delete file attribute");
    goto _err;
  }

  pMemFile->currRegionId = -1;
  pMemFile->currRegionOffset = -1;

  pMemFile->writeBufSize = 4 * 1024 * 1024;
  pMemFile->writeFileOffset = -1;
  pMemFile->bRegionDirty = false;

  pMemFile->writeBuf = taosMemoryMalloc(pMemFile->writeBufSize);
  if (pMemFile->writeBuf == NULL) {
    code = terrno;
    goto _err;
  }

  pMemFile->cacheSize = pHandle->extRowsMemSize;
  pMemFile->aFileRegions = taosArrayInit(64, sizeof(SSortMemFileRegion));
  if (pMemFile->aFileRegions == NULL) {
    code = terrno;
    goto _err;
  }

  pHandle->pExtRowsMemFile = pMemFile;
  return TSDB_CODE_SUCCESS;

_err:
  // aFileRegions is an SArray: release it with taosArrayDestroy, not a plain memory free.
  if (pMemFile->aFileRegions) {
    taosArrayDestroy(pMemFile->aFileRegions);
    pMemFile->aFileRegions = NULL;
  }
  if (pMemFile->writeBuf) {
    taosMemoryFreeClear(pMemFile->writeBuf);
  }
  if (pMemFile->pTdFile) {
    (void)taosCloseCFile(pMemFile->pTdFile);
    pMemFile->pTdFile = NULL;
    // Best effort: the path may already have been removed by the auto-delete setting.
    (void)taosRemoveFile(pMemFile->memFilePath);
  }
  taosMemoryFreeClear(pMemFile);
  return code;
}
1415

1416
/*
 * Tear down pHandle->pExtRowsMemFile, if any. This frees each region's read buffer, the region array and
 * the write buffer, closes and removes the backing temp file, and resets the handle's pointer to NULL.
 */
static void destroySortMemFile(SSortHandle* pHandle) {
  SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
  if (pMemFile == NULL) {
    return;
  }

  size_t numOfRegions = taosArrayGetSize(pMemFile->aFileRegions);
  for (size_t idx = 0; idx < numOfRegions; ++idx) {
    SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, idx);
    if (pRegion != NULL) {
      taosMemoryFree(pRegion->buf);
    }
  }

  taosArrayDestroy(pMemFile->aFileRegions);
  pMemFile->aFileRegions = NULL;

  taosMemoryFree(pMemFile->writeBuf);
  pMemFile->writeBuf = NULL;

  // Failures while closing/removing the temp file are deliberately ignored (best-effort cleanup).
  (void)taosCloseCFile(pMemFile->pTdFile);
  pMemFile->pTdFile = NULL;
  (void)taosRemoveFile(pMemFile->memFilePath);

  taosMemoryFree(pMemFile);
  pHandle->pExtRowsMemFile = NULL;
}
1443

1444
/*
 * Start a new region in the ext-rows mem file.
 * The first region starts at file offset 0. Each later region starts right after the previous one
 * (previous fileOffset + regionSize). The region becomes current (currRegionId), and the write cursors
 * (currRegionOffset, writeFileOffset) are reset to its start.
 * Returns 0 or an error code. On failure the mem file bookkeeping is left unchanged.
 */
static int32_t tsortOpenRegion(SSortHandle* pHandle) {
  SSortMemFile*      pMemFile = pHandle->pExtRowsMemFile;
  SSortMemFileRegion region = {0};

  if (pMemFile->currRegionId != -1) {
    SSortMemFileRegion* pPrev = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId);
    if (pPrev == NULL) {
      return terrno;
    }
    region.fileOffset = pPrev->fileOffset + pPrev->regionSize;
  }
  region.bufRegOffset = 0;

  // Only advance currRegionId once the region really exists; advancing after a failed push left
  // currRegionId pointing past the end of aFileRegions.
  if (taosArrayPush(pMemFile->aFileRegions, &region) == NULL) {
    return terrno;
  }

  pMemFile->currRegionId += 1;  // -1 -> 0 for the very first region
  pMemFile->currRegionOffset = 0;
  pMemFile->writeFileOffset = region.fileOffset;
  return TSDB_CODE_SUCCESS;
}
1480

1481
/*
 * Close the current region of the ext-rows mem file. This records its final size (regionSize =
 * currRegionOffset) and writes whatever part of it is still held in writeBuf to the file.
 * Returns 0, or TAOS_SYSTEM_ERROR(errno) (also stored in terrno) when the write fails.
 */
static int32_t tsortCloseRegion(SSortHandle* pHandle) {
  SSortMemFile*       pMemFile = pHandle->pExtRowsMemFile;
  SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId);
  if (pRegion == NULL) {
    return terrno;
  }

  pRegion->regionSize = pMemFile->currRegionOffset;

  // Region bytes not yet flushed: everything past writeFileOffset (relative to the region start).
  int32_t pending = pRegion->regionSize - (pMemFile->writeFileOffset - pRegion->fileOffset);
  if (pending <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (fwrite(pMemFile->writeBuf, pending, 1, pMemFile->pTdFile) != 1) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return terrno;
  }

  pMemFile->bRegionDirty = false;
  return TSDB_CODE_SUCCESS;
}
1500

1501
/*
 * Switch the ext-rows mem file from writing to reading.
 * Checks that aFileRegions holds exactly currRegionId + 1 regions. It then sets the per-region read
 * block size to cacheSize / numRegions rounded up to a multiple of 4096, resets every region's read
 * window offset, and frees the write staging buffer.
 * Returns 0, terrno on a failed region lookup, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR on a count mismatch.
 */
static int32_t tsortFinalizeRegions(SSortHandle* pHandle) {
  SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
  size_t        numRegions = taosArrayGetSize(pMemFile->aFileRegions);

  if (numRegions != (pMemFile->currRegionId + 1)) {
    qError("sort failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }
  if (numRegions == 0) {
    return TSDB_CODE_SUCCESS;
  }

  // Share the read cache evenly between regions, rounded up to a 4 KiB multiple.
  int32_t perRegionBytes = (pMemFile->cacheSize / numRegions + 4095) & ~4095;
  pMemFile->blockSize = perRegionBytes;

  for (size_t idx = 0; idx < numRegions; ++idx) {
    SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, idx);
    if (pRegion == NULL) {
      return terrno;
    }
    pRegion->bufRegOffset = 0;
  }

  taosMemoryFree(pMemFile->writeBuf);
  pMemFile->writeBuf = NULL;
  return TSDB_CODE_SUCCESS;
}
1528

1529
/*
 * Append row rowIdx of pBlock to the current region of the ext-rows mem file.
 *
 * Rows are serialized (blockRowToBuf) into writeBuf, which stages the region bytes starting at file offset
 * writeFileOffset. When the staged bytes plus one maximal row (pHandle->extRowBytes) would not fit into
 * writeBufSize, the staged bytes are flushed first.
 * On success *pRegionId / *pOffset / *pLength locate the row (region id, offset inside the region, byte length).
 * Returns 0 or an error code.
 */
static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx,
                                            int32_t* pRegionId, int32_t* pOffset, int32_t* pLength) {
  SSortMemFile*       pMemFile = pHandle->pExtRowsMemFile;
  SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId);
  if (pRegion == NULL) {
    return terrno;
  }

  // Position inside writeBuf where the next row goes (bytes of this region staged but not yet flushed).
  int32_t writeBufOffset = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset);

  // Compare the staged amount (not the absolute region offset) against the buffer size. Using
  // currRegionOffset made every row after the first flush trigger its own fwrite.
  if (writeBufOffset + pHandle->extRowBytes >= pMemFile->writeBufSize) {
    if (writeBufOffset <= 0) {
      // Even an empty staging buffer can not hold a maximal row.
      qError("sort failed at: %s:%d", __func__, __LINE__);
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    int32_t ret = fwrite(pMemFile->writeBuf, writeBufOffset, 1, pMemFile->pTdFile);
    if (ret != 1) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return terrno;
    }
    pMemFile->writeFileOffset = pRegion->fileOffset + pMemFile->currRegionOffset;
    writeBufOffset = 0;
  }

  *pRegionId = pMemFile->currRegionId;
  *pOffset = pMemFile->currRegionOffset;
  int32_t blockLen = blockRowToBuf(pBlock, rowIdx, pMemFile->writeBuf + writeBufOffset);
  *pLength = blockLen;

  pMemFile->currRegionOffset += blockLen;
  pMemFile->bRegionDirty = true;
  return TSDB_CODE_SUCCESS;
}
1560

1561
/*
 * Store row *rowIndex of pSource in the external-rows memory file and append a locator row for it
 * to pHandle->pDataBlock:
 *   column 0: the sort timestamp copied from pSource (slot taken from aExtRowsOrders[0]),
 *   columns 1..3: region id, offset in region and serialized length,
 *   column 4 (only when bSortPk): the primary-key value (slot taken from aExtRowsOrders[1]).
 * On success both pHandle->pDataBlock->info.rows and *rowIndex are advanced by one.
 */
static int32_t appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSource, int32_t* rowIndex) {
  int32_t regionId = -1;
  int32_t regionOffset = -1;
  int32_t rowLen = -1;
  int32_t srcRow = *rowIndex;

  int32_t code = saveBlockRowToExtRowsMemFile(pHandle, pSource, srcRow, &regionId, &regionOffset, &rowLen);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SSDataBlock* pDst = pHandle->pDataBlock;

  // column 0: timestamp
  SBlockOrderInfo* pTsOrder = taosArrayGet(pHandle->aExtRowsOrders, 0);
  if (pTsOrder == NULL) {
    return terrno;
  }
  SColumnInfoData* pSrcTs = taosArrayGet(pSource->pDataBlock, pTsOrder->slotId);
  if (pSrcTs == NULL) {
    return terrno;
  }
  SColumnInfoData* pDstTs = taosArrayGet(pDst->pDataBlock, 0);
  if (pDstTs == NULL) {
    return terrno;
  }
  char* pTsVal = colDataGetData(pSrcTs, srcRow);
  code = colDataSetVal(pDstTs, pDst->info.rows, pTsVal, false);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // columns 1..3: where the full row lives in the memory file
  int32_t* locators[] = {&regionId, &regionOffset, &rowLen};
  for (int32_t slot = 1; slot <= 3; ++slot) {
    SColumnInfoData* pLocCol = taosArrayGet(pDst->pDataBlock, slot);
    if (pLocCol == NULL) {
      return terrno;
    }
    colDataSetInt32(pLocCol, pDst->info.rows, locators[slot - 1]);
  }

  // column 4: primary key, only in pk-sort mode
  if (pHandle->bSortPk) {
    SBlockOrderInfo* pPkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1);
    if (pPkOrder == NULL) {
      return terrno;
    }
    SColumnInfoData* pSrcPk = taosArrayGet(pSource->pDataBlock, pPkOrder->slotId);
    if (pSrcPk == NULL) {
      return terrno;
    }
    SColumnInfoData* pDstPk = taosArrayGet(pDst->pDataBlock, 4);
    if (pDstPk == NULL) {
      return terrno;
    }

    if (!colDataIsNull_s(pSrcPk, srcRow)) {
      char* pPkVal = colDataGetData(pSrcPk, srcRow);
      code = colDataSetVal(pDstPk, pDst->info.rows, pPkVal, false);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    } else {
      colDataSetNULL(pDstPk, pDst->info.rows);
    }
  }

  pDst->info.rows += 1;
  *rowIndex = srcRow + 1;
  return code;
}
1646

1647
/*
 * Switch pHandle to the "sort by row id" layout.
 *
 * pHandle->pDataBlock is replaced by a locator block:
 *   slot 0: timestamp, slot 1: region id, slot 2: offset in region, slot 3: row length
 *   (slots 1..3 are TSDB_DATA_TYPE_INT), plus slot 4: a copy of the primary-key column when bSortPk.
 * pHandle->pSortInfo is replaced by an order list on slot 0 (original timestamp order) and, when
 * bSortPk, slot 4 (original primary-key order). The original orders are read from
 * pHandle->aExtRowsOrders (index 1 = primary key) and pHandle->pSortInfo (index 0 = timestamp).
 * Page settings are reset to 256 pages of 256KB.
 *
 * All lookups are validated before pHandle is modified. Previously a failed lookup of the pk order,
 * the pk column or the timestamp order was dereferenced unconditionally.
 */
static int32_t initRowIdSort(SSortHandle* pHandle) {
  SBlockOrderInfo* pkOrder = NULL;
  SColumnInfoData* extPkCol = NULL;
  if (pHandle->bSortPk) {
    pkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1);
    if (pkOrder == NULL) {
      return terrno;
    }
    extPkCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pkOrder->slotId);
    if (extPkCol == NULL) {
      return terrno;
    }
  }

  // Copy the order value now: pHandle->pSortInfo is destroyed and replaced below.
  SBlockOrderInfo* pOrigTsOrder = taosArrayGet(pHandle->pSortInfo, 0);
  if (pOrigTsOrder == NULL) {
    return terrno;
  }
  int32_t tsOrder = pOrigTsOrder->order;

  SSDataBlock* pSortInput = NULL;
  int32_t      code = createDataBlock(&pSortInput);
  if (code) {
    return code;
  }

  SColumnInfoData locatorCols[] = {
      createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1),  // slot 0: timestamp
      createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2),        // slot 1: region id
      createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3),        // slot 2: offset in region
      createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4),        // slot 3: row length
  };
  for (size_t i = 0; i < sizeof(locatorCols) / sizeof(locatorCols[0]); ++i) {
    code = blockDataAppendColInfo(pSortInput, &locatorCols[i]);
    if (code) {
      blockDataDestroy(pSortInput);
      return code;
    }
  }

  SColumnInfoData pkCol = {0};
  if (pHandle->bSortPk) {
    pkCol = createColumnInfoData(extPkCol->info.type, extPkCol->info.bytes, 5);  // slot 4: primary key
    code = blockDataAppendColInfo(pSortInput, &pkCol);
    if (code) {
      blockDataDestroy(pSortInput);
      return code;
    }
  }

  // extPkCol points into the old block and must not be used after this point.
  blockDataDestroy(pHandle->pDataBlock);
  pHandle->pDataBlock = pSortInput;

  pHandle->pageSize = 256 * 1024;  // 256k
  pHandle->numOfPages = 256;

  SArray* pOrderInfoList = taosArrayInit(1, sizeof(SBlockOrderInfo));
  if (pOrderInfoList == NULL) {
    return terrno;
  }

  SBlockOrderInfo biTs = {0};
  biTs.order = tsOrder;
  biTs.slotId = 0;
  biTs.nullFirst = (biTs.order == TSDB_ORDER_ASC);
  biTs.compFn = getKeyComparFunc(TSDB_DATA_TYPE_TIMESTAMP, biTs.order);
  if (taosArrayPush(pOrderInfoList, &biTs) == NULL) {
    taosArrayDestroy(pOrderInfoList);
    return terrno;
  }

  if (pHandle->bSortPk) {
    SBlockOrderInfo biPk = {0};
    biPk.order = pkOrder->order;
    biPk.slotId = 4;
    biPk.nullFirst = (biPk.order == TSDB_ORDER_ASC);
    biPk.compFn = getKeyComparFunc(pkCol.info.type, biPk.order);
    if (taosArrayPush(pOrderInfoList, &biPk) == NULL) {
      taosArrayDestroy(pOrderInfoList);
      return terrno;
    }
  }

  taosArrayDestroy(pHandle->pSortInfo);
  pHandle->pSortInfo = pOrderInfoList;
  pHandle->cmpParam.pPkOrder = (pHandle->bSortPk) ? taosArrayGet(pHandle->pSortInfo, 1) : NULL;
  return TSDB_CODE_SUCCESS;
}
1741

1742
/*
 * Enable "sort by row id" mode on pHandle.
 *
 * Records extRowBytes (row size of the current data block + its column count + sizeof(int32_t))
 * and the memory budget extRowsMemSize. It saves a copy of the original sort orders in
 * aExtRowsOrders and rebuilds the data block and sort orders via initRowIdSort. It then creates the
 * external-rows memory file.
 * Returns TSDB_CODE_NO_DISKSPACE when no temp space is available. bSortByRowId is set once the
 * memory-file creation has been attempted, and that attempt's result is returned.
 */
int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsMemSize) {
  SSDataBlock* pOrigBlock = pHandle->pDataBlock;
  pHandle->extRowBytes =
      blockDataGetRowSize(pOrigBlock) + taosArrayGetSize(pOrigBlock->pDataBlock) + sizeof(int32_t);
  pHandle->extRowsMemSize = extRowsMemSize;

  pHandle->aExtRowsOrders = taosArrayDup(pHandle->pSortInfo, NULL);
  if (pHandle->aExtRowsOrders == NULL) {
    return terrno;
  }

  int32_t code = initRowIdSort(pHandle);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (osTempSpaceAvailable()) {
    code = createSortMemFile(pHandle);
    pHandle->bSortByRowId = true;
    return code;
  }

  qError("create sort mem file failed since %s, tempDir:%s", terrstr(), tsTempDir);
  return TSDB_CODE_NO_DISKSPACE;
}
1764

1765
/*
 * State shared with the loser-tree comparators (blockCompareTsFn / blockCompareTsPkFn) while
 * merging a list of blocks. Every array is indexed by a block's position in that list.
 */
typedef struct SBlkMergeSupport {
  int64_t** aTs;     // aTs[i]: pData of block i's timestamp column, read as int64_t values
  int32_t* aRowIdx;  // aRowIdx[i]: current row of block i; -1 once block i is exhausted
  int32_t tsOrder;   // timestamp order; TSDB_ORDER_DESC reverses the comparison

  SBlockOrderInfo* pPkOrder;  // primary-key order used to break timestamp ties; NULL disables it
  SSDataBlock** aBlks;        // aBlks[i]: block i itself, needed for the primary-key comparison
} SBlkMergeSupport;
1773

1774
static int32_t blockCompareTsFn(const void* pLeft, const void* pRight, void* param) {
172,466,595✔
1775
  int32_t left = *(int32_t*)pLeft;
172,466,595✔
1776
  int32_t right = *(int32_t*)pRight;
172,466,595✔
1777

1778
  SBlkMergeSupport* pSup = (SBlkMergeSupport*)param;
172,466,595✔
1779
  if (pSup->aRowIdx[left] == -1) {
172,466,595✔
1780
    return 1;
10,790,938✔
1781
  } else if (pSup->aRowIdx[right] == -1) {
161,675,657✔
1782
    return -1;
248,842✔
1783
  }
1784

1785
  int64_t leftTs = pSup->aTs[left][pSup->aRowIdx[left]];
161,426,815✔
1786
  int64_t rightTs = pSup->aTs[right][pSup->aRowIdx[right]];
161,426,815✔
1787

1788
  int32_t ret = leftTs>rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0);
161,426,815✔
1789
  if (pSup->tsOrder == TSDB_ORDER_DESC) {
161,426,815✔
1790
    ret = -1 * ret;
58,706,013✔
1791
  }
1792
  return ret;
161,426,815✔
1793
}
1794

1795
static int32_t blockCompareTsPkFn(const void* pLeft, const void* pRight, void* param) {
1,871,604✔
1796
  int32_t left = *(int32_t*)pLeft;
1,871,604✔
1797
  int32_t right = *(int32_t*)pRight;
1,871,604✔
1798

1799
  SBlkMergeSupport* pSup = (SBlkMergeSupport*)param;
1,871,604✔
1800
  if (pSup->aRowIdx[left] == -1) {
1,871,604✔
1801
    return 1;
59,202✔
1802
  } else if (pSup->aRowIdx[right] == -1) {
1,812,402✔
1803
    return -1;
41,505✔
1804
  }
1805

1806
  int64_t leftTs = pSup->aTs[left][pSup->aRowIdx[left]];
1,770,897✔
1807
  int64_t rightTs = pSup->aTs[right][pSup->aRowIdx[right]];
1,770,897✔
1808

1809
  int32_t ret = leftTs>rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0);
1,770,897✔
1810
  if (pSup->tsOrder == TSDB_ORDER_DESC) {
1,770,897✔
1811
    ret = -1 * ret;
338,268✔
1812
  }
1813
  if (ret == 0 && pSup->pPkOrder) {
1,770,897!
1814
    ret = tsortComparBlockCell(pSup->aBlks[left], pSup->aBlks[right], pSup->aRowIdx[left], pSup->aRowIdx[right], pSup->pPkOrder);
1,465,840✔
1815
  }
1816
  return ret;  
1,778,431✔
1817
}
1818

1819
/*
 * Serialize `blk` (blockDataToBuf) into a new page of pHandle->pBuf and append the page id to aPgId.
 *
 * Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the serialized block (data + 4-byte header +
 * 4 bytes per column) exceeds the page size. It returns terrno when the page or the id cannot be
 * obtained/recorded, or otherwise the result of blockDataToBuf.
 *
 * The size is validated before a page is requested, and the page is released on the push-failure
 * path. Previously both error paths returned without releaseBufPage.
 */
static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, SArray* aPgId) {
  int32_t size = blockDataGetSize(blk) + sizeof(int32_t) + taosArrayGetSize(blk->pDataBlock) * sizeof(int32_t);
  if (size > getBufPageSize(pHandle->pBuf)) {
    qError("sort failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t pageId = -1;
  void*   pPage = getNewBufPage(pHandle->pBuf, &pageId);
  if (pPage == NULL) {
    return terrno;
  }

  if (taosArrayPush(aPgId, &pageId) == NULL) {
    int32_t pushErr = terrno;
    releaseBufPage(pHandle->pBuf, pPage);  // hand the page back instead of leaking it
    return pushErr;
  }

  int32_t code = blockDataToBuf(pPage, blk);

  setBufPageDirty(pPage, true);
  releaseBufPage(pHandle->pBuf, pPage);

  return code;
}
1844

1845
/*
 * Bytes that appending row srcRowIndex of pSrcBlock at position dstRowIndex adds to a serialized
 * page.
 * - Fixed-width columns add their width, plus one bitmap byte when dstRowIndex is a multiple of 8.
 * - Var-length columns add one offset slot, plus the value length (getJsonValueLen for JSON,
 *   varDataTLen otherwise) when the value is present.
 * Without var-length columns the whole row is sized with blockDataGetRowSize.
 */
static int32_t getPageBufIncForRow(SSDataBlock* pSrcBlock, int32_t srcRowIndex, int32_t dstRowIndex) {
  int32_t numCols = taosArrayGetSize(pSrcBlock->pDataBlock);
  int32_t newBitmapByte = ((dstRowIndex & 0x7) == 0) ? 1 : 0;
  int32_t size = 0;

  if (!pSrcBlock->info.hasVarCol) {
    size += numCols * newBitmapByte;
    size += blockDataGetRowSize(pSrcBlock);
    return size;
  }

  for (int32_t col = 0; col < numCols; ++col) {
    SColumnInfoData* pCol = TARRAY_GET_ELEM(pSrcBlock->pDataBlock, col);

    if (!IS_VAR_DATA_TYPE(pCol->info.type)) {
      size += pCol->info.bytes;
      size += newBitmapByte;
      continue;
    }

    size += sizeof(pCol->varmeta.offset[0]);
    if (pCol->varmeta.offset[srcRowIndex] != -1 && pCol->pData != NULL) {
      char* pVal = colDataGetData(pCol, srcRowIndex);
      if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
        size += getJsonValueLen(pVal);
      } else {
        size += varDataTLen(pVal);
      }
    }
  }

  return size;
}
1879

1880
static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowIndex, int32_t dstRowIndex,
3,136,946✔
1881
                                         SColumnInfoData* pPkCol) {
1882
  int32_t size = 0;
3,136,946✔
1883
  int32_t numOfCols = blockDataGetNumOfCols(pDstBlock);
3,136,946✔
1884

1885
  if (pPkCol == NULL) { // no var column
3,137,250✔
1886
    if (!((numOfCols == 4) && (!pDstBlock->info.hasVarCol))) {
2,831,021✔
1887
      qError("sort failed at: %s:%d", __func__, __LINE__);
18!
1888
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1889
    }
1890

1891
    size += numOfCols * ((dstRowIndex & 0x7) == 0 ? 1: 0);
2,831,003✔
1892
    size += blockDataGetRowSize(pDstBlock);
2,831,003✔
1893
  } else {
1894
    if (numOfCols != 5) {
306,229!
1895
      qError("sort failed at: %s:%d", __func__, __LINE__);
×
1896
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1897
    }
1898

1899
    size += (numOfCols - 1) * (((dstRowIndex & 0x7) == 0)? 1:0);
306,229✔
1900
    for(int32_t i = 0; i < numOfCols - 1; ++i) {
1,530,354✔
1901
      SColumnInfoData* pColInfo = TARRAY_GET_ELEM(pDstBlock->pDataBlock, i);
1,224,125✔
1902
      size += pColInfo->info.bytes;
1,224,125✔
1903
    }
1904

1905
    // handle the pk column, the last column, may be the var char column
1906
    if (IS_VAR_DATA_TYPE(pPkCol->info.type)) {
306,229!
1907
      if ((pPkCol->varmeta.offset[srcRowIndex] != -1) && (pPkCol->pData)) {
107,999!
1908
        char* p = colDataGetData(pPkCol, srcRowIndex);
108,519!
1909
        size += varDataTLen(p);
108,519✔
1910
      }
1911

1912
      size += sizeof(pPkCol->varmeta.offset[0]);
107,999✔
1913
    } else {
1914
      size += pPkCol->info.bytes;
198,230✔
1915
      if (((dstRowIndex) & 0x07) == 0) {
198,230✔
1916
        size += 1; // bitmap
25,905✔
1917
      }
1918
    }
1919
  }
1920

1921
  return size;
3,137,277✔
1922
}
1923

1924
/*
 * Page-size increment for appending row srcRowIndex of pSrcBlock at dstRowIndex of
 * pHandle->pDataBlock. It dispatches to getPageBufIncForRow for normal sorts and to
 * getPageBufIncForRowIdSort in row-id mode.
 */
static int32_t getBufIncForNewRow(SSortHandle* pHandle, int32_t dstRowIndex, SSDataBlock* pSrcBlock,
                                  int32_t srcRowIndex) {
  if (!pHandle->bSortByRowId) {
    return getPageBufIncForRow(pSrcBlock, srcRowIndex, dstRowIndex);
  }

  // In row-id mode only the primary-key column may be var-length, so its source column is needed
  // to size the new locator row.
  SColumnInfoData* pPkCol = NULL;
  if (pHandle->bSortPk) {
    SBlockOrderInfo* pPkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1);
    pPkCol = taosArrayGet(pSrcBlock->pDataBlock, pPkOrder->slotId);
  }

  return getPageBufIncForRowIdSort(pHandle->pDataBlock, srcRowIndex, dstRowIndex, pPkCol);
}
1944

1945
/*
 * Initialize pSup for merging the blocks in pBlockList (an array of SSDataBlock*).
 * Each block's current row starts at 0, and aTs[i] points at the data of column tsSlotId of block i.
 *
 * On failure every array allocated here is freed and reset to NULL. The caller then has nothing to
 * clean up; previously the partial allocations leaked because the caller returned immediately.
 */
static int32_t initMergeSup(SBlkMergeSupport* pSup, SArray* pBlockList, int32_t tsOrder, int32_t tsSlotId, SBlockOrderInfo* pPkOrderInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  memset(pSup, 0, sizeof(SBlkMergeSupport));

  int32_t numOfBlocks = taosArrayGetSize(pBlockList);

  pSup->aRowIdx = taosMemoryCalloc(numOfBlocks, sizeof(int32_t));
  QUERY_CHECK_NULL(pSup->aRowIdx, code, lino, _end, terrno);

  pSup->aTs = taosMemoryCalloc(numOfBlocks, sizeof(int64_t*));
  QUERY_CHECK_NULL(pSup->aTs, code, lino, _end, terrno);

  pSup->tsOrder = tsOrder;
  pSup->aBlks = taosMemoryCalloc(numOfBlocks, sizeof(SSDataBlock*));
  QUERY_CHECK_NULL(pSup->aBlks, code, lino, _end, terrno);

  for (int32_t i = 0; i < numOfBlocks; ++i) {
    SSDataBlock* pBlock = taosArrayGetP(pBlockList, i);
    QUERY_CHECK_NULL(pBlock, code, lino, _end, terrno);

    SColumnInfoData* col = taosArrayGet(pBlock->pDataBlock, tsSlotId);
    QUERY_CHECK_NULL(col, code, lino, _end, terrno);

    pSup->aTs[i] = (int64_t*)col->pData;
    pSup->aRowIdx[i] = 0;
    pSup->aBlks[i] = pBlock;
  }

  pSup->pPkOrder = pPkOrderInfo;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    taosMemoryFreeClear(pSup->aRowIdx);
    taosMemoryFreeClear(pSup->aTs);
    taosMemoryFreeClear(pSup->aBlks);
  }
  return code;
}
1978

1979
/*
 * Free the arrays owned by pSup (aRowIdx, aTs, aBlks; the blocks themselves are not owned) and
 * reset the pointers to NULL. No dangling pointers remain, and a repeated call is harmless.
 */
static void cleanupMergeSup(SBlkMergeSupport* pSup) {
  taosMemoryFreeClear(pSup->aRowIdx);
  taosMemoryFreeClear(pSup->aTs);
  taosMemoryFreeClear(pSup->aBlks);
}
64,974✔
1984

1985
/* Sum of info.rows over all blocks in pBlockList (an array of SSDataBlock*). */
static int32_t getTotalRows(SArray* pBlockList) {
  size_t  numOfBlocks = taosArrayGetSize(pBlockList);
  int32_t sum = 0;

  for (size_t idx = 0; idx < numOfBlocks; ++idx) {
    const SSDataBlock* pBlk = taosArrayGetP(pBlockList, idx);
    sum += pBlk->info.rows;
  }

  return sum;
}
1995

1996
/*
 * K-way merge the blocks in aBlk (each already ordered) into pages of pHandle->pBuf and register
 * them as one new external memory source in aExtSrc.
 *
 * Rows come from a loser tree ordered by the original timestamp (plus primary key when bSortPk).
 * - Normal mode: whole rows are copied into pHandle->pDataBlock.
 * - Row-id mode: rows are stored via appendToRowIndexDataBlock.
 * pHandle->pDataBlock is flushed as one page whenever the next row would overflow
 * pHandle->pageSize.
 *
 * When pHandle->mergeLimit != -1, merging stops once at least mergeLimit rows have been flushed.
 * pHandle->currMergeLimitTs is then tightened to the last flushed timestamp.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. The aPgId allocation failure and the page-size
 * consistency check previously jumped to _error with code still 0, reporting success.
 */
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t pageHeaderSize = sizeof(int32_t) + sizeof(int32_t) * blockDataGetNumOfCols(pHandle->pDataBlock);
  int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pageHeaderSize);
  if (rowCap < 0) {
    return terrno;
  }

  code = blockDataEnsureCapacity(pHandle->pDataBlock, rowCap);
  if (code) {
    return code;
  }

  blockDataCleanup(pHandle->pDataBlock);
  SBlkMergeSupport sup = {0};

  SBlockOrderInfo* pOrigBlockTsOrder =
      (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0);
  if (pOrigBlockTsOrder == NULL) {
    return terrno;
  }

  SBlockOrderInfo* pHandleBlockTsOrder = taosArrayGet(pHandle->pSortInfo, 0);
  if (pHandleBlockTsOrder == NULL) {
    return terrno;
  }

  SBlockOrderInfo* pOrigBlockPkOrder = NULL;
  if (pHandle->bSortPk) {
    pOrigBlockPkOrder =
        (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 1) : taosArrayGet(pHandle->aExtRowsOrders, 1);
    if (pOrigBlockPkOrder == NULL) {
      return terrno;
    }
  }

  code = initMergeSup(&sup, aBlk, pOrigBlockTsOrder->order, pOrigBlockTsOrder->slotId, pOrigBlockPkOrder);
  if (code) {
    cleanupMergeSup(&sup);  // release whatever a failed initialization left allocated
    return code;
  }

  int32_t totalRows = getTotalRows(aBlk);

  SMultiwayMergeTreeInfo* pTree = NULL;
  __merge_compare_fn_t    mergeCompareFn = (!pHandle->bSortPk) ? blockCompareTsFn : blockCompareTsPkFn;

  code = tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, mergeCompareFn);
  if (TSDB_CODE_SUCCESS != code) {
    cleanupMergeSup(&sup);
    return code;
  }

  SArray* aPgId = taosArrayInit(8, sizeof(int32_t));
  if (aPgId == NULL) {
    code = terrno;
    goto _error;
  }

  int32_t nRows = 0;
  int32_t nMergedRows = 0;
  bool    mergeLimitReached = false;
  size_t  blkPgSz = pageHeaderSize;
  int64_t lastPageBufTs = (pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;

  while (nRows < totalRows) {
    int32_t      minIdx = tMergeTreeGetChosenIndex(pTree);
    SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
    int32_t      minRow = sup.aRowIdx[minIdx];

    int32_t bufInc = getBufIncForNewRow(pHandle, pHandle->pDataBlock->info.rows, minBlk, minRow);
    if (bufInc < 0) {
      // the row-id-sort size helper reports layout errors through its return value
      code = bufInc;
      goto _error;
    }

    // Flush the current page if the next row would overflow it. The rows > 0 guard prevents
    // reading the timestamp at index -1 (and flushing an empty page) when one row exceeds a page.
    if (pHandle->pDataBlock->info.rows > 0 && blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
      SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId);
      lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
      code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
      if (code != TSDB_CODE_SUCCESS) {
        goto _error;
      }

      nMergedRows += pHandle->pDataBlock->info.rows;
      blockDataCleanup(pHandle->pDataBlock);
      blkPgSz = pageHeaderSize;

      // the row now starts a fresh page, so its bitmap cost may differ
      bufInc = getBufIncForNewRow(pHandle, 0, minBlk, minRow);
      if (bufInc < 0) {
        code = bufInc;
        goto _error;
      }

      if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
        mergeLimitReached = true;
        if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ||
            (lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) {
          pHandle->currMergeLimitTs = lastPageBufTs;
        }

        break;
      }
    }

    code = blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1);
    if (code) {
      goto _error;
    }

    if (pHandle->bSortByRowId) {
      code = appendToRowIndexDataBlock(pHandle, minBlk, &minRow);
    } else {
      code = appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
    }

    if (code) {
      goto _error;
    }

    blkPgSz += bufInc;
    if (blkPgSz != blockDataGetSize(pHandle->pDataBlock) + pageHeaderSize) {
      qError("sort failed at: %s:%d", __func__, __LINE__);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      goto _error;
    }

    ++nRows;

    if (sup.aRowIdx[minIdx] == minBlk->info.rows - 1) {
      sup.aRowIdx[minIdx] = -1;
    } else {
      ++sup.aRowIdx[minIdx];
    }
    code = tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
    if (TSDB_CODE_SUCCESS != code) {
      goto _error;
    }
  }

  if (pHandle->pDataBlock->info.rows > 0) {
    if (!mergeLimitReached) {
      SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId);
      lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
      code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
      if (code != TSDB_CODE_SUCCESS) {
        goto _error;
      }
      nMergedRows += pHandle->pDataBlock->info.rows;
      if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
        mergeLimitReached = true;
        if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ||
            (lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) {
          pHandle->currMergeLimitTs = lastPageBufTs;
        }
      }
    }
    blockDataCleanup(pHandle->pDataBlock);
  }

  SSDataBlock* pMemSrcBlk = NULL;
  code = createOneDataBlock(pHandle->pDataBlock, false, &pMemSrcBlk);
  if (code) {
    goto _error;
  }

  code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pMemSrcBlk);
    goto _error;
  }

  cleanupMergeSup(&sup);
  tMergeTreeDestroy(&pTree);

  return code;

_error:
  tMergeTreeDestroy(&pTree);
  cleanupMergeSup(&sup);
  if (aPgId) taosArrayDestroy(aPgId);
  return code;
}
2166

2167
/*
 * Apply the per-table row limit pHandle->mergeLimit to pOrigBlk.
 *
 * mTableNumRows maps a table uid to the number of rows seen so far and is updated with this
 * block's rows. When the running total reaches mergeLimit, mergeLimitReachedFn (if set) is called.
 * Only the rows still within the limit are then kept:
 *   - none left:    *pSkipBlock = true, *pExtractedBlock = false, *pRes = pOrigBlk;
 *   - some cut off: *pRes is a new block with the first keepRows rows and *pExtractedBlock = true
 *                   (the caller is responsible for destroying it);
 *   - all kept:     *pRes = pOrigBlk and *pExtractedBlock = false.
 *
 * The "none left" case previously fell through, cleared *pSkipBlock again and extracted an empty
 * block.
 */
static int32_t getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk,
                                            bool* pExtractedBlock, bool* pSkipBlock, SSDataBlock** pRes) {
  int64_t nRows = 0;
  int64_t prevRows = 0;
  int32_t code = 0;

  *pRes = NULL;

  void* pNum = tSimpleHashGet(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid));
  if (pNum == NULL) {
    prevRows = 0;
    nRows = pOrigBlk->info.rows;
    code = tSimpleHashPut(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid), &nRows, sizeof(nRows));
    if (code) {
      return code;
    }
  } else {
    prevRows = *(int64_t*)pNum;
    *(int64_t*)pNum = *(int64_t*)pNum + pOrigBlk->info.rows;
    nRows = *(int64_t*)pNum;
  }

  int64_t keepRows = pOrigBlk->info.rows;
  if (nRows >= pHandle->mergeLimit) {
    if (pHandle->mergeLimitReachedFn) {
      pHandle->mergeLimitReachedFn(pOrigBlk->info.id.uid, pHandle->mergeLimitReachedParam);
    }
    keepRows = pHandle->mergeLimit > prevRows ? (pHandle->mergeLimit - prevRows) : 0;
  }

  if (keepRows == 0) {
    // this table already contributed mergeLimit rows: the whole block is dropped
    *pSkipBlock = true;
    *pExtractedBlock = false;
    *pRes = pOrigBlk;
    return TSDB_CODE_SUCCESS;
  }

  *pSkipBlock = false;
  SSDataBlock* pBlock = NULL;
  if (keepRows != pOrigBlk->info.rows) {
    code = blockDataExtractBlock(pOrigBlk, 0, keepRows, &pBlock);
    if (code) {
      return code;
    }

    *pExtractedBlock = true;
  } else {
    *pExtractedBlock = false;
    pBlock = pOrigBlk;
  }

  *pRes = pBlock;
  return code;
}
2219

2220
static void freeHelp(void* param) {
415,881✔
2221
  SSDataBlock** ptr = param;
415,881✔
2222
  if (*ptr != NULL) {
415,881!
2223
    blockDataDestroy(*ptr);
415,885✔
2224
  }
2225
}
415,913✔
2226

2227
/*
 * Build the initial sorted runs for a block-level merge sort.
 *
 * Blocks are pulled from pHandle->pOrderedSource[0] through pHandle->fetchfp. When
 * pHandle->mergeLimit > 0:
 *   - each table (info.id.uid) keeps at most mergeLimit rows (getRowsBlockWithinMergeLimit);
 *   - blocks whose first timestamp is already beyond pHandle->currMergeLimitTs are dropped.
 * Blocks with the same uid are merged into one buffered block. When the buffered size exceeds the
 * budget (extRowsMemSize in row-id mode, numOfPages * pageSize otherwise), or the input ends, the
 * buffered blocks become one external source via sortBlocksToExtSource.
 * Unless the handle was closed, the ordered-source list is then replaced by those sources and the
 * handle switches to SORT_SINGLESOURCE_SORT.
 *
 * Two fixes:
 * - Errors from getRowsBlockWithinMergeLimit used to be swallowed by `continue`; they now propagate.
 * - An extracted block is now freed even when merging it into an existing block fails.
 */
static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
  int32_t          szSort = 0;
  int32_t          code = 0;
  int32_t          lino = 0;
  size_t           nSrc = taosArrayGetSize(pHandle->pOrderedSource);
  SArray*          aExtSrc = NULL;
  SArray*          aBlkSort = NULL;
  SSHashObj*       mTableNumRows = NULL;
  SSHashObj*       mUidBlk = NULL;
  SBlockOrderInfo* pOrigTsOrder = NULL;

  aExtSrc = taosArrayInit(nSrc, POINTER_BYTES);
  QUERY_CHECK_NULL(aExtSrc, code, lino, _err, terrno);

  mTableNumRows = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
  QUERY_CHECK_NULL(mTableNumRows, code, lino, _err, terrno);

  aBlkSort = taosArrayInit(8, POINTER_BYTES);
  QUERY_CHECK_NULL(aBlkSort, code, lino, _err, terrno);

  mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
  QUERY_CHECK_NULL(mUidBlk, code, lino, _err, terrno);

  size_t maxBufSize = (pHandle->bSortByRowId) ? pHandle->extRowsMemSize : (pHandle->numOfPages * pHandle->pageSize);
  code = createPageBuf(pHandle);
  QUERY_CHECK_CODE(code, lino, _err);

  SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0);
  QUERY_CHECK_NULL(pSrc, code, lino, _err, terrno);

  pOrigTsOrder =
      (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0);
  QUERY_CHECK_NULL(pOrigTsOrder, code, lino, _err, terrno);

  pHandle->currMergeLimitTs = (pOrigTsOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;

  while (1) {
    bool         bExtractedBlock = false;
    bool         bSkipBlock = false;
    SSDataBlock* pBlk = NULL;

    code = pHandle->fetchfp(pSrc->param, &pBlk);
    QUERY_CHECK_CODE(code, lino, _err);

    if (pBlk != NULL && pHandle->mergeLimit > 0) {
      SSDataBlock* p = NULL;
      code = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock, &bSkipBlock, &p);
      QUERY_CHECK_CODE(code, lino, _err);
      if (bSkipBlock) {
        continue;
      }

      pBlk = p;
    }

    if (pBlk != NULL) {
      SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigTsOrder->slotId);
      QUERY_CHECK_NULL(tsCol, code, lino, _err, terrno);

      int64_t firstRowTs = *(int64_t*)tsCol->pData;
      if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
          (pOrigTsOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) {
        if (bExtractedBlock) {
          blockDataDestroy(pBlk);
        }
        continue;
      }
    }

    if (pBlk != NULL) {
      szSort += blockDataGetSize(pBlk);
      void* ppBlk = tSimpleHashGet(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid));
      if (ppBlk != NULL) {
        SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk);
        code = blockDataMerge(tBlk, pBlk);
        if (bExtractedBlock) {
          blockDataDestroy(pBlk);  // the extracted copy is ours whether or not the merge succeeded
        }
        QUERY_CHECK_CODE(code, lino, _err);
      } else {
        SSDataBlock* tBlk = NULL;
        if (bExtractedBlock) {
          tBlk = pBlk;
        } else {
          code = createOneDataBlock(pBlk, true, &tBlk);
          QUERY_CHECK_CODE(code, lino, _err);
        }

        code = tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES);
        if (code != TSDB_CODE_SUCCESS) {
          blockDataDestroy(tBlk);
        }
        QUERY_CHECK_CODE(code, lino, _err);

        void* px = taosArrayPush(aBlkSort, &tBlk);
        if (px == NULL) {
          blockDataDestroy(tBlk);
        }
        QUERY_CHECK_NULL(px, code, lino, _err, terrno);
      }
    }

    if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) {
      tSimpleHashClear(mUidBlk);

      int64_t p = taosGetTimestampUs();
      if (pHandle->bSortByRowId) {
        code = tsortOpenRegion(pHandle);
        QUERY_CHECK_CODE(code, lino, _err);
      }

      code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc);
      QUERY_CHECK_CODE(code, lino, _err);

      if (pHandle->bSortByRowId) {
        code = tsortCloseRegion(pHandle);  // ignore this error code
      }

      int64_t el = taosGetTimestampUs() - p;
      pHandle->sortElapsed += el;
      taosArrayClearEx(aBlkSort, freeHelp);

      szSort = 0;
      qDebug("%s source %zu created", pHandle->idStr, taosArrayGetSize(aExtSrc));
    }

    if (pBlk == NULL) {
      break;
    }

    if (tsortIsClosed(pHandle)) {
      break;
    }
  }

  tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL);
  if (!tsortIsClosed(pHandle)) {
    void* px = taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
    QUERY_CHECK_NULL(px, code, lino, _err, terrno);
  }

  if (pHandle->bSortByRowId) {
    code = tsortFinalizeRegions(pHandle);
  }

  pHandle->type = SORT_SINGLESOURCE_SORT;

_err:
  if (code) {
    qError("%s %s failed at line %d since %s", pHandle->idStr, __func__, lino, tstrerror(code));
  }

  if (aExtSrc) {
    taosArrayDestroy(aExtSrc);
  }
  if (aBlkSort) {
    taosArrayDestroyEx(aBlkSort, freeHelp);
  }
  if (mTableNumRows) {
    tSimpleHashCleanup(mTableNumRows);
  }
  if (mUidBlk) {
    tSimpleHashCleanup(mUidBlk);
  }
  return code;
}
2393

2394
static void freeSortSource(void* p) {
178,511✔
2395
  SSortSource** pSource = (SSortSource**)p;
178,511✔
2396
  if (NULL == pSource || NULL == *pSource) {
178,511!
2397
    return;
×
2398
  }
2399

2400
  if ((*pSource)->pageIdList) {
178,520!
2401
    taosArrayDestroy((*pSource)->pageIdList);
×
2402
  }
2403

2404
  if (!(*pSource)->onlyRef) {
178,520✔
2405
    if ((*pSource)->param) {
180!
2406
      taosMemoryFree((*pSource)->param);
180✔
2407
    }
2408
    if ((*pSource)->src.pBlock) {
180!
2409
      blockDataDestroy((*pSource)->src.pBlock);
×
2410
    }
2411
  }
2412

2413
  taosMemoryFreeClear(*pSource);
178,520✔
2414
}
2415

2416
/*
 * Drains the first ordered source of pHandle into pHandle->pDataBlock and sorts it.
 *
 * - On the first fetched block, the page size is derived from that block's row
 *   size, the page count is fixed at 1024, and pDataBlock is created from the
 *   block via createOneDataBlock().
 * - Every fetched block passes through the optional beforeFp hook and is then
 *   merged into pDataBlock.
 * - Whenever the accumulated size exceeds the sort buffer, the data is sorted,
 *   optionally truncated to pqMaxRows, and spilled with doAddToBuf().
 * - After the input is exhausted, the remainder is sorted. If it fits and nothing
 *   was ever spilled (pBuf == NULL), the handle switches to in-memory mode.
 *   Otherwise the remainder is spilled as well.
 *
 * The sources that existed on entry are removed from pOrderedSource (and freed)
 * on every exit path after the first lookup succeeds.
 *
 * Returns 0 on success or the first failing error code.
 */
static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  // Widen before multiplying so the byte budget is not computed in int arithmetic.
  size_t  sortBufSize = (size_t)pHandle->numOfPages * pHandle->pageSize;

  SSortSource** p = taosArrayGet(pHandle->pOrderedSource, 0);
  if (p == NULL) {
    return terrno;
  }

  SSortSource* pSource = *p;
  size_t       origSourceCount = taosArrayGetSize(pHandle->pOrderedSource);

  while (1) {
    SSDataBlock* pBlock = NULL;
    code = pHandle->fetchfp(pSource->param, &pBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pBlock == NULL) {
      break;
    }

    if (pHandle->pDataBlock == NULL) {
      uint32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
      pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock), numOfCols);

      // todo, number of pages are set according to the total available sort buffer
      pHandle->numOfPages = 1024;
      sortBufSize = (size_t)pHandle->numOfPages * pHandle->pageSize;
      code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (pHandle->beforeFp != NULL) {
      pHandle->beforeFp(pBlock, pHandle->param);
    }

    code = blockDataMerge(pHandle->pDataBlock, pBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    size_t size = blockDataGetSize(pHandle->pDataBlock);
    if (size > sortBufSize) {
      // Perform the in-memory sort and then flush data in the buffer into disk.
      int64_t st = taosGetTimestampUs();
      code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
      QUERY_CHECK_CODE(code, lino, _end);

      pHandle->sortElapsed += (taosGetTimestampUs() - st);

      if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
      code = doAddToBuf(pHandle->pDataBlock, pHandle);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

  if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
    size_t size = blockDataGetSize(pHandle->pDataBlock);

    // Perform the in-memory sort and then flush data in the buffer into disk.
    int64_t st = taosGetTimestampUs();
    code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
    pHandle->sortElapsed += (taosGetTimestampUs() - st);

    // All sorted data can fit in memory, external memory sort is not needed. Return to directly
    if (size <= sortBufSize && pHandle->pBuf == NULL) {
      pHandle->cmpParam.numOfSources = 1;
      pHandle->inMemSort = true;

      pHandle->loops = 1;
      pHandle->tupleHandle.rowIndex = -1;
      pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
    } else {
      code = doAddToBuf(pHandle->pDataBlock, pHandle);
      // Record the failing line so the error log below is not reported at line 0.
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  taosArrayRemoveBatch(pHandle->pOrderedSource, 0, origSourceCount, freeSortSource);
  return code;
}
2501

2502
/*
 * Builds the initial sorted sources for the buffered-merge path, dispatching on
 * pHandle->type. Any other sort type leaves pOrderedSource as it is.
 * Returns the error code of the chosen builder, or 0.
 */
static int32_t createInitialSources(SSortHandle* pHandle) {
  int32_t code = TSDB_CODE_SUCCESS;

  switch (pHandle->type) {
    case SORT_SINGLESOURCE_SORT:
      code = createBlocksQuickSortInitialSources(pHandle);
      break;
    case SORT_BLOCK_TS_MERGE:
      code = createBlocksMergeSortInitialSources(pHandle);
      break;
    default:
      break;
  }

  qDebug("%s %zu sources created", pHandle->idStr, taosArrayGetSize(pHandle->pOrderedSource));
  return code;
}
2514

2515
/*
 * Opens pHandle for the buffered merge sort. The steps are:
 *   1. create the initial sorted sources;
 *   2. run doInternalMergeSort();
 *   3. build the merge tree over the sources that remain.
 *
 * If a paged buffer exists and more sources remain than it has in-memory pages,
 * the function fails with TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR.
 * It returns 0 without building a tree when no sources remain.
 */
static int32_t tsortOpenForBufMergeSort(SSortHandle* pHandle) {
  int32_t code = createInitialSources(pHandle);
  if (code == TSDB_CODE_SUCCESS) {
    code = doInternalMergeSort(pHandle);  // do internal sort
  }
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);

  bool tooManySources = (pHandle->pBuf != NULL) && (numOfSources > getNumOfInMemBufPages(pHandle->pBuf));
  if (tooManySources) {
    qError("sort failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  if (numOfSources == 0) {
    return 0;
  }

  code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, 0, numOfSources - 1, pHandle);
  if (code == TSDB_CODE_SUCCESS) {
    code = tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
  }
  return code;
}
2547

2548
/*
 * Requests that the sort stop.
 *
 * Atomically moves pHandle->closed from 0 to 1. If closed is already non-zero,
 * the compare-and-swap fails and nothing changes. The previous value is not needed.
 */
void tsortClose(SSortHandle* pHandle) {
  int8_t previous = atomic_val_compare_exchange_8(&pHandle->closed, 0, 1);
  (void)previous;
}
534,535✔
2551

2552
/*
 * Returns true when pHandle->closed was non-zero.
 *
 * Side effect: a value of 1 is atomically advanced to 2 by the compare-and-swap.
 */
bool tsortIsClosed(SSortHandle* pHandle) {
  int8_t prevState = atomic_val_compare_exchange_8(&pHandle->closed, 1, 2);
  return prevState != 0;
}
2555

2556
/* Unconditionally stores state 2 into pHandle->closed with an atomic store. */
void tsortSetClosed(SSortHandle* pHandle) {
  int8_t closedState = 2;
  atomic_store_8(&pHandle->closed, closedState);
}
×
2559

2560
/* Records mergeLimit on the handle. The value is not interpreted here. */
void tsortSetMergeLimit(SSortHandle* pHandle, int64_t mergeLimit) { pHandle->mergeLimit = mergeLimit; }
236,975✔
2563

2564
void tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*),
533,972✔
2565
                               void* param) {
2566
  pHandle->fetchfp = fetchFp;
533,972✔
2567
  pHandle->beforeFp = fp;
533,972✔
2568
  pHandle->param = param;
533,972✔
2569
}
533,972✔
2570

2571
/* Sets the comparator used to order tuples. A NULL handle is ignored. */
void tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp) {
  if (pHandle == NULL) {
    return;
  }
  pHandle->comparFn = fp;
}
562,668✔
2576

2577
/* Stores the group-id comparison flag in the compare parameters. A NULL handle is ignored. */
void tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId) {
  if (pHandle == NULL) {
    return;
  }
  pHandle->cmpParam.cmpGroupId = compareGroupId;
}
89,986✔
2582

2583
/*
 * Produces the next tuple for the buffered merge-sort path.
 *
 * *pTupleHandle is set to &pHandle->tupleHandle when a row is available and to
 * NULL otherwise. NULL is also the result when the handle is closed or every
 * source has completed.
 *
 * Returns 0, the error from adjustMergeTreeForNextTuple(), or
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR if the winning source has no block.
 */
static int32_t tsortBufMergeSortNextTuple(SSortHandle* pHandle, STupleHandle** pTupleHandle) {
  STupleHandle* pOut = &pHandle->tupleHandle;
  *pTupleHandle = NULL;

  if (tsortIsClosed(pHandle) || pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) {
    return TSDB_CODE_SUCCESS;
  }

  // All the data are held in pDataBlock (no external sort), so just advance the row cursor.
  if (pHandle->inMemSort) {
    pOut->rowIndex += 1;
    if (pOut->rowIndex == pHandle->pDataBlock->info.rows) {
      pHandle->numOfCompletedSources = 1;
    } else {
      *pTupleHandle = pOut;
    }
    return TSDB_CODE_SUCCESS;
  }

  // Source that won the previous round. Re-rank it before choosing again if requested.
  SSortSource* pSource = pHandle->cmpParam.pSources[tMergeTreeGetChosenIndex(pHandle->pMergeTree)];
  if (pHandle->needAdjust) {
    int32_t code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  // all sources are completed.
  if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) {
    return TSDB_CODE_SUCCESS;
  }

  // Get the adjusted winner after the loser tree is updated.
  pSource = pHandle->cmpParam.pSources[tMergeTreeGetChosenIndex(pHandle->pMergeTree)];
  if (pSource->src.pBlock == NULL) {
    qError("sort failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pOut->rowIndex = pSource->src.rowIndex;
  pOut->pBlock = pSource->src.pBlock;
  pSource->src.rowIndex += 1;
  pHandle->needAdjust = true;

  *pTupleHandle = pOut;
  return TSDB_CODE_SUCCESS;
}
2640

2641
/* Reports whether tsortSetForceUsePQSort() has forced the priority-queue path. */
static bool tsortIsForceUsePQSort(SSortHandle* pHandle) {
  bool forced = (pHandle->forceUsePQSort == true);
  return forced;
}
2644

2645
/* Forces the priority-queue path (for single-source sorts) regardless of buffer size. */
void tsortSetForceUsePQSort(SSortHandle* pHandle) { pHandle->forceUsePQSort = true; }
×
2648

2649
/*
 * Decides whether the bounded priority-queue sort can be used.
 *
 * It requires a single-source sort, and then either:
 *   - the PQ path has been forced, or
 *   - more than pqMaxRows tuples (each tuple plus one pointer) fit in pqSortBufSize.
 */
static bool tsortIsPQSortApplicable(SSortHandle* pHandle) {
  if (pHandle->type != SORT_SINGLESOURCE_SORT) {
    return false;
  }
  if (tsortIsForceUsePQSort(pHandle)) {
    return true;
  }

  const uint64_t rowsThatFit = pHandle->pqSortBufSize / (pHandle->pqMaxTupleLength + sizeof(char*));
  return rowsThatFit > pHandle->pqMaxRows;
}
2655

2656
static bool tsortPQCompFn(void* a, void* b, void* param) {
10,882,176✔
2657
  SSortHandle* pHandle = param;
10,882,176✔
2658
  int32_t res = pHandle->comparFn(a, b, param);
10,882,176✔
2659
  if (res < 0) return 1;
10,880,517✔
2660
  return 0;
3,131,591✔
2661
}
2662

2663
static bool tsortPQComFnReverse(void*a, void* b, void* param) {
4,883,916✔
2664
  SSortHandle* pHandle = param;
4,883,916✔
2665
  int32_t res = pHandle->comparFn(a, b, param);
4,883,916✔
2666
  if (res > 0) return 1;
4,883,897✔
2667
  return 0;
1,418,089✔
2668
}
2669

2670
static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param) {
15,765,669✔
2671
  TupleDesc* pLeftDesc = (TupleDesc*)pLeft;
15,765,669✔
2672
  TupleDesc* pRightDesc = (TupleDesc*)pRight;
15,765,669✔
2673

2674
  SSortHandle* pHandle = (SSortHandle*)param;
15,765,669✔
2675
  SArray* orderInfo = (SArray*)pHandle->pSortInfo;
15,765,669✔
2676
  uint32_t colNum = blockDataGetNumOfCols(pHandle->pDataBlock);
15,765,669✔
2677
  for (int32_t i = 0; i < orderInfo->size; ++i) {
25,180,690✔
2678
    SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(orderInfo, i);
24,990,720✔
2679
    void *lData = NULL, *rData = NULL;
24,990,720✔
2680

2681
    int32_t ret1 = tupleDescGetField(pLeftDesc, pOrder->slotId, colNum, &lData);
24,990,720✔
2682
    int32_t ret2 = tupleDescGetField(pRightDesc, pOrder->slotId, colNum, &rData);
24,989,762✔
2683
    if (ret1) {
24,989,182!
2684
      return ret1;
15,573,504✔
2685
    }
2686

2687
    if (ret2) {
24,989,182!
2688
      return ret2;
×
2689
    }
2690

2691
    if ((!lData) && (!rData)) {
24,989,182✔
2692
      continue;
9,414,965✔
2693
    }
2694

2695
    if (!lData) return pOrder->nullFirst ? -1 : 1;
24,988,357✔
2696
    if (!rData) return pOrder->nullFirst ? 1 : -1;
24,987,818✔
2697

2698
    SColumnInfoData* p = (SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId);
24,986,627✔
2699
    if (p == NULL) {
24,986,345!
2700
      return terrno;
×
2701
    }
2702

2703
    __compar_fn_t fn = getKeyComparFunc(p->info.type, pOrder->order);
24,986,345✔
2704

2705
    int32_t ret = fn(lData, rData);
24,986,239✔
2706
    if (ret == 0) {
24,985,914✔
2707
      continue;
9,414,140✔
2708
    } else {
2709
      return ret;
15,571,774✔
2710
    }
2711
  }
2712

2713
  return 0;
189,970✔
2714
}
2715

2716
/*
 * Opens pHandle for the bounded priority-queue (top-N) sort.
 *
 * It creates a queue of capacity pqMaxRows and switches the comparator to
 * tupleComparFn. It then streams every block of the first ordered source
 * through the queue. Each row is offered as a reference into its block, and
 * rows the queue accepts are replaced by an allocated copy.
 *
 * pHandle->pDataBlock is reset to NULL and then becomes an empty clone of the
 * first fetched block. tsortPQSortNextTuple() later fills it with output rows.
 *
 * Returns 0, TSDB_CODE_OUT_OF_MEMORY if the queue cannot be created, or the
 * first error from fetching, block creation or tuple allocation.
 */
static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
  pHandle->pBoundedQueue = createBoundedQueue(pHandle->pqMaxRows, tsortPQCompFn, destroyTuple, pHandle);
  if (pHandle->pBoundedQueue == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsortSetComparFp(pHandle, tupleComparFn);

  SSortSource** ppSrc = taosArrayGet(pHandle->pOrderedSource, 0);
  if (ppSrc == NULL) {
    return terrno;
  }

  SSortSource*      pSrc = *ppSrc;
  uint32_t          tupleLen = 0;
  PriorityQueueNode pqNode;

  pHandle->pDataBlock = NULL;

  for (;;) {
    SSDataBlock* pBlock = NULL;
    TAOS_CHECK_RETURN(pHandle->fetchfp(pSrc->param, &pBlock));
    if (pBlock == NULL) {
      break;
    }

    if (pHandle->beforeFp != NULL) {
      pHandle->beforeFp(pBlock, pHandle->param);
    }

    if (pHandle->pDataBlock == NULL) {
      int32_t code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock);
      if (code != 0) {
        return code;
      }
    }

    size_t colNum = blockDataGetNumOfCols(pBlock);

    // Per-tuple byte length: column widths plus a length prefix for var-length types.
    // Computed while tupleLen is still 0, i.e. from the first block seen.
    if (tupleLen == 0) {
      for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
        SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
        if (pCol == NULL) {
          return terrno;
        }

        tupleLen += pCol->info.bytes;
        if (IS_VAR_DATA_TYPE(pCol->info.type)) {
          tupleLen += sizeof(VarDataLenT);
        }
      }
    }

    ReferencedTuple refTuple = {.desc.data = (char*)pBlock, .desc.type = ReferencedTupleType, .rowIndex = 0};
    for (size_t rowIdx = 0; rowIdx < pBlock->info.rows; ++rowIdx) {
      refTuple.rowIndex = rowIdx;
      pqNode.data = &refTuple;

      PriorityQueueNode* pPushed = taosBQPush(pHandle->pBoundedQueue, &pqNode);
      if (pPushed == NULL) {
        continue;  // do nothing if push failed
      }

      // refTuple is reused for the next row, so swap the reference for an allocated copy.
      pPushed->data = NULL;
      int32_t code = createAllocatedTuple(pBlock, colNum, tupleLen, rowIdx, (TupleDesc**)&pPushed->data);
      if (code != 0) {
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
2788

2789
/*
 * Emits the next tuple of the priority-queue sort as row 0 of pHandle->pDataBlock.
 *
 * Per call:
 *   - The output block is reset to a capacity of one row.
 *   - If the queue holds one entry beyond its maximum size, that entry is popped first.
 *   - On the first call (tmpRowIdx == 0), the queue switches to the reversed
 *     comparator and is re-heapified.
 *   - The top entry is then copied out and popped.
 *
 * *pTupleHandle is NULL when no input block was ever seen or the queue is empty.
 * Returns 0 or the first error from capacity allocation or column writes.
 */
static int32_t tsortPQSortNextTuple(SSortHandle* pHandle, STupleHandle **pTupleHandle) {
  *pTupleHandle = NULL;
  if (pHandle->pDataBlock == NULL) {  // when no input stream datablock
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pOut = pHandle->pDataBlock;
  blockDataCleanup(pOut);
  int32_t code = blockDataEnsureCapacity(pOut, 1);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // abandon the top tuple if queue size bigger than max size
  if (taosBQSize(pHandle->pBoundedQueue) == taosBQMaxSize(pHandle->pBoundedQueue) + 1) {
    taosBQPop(pHandle->pBoundedQueue);
  }
  if (pHandle->tmpRowIdx == 0) {
    taosBQSetFn(pHandle->pBoundedQueue, tsortPQComFnReverse);
    taosBQBuildHeap(pHandle->pBoundedQueue);
  }

  if (taosBQSize(pHandle->pBoundedQueue) > 0) {
    uint32_t           numOfCols = blockDataGetNumOfCols(pOut);
    PriorityQueueNode* pTop = taosBQTop(pHandle->pBoundedQueue);
    char*              pTuple = ((TupleDesc*)pTop->data)->data;

    for (uint32_t col = 0; col < numOfCols; ++col) {
      void*            pVal = tupleGetField(pTuple, col, numOfCols);
      SColumnInfoData* pCol = NULL;
      TAOS_CHECK_RETURN(bdGetColumnInfoData(pOut, col, &pCol));

      if (pVal == NULL) {
        colDataSetNULL(pCol, 0);
      } else {
        TAOS_CHECK_RETURN(colDataSetVal(pCol, 0, pVal, false));
      }
    }

    pOut->info.rows++;
    pHandle->tmpRowIdx++;
    taosBQPop(pHandle->pBoundedQueue);
  }

  if (pOut->info.rows == 0) {
    return code;
  }

  pHandle->tupleHandle.pBlock = pOut;
  *pTupleHandle = &pHandle->tupleHandle;
  return code;
}
2842

2843
/*
 * Single-table merge path: the input is passed through as-is.
 *
 * The function walks the rows of the current block. When that block is used up,
 * it fetches the next block from the first ordered source. An absent or empty
 * block marks the source done and ends iteration (*pTupleHandle stays NULL).
 *
 * Returns 0 or the error produced by the fetch callback / source lookup.
 */
static int32_t tsortSingleTableMergeNextTuple(SSortHandle* pHandle, STupleHandle** pTupleHandle) {
  STupleHandle* pCur = &pHandle->tupleHandle;
  int32_t       code = TSDB_CODE_SUCCESS;
  *pTupleHandle = NULL;

  if (pHandle->numOfCompletedSources == 1) {
    return code;
  }

  bool hasNextInBlock = (pCur->pBlock != NULL) && (pCur->rowIndex + 1 < pCur->pBlock->info.rows);
  if (hasNextInBlock) {
    pCur->rowIndex++;
    *pTupleHandle = pCur;
    return code;
  }

  if (pCur->rowIndex == -1) {
    return code;
  }

  SSortSource** ppSrc = taosArrayGet(pHandle->pOrderedSource, 0);
  if (ppSrc == NULL) {
    return terrno;
  }

  SSortSource* pSrc = *ppSrc;
  SSDataBlock* pNext = NULL;
  TAOS_CHECK_RETURN(pHandle->fetchfp(pSrc->param, &pNext));

  if (pNext == NULL || pNext->info.rows == 0) {
    setCurrentSourceDone(pSrc, pHandle);
    pCur->pBlock = NULL;
    return code;
  }

  pCur->pBlock = pNext;
  pCur->rowIndex = 0;
  *pTupleHandle = pCur;
  return code;
}
2880

2881
/*
 * Opens the sort handle. Only the first successful call does any work; later
 * calls on an opened handle return 0 immediately.
 *
 * The bounded priority-queue path is used when tsortIsPQSortApplicable()
 * allows it. Otherwise the buffered merge sort is used.
 *
 * Returns TSDB_CODE_INVALID_PARA when pHandle is NULL, or when an unopened
 * handle has no fetch or compare callback. Otherwise it returns the result of
 * the chosen open routine.
 */
int32_t tsortOpen(SSortHandle* pHandle) {
  // Validate the pointer before dereferencing any field.
  if (pHandle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (pHandle->opened) {
    return TSDB_CODE_SUCCESS;
  }

  if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  pHandle->opened = true;

  int32_t code = 0;
  if (tsortIsPQSortApplicable(pHandle)) {
    code = tsortOpenForPQSort(pHandle);
  } else {
    code = tsortOpenForBufMergeSort(pHandle);
  }

  return code;
}
2900

2901
/*
 * Fetches the next sorted tuple by dispatching on how the handle was opened:
 *   - single-table pass-through merge;
 *   - bounded priority queue;
 *   - buffered merge sort.
 * *pTupleHandle is NULL when no further tuple is produced.
 */
int32_t tsortNextTuple(SSortHandle* pHandle, STupleHandle** pTupleHandle) {
  if (pHandle->singleTableMerge) {
    return tsortSingleTableMergeNextTuple(pHandle, pTupleHandle);
  }
  if (pHandle->pBoundedQueue != NULL) {
    return tsortPQSortNextTuple(pHandle, pTupleHandle);
  }
  return tsortBufMergeSortNextTuple(pHandle, pTupleHandle);
}
2914

2915
/* Returns whether column colIndex of the current tuple row is NULL. An out-of-range column counts as NULL. */
bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) {
  SColumnInfoData* pCol = taosArrayGet(pVHandle->pBlock->pDataBlock, colIndex);
  if (pCol != NULL) {
    return colDataIsNull_s(pCol, pVHandle->rowIndex);
  }
  return true;
}
2923

2924
/*
 * Stores in *pVal a pointer to the current row's value in column colIndex.
 * *pVal is NULL when the column has no data buffer. The column index is not
 * bounds-checked.
 */
void tsortGetValue(STupleHandle* pVHandle, int32_t colIndex, void** pVal) {
  *pVal = NULL;
  SColumnInfoData* pCol = TARRAY_GET_ELEM(pVHandle->pBlock->pDataBlock, colIndex);
  if (pCol->pData == NULL) {
    return;
  }
  *pVal = colDataGetData(pCol, pVHandle->rowIndex);
}
653,487,463✔
2931

2932
/* Group id of the block that holds the current tuple. */
uint64_t tsortGetGroupId(STupleHandle* pVHandle) {
  SSDataBlock* pBlock = pVHandle->pBlock;
  return pBlock->info.id.groupId;
}
75,175,647✔
2933
/* Copies the block info of the block that holds the current tuple into *pBlockInfo. */
void tsortGetBlockInfo(STupleHandle* pVHandle, SDataBlockInfo* pBlockInfo) {
  SSDataBlock* pBlock = pVHandle->pBlock;
  *pBlockInfo = pBlock->info;
}
206,390,169✔
2934

2935
/*
 * Summarises how the sort executed:
 *   - buffer size;
 *   - method: in-memory quick sort or spilled merge sort;
 *   - loop count;
 *   - paged-buffer read/write bytes, when a paged buffer was used.
 * A NULL handle yields the defaults: quick sort with a 2 MiB buffer.
 */
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
  SSortExecInfo info = {0};

  if (pHandle == NULL) {
    info.sortMethod = SORT_QSORT_T;  // by default
    info.sortBuffer = 2 * 1048576;   // 2mb by default
    return info;
  }

  info.sortMethod = pHandle->inMemSort ? SORT_QSORT_T : SORT_SPILLED_MERGE_SORT_T;
  info.sortBuffer = pHandle->pageSize * pHandle->numOfPages;
  info.loops = pHandle->loops;

  if (pHandle->pBuf == NULL) {
    return info;
  }

  SDiskbasedBufStatis st = getDBufStatis(pHandle->pBuf);
  info.readBytes = st.loadBytes;
  info.writeBytes = st.flushBytes;
  return info;
}
2955

2956
int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* keyLen,
21,374,957✔
2957
                              const STupleHandle* pTuple) {
2958
  int32_t ret;
2959
  if (0 == compKeys(pSortCols, keyBuf, *keyLen, pTuple->pBlock, pTuple->rowIndex)) {
21,374,957✔
2960
    ret = 0;
20,836,923✔
2961
  } else {
2962
    *keyLen = buildKeys(keyBuf, pSortCols, pTuple->pBlock, pTuple->rowIndex);
472,979✔
2963
    ret = 1;
482,495✔
2964
  }
2965
  return ret;
21,319,418✔
2966
}
2967

2968
/*
 * Registers a callback, together with its opaque argument, for when the merge
 * limit is reached. The callback receives a table uid and `param`.
 */
void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReachedCb)(uint64_t tableUid, void* param), void* param) {
  pHandle->mergeLimitReachedParam = param;
  pHandle->mergeLimitReachedFn = mergeLimitReachedCb;
}
236,784✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc