• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.34
/source/dnode/vnode/src/tsdb/tsdbMergeTree.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdb.h"
17
#include "tsdbFSet2.h"
18
#include "tsdbMerge.h"
19
#include "tsdbReadUtil.h"
20
#include "tsdbSttFileRW.h"
21
#include "tsdbUtil2.h"
22

23
static void tLDataIterClose2(SLDataIter *pIter);
24

25
// SLDataIter =================================================
26
int32_t tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, SSttBlockLoadInfo **pInfo) {
5,024,636✔
27
  *pInfo = NULL;
5,024,636✔
28

29
  SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(1, sizeof(SSttBlockLoadInfo));
5,024,636!
30
  if (pLoadInfo == NULL) {
5,026,648!
31
    return terrno;
×
32
  }
33

34
  pLoadInfo->blockData[0].sttBlockIndex = -1;
5,026,648✔
35
  pLoadInfo->blockData[1].sttBlockIndex = -1;
5,026,648✔
36

37
  pLoadInfo->currentLoadBlockIndex = 1;
5,026,648✔
38

39
  int32_t code = tBlockDataCreate(&pLoadInfo->blockData[0].data);
5,026,648✔
40
  if (code) {
5,028,158!
41
    taosMemoryFreeClear(pLoadInfo);
×
42
    return code;
×
43
  }
44

45
  code = tBlockDataCreate(&pLoadInfo->blockData[1].data);
5,028,158✔
46
  if (code) {
5,027,038!
47
    taosMemoryFreeClear(pLoadInfo);
×
48
    return code;
×
49
  }
50

51
  pLoadInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
5,027,038✔
52
  if (pLoadInfo->aSttBlk == NULL) {
5,027,346✔
53
    taosMemoryFreeClear(pLoadInfo);
359!
54
    return terrno;
359✔
55
  }
56

57
  pLoadInfo->pSchema = pSchema;
5,026,987✔
58
  pLoadInfo->colIds = colList;
5,026,987✔
59
  pLoadInfo->numOfCols = numOfCols;
5,026,987✔
60

61
  *pInfo = pLoadInfo;
5,026,987✔
62
  return code;
5,026,987✔
63
}
64

65
static void freeItem(void *pValue) {
48,955,633✔
66
  SValue *p = (SValue *)pValue;
48,955,633✔
67
  if (IS_VAR_DATA_TYPE(p->type) || p->type == TSDB_DATA_TYPE_DECIMAL) {
48,955,633!
68
    taosMemoryFree(p->pData);
×
69
  }
70
}
48,971,708✔
71

72
void destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
5,030,817✔
73
  if (pLoadInfo == NULL) {
5,030,817!
74
    return;
×
75
  }
76

77
  pLoadInfo->currentLoadBlockIndex = 1;
5,030,817✔
78

79
  SBlockDataInfo *pInfo = &pLoadInfo->blockData[0];
5,030,817✔
80
  tBlockDataDestroy(&pInfo->data);
5,030,817✔
81
  pInfo->sttBlockIndex = -1;
5,030,960✔
82
  pInfo->pin = false;
5,030,960✔
83

84
  pInfo = &pLoadInfo->blockData[1];
5,030,960✔
85
  tBlockDataDestroy(&pInfo->data);
5,030,960✔
86
  pInfo->sttBlockIndex = -1;
5,030,972✔
87
  pInfo->pin = false;
5,030,972✔
88

89
  taosArrayDestroy(pLoadInfo->info.pUid);
5,030,972✔
90
  taosArrayDestroyEx(pLoadInfo->info.pFirstKey, freeItem);
5,030,694✔
91
  taosArrayDestroyEx(pLoadInfo->info.pLastKey, freeItem);
5,030,933✔
92
  taosArrayDestroy(pLoadInfo->info.pCount);
5,030,963✔
93
  taosArrayDestroy(pLoadInfo->info.pFirstTs);
5,030,922✔
94
  taosArrayDestroy(pLoadInfo->info.pLastTs);
5,030,972✔
95

96
  pLoadInfo->info.pUid = NULL;
5,030,998✔
97
  pLoadInfo->info.pFirstKey = NULL;
5,030,998✔
98
  pLoadInfo->info.pLastKey = NULL;
5,030,998✔
99
  pLoadInfo->info.pCount = NULL;
5,030,998✔
100
  pLoadInfo->info.pFirstTs = NULL;
5,030,998✔
101
  pLoadInfo->info.pLastTs = NULL;
5,030,998✔
102

103
  taosArrayDestroy(pLoadInfo->aSttBlk);
5,030,998✔
104
  taosMemoryFree(pLoadInfo);
5,031,060!
105
}
106

107
void destroyLDataIter(SLDataIter *pIter) {
5,030,263✔
108
  tLDataIterClose2(pIter);
5,030,263✔
109
  destroySttBlockLoadInfo(pIter->pBlockLoadInfo);
5,031,128✔
110
  taosMemoryFree(pIter);
5,031,042!
111
}
5,031,178✔
112

113
void destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost) {
10,500,644✔
114
  if (pLDataIterArray == NULL) {
10,500,644✔
115
    return;
57,475✔
116
  }
117

118
  int32_t numOfLevel = taosArrayGetSize(pLDataIterArray);
10,443,169✔
119
  for (int32_t i = 0; i < numOfLevel; ++i) {
15,457,448✔
120
    SArray *pList = taosArrayGetP(pLDataIterArray, i);
5,014,937✔
121
    for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) {
10,046,315✔
122
      SLDataIter *pIter = taosArrayGetP(pList, j);
5,030,410✔
123
      if (pIter->pBlockLoadInfo != NULL) {
5,030,192!
124
        SSttBlockLoadCostInfo *pCost = &pIter->pBlockLoadInfo->cost;
5,030,301✔
125
        if (pLoadCost != NULL) {
5,030,301!
126
          pLoadCost->loadBlocks += pCost->loadBlocks;
5,030,328✔
127
          pLoadCost->loadStatisBlocks += pCost->loadStatisBlocks;
5,030,328✔
128
          pLoadCost->blockElapsedTime += pCost->blockElapsedTime;
5,030,328✔
129
          pLoadCost->statisElapsedTime += pCost->statisElapsedTime;
5,030,328✔
130
        }
131
      }
132

133
      destroyLDataIter(pIter);
5,030,192✔
134
    }
135

136
    taosArrayDestroy(pList);
5,014,689✔
137
  }
138

139
  taosArrayDestroy(pLDataIterArray);
10,442,511✔
140
}
141

142
// choose the unpinned slot to load next data block
143
static void updateBlockLoadSlot(SSttBlockLoadInfo *pLoadInfo) {
3,663,095✔
144
  int32_t nextSlotIndex = pLoadInfo->currentLoadBlockIndex ^ 1;
3,663,095✔
145
  if (pLoadInfo->blockData[nextSlotIndex].pin) {
3,663,095!
146
    nextSlotIndex = nextSlotIndex ^ 1;
×
147
  }
148

149
  pLoadInfo->currentLoadBlockIndex = nextSlotIndex;
3,663,095✔
150
}
3,663,095✔
151

152
static int32_t loadLastBlock(SLDataIter *pIter, const char *idStr, SBlockData **pResBlock) {
746,131,718✔
153
  if (pResBlock != NULL) {
746,131,718!
154
    *pResBlock = NULL;
746,524,517✔
155
  }
156

157
  int32_t            code = 0;
746,131,718✔
158
  SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;
746,131,718✔
159

160
  if (pInfo->blockData[0].sttBlockIndex == pIter->iSttBlk) {
746,131,718✔
161
    if (pInfo->currentLoadBlockIndex != 0) {
510,859,430✔
162
      tsdbDebug("current load index is set to 0, block index:%d, fileVer:%" PRId64 ", due to uid:%" PRIu64
505✔
163
                ", load data, %s",
164
                pIter->iSttBlk, pIter->cid, pIter->uid, idStr);
165
      pInfo->currentLoadBlockIndex = 0;
505✔
166
    }
167

168
    *pResBlock = &pInfo->blockData[0].data;
510,859,430✔
169
    return code;
510,859,430✔
170
  }
171

172
  if (pInfo->blockData[1].sttBlockIndex == pIter->iSttBlk) {
235,272,288!
173
    if (pInfo->currentLoadBlockIndex != 1) {
243,919,118✔
174
      tsdbDebug("current load index is set to 1, block index:%d, fileVer:%" PRId64 ", due to uid:%" PRIu64
533✔
175
                ", load data, %s",
176
                pIter->iSttBlk, pIter->cid, pIter->uid, idStr);
177
      pInfo->currentLoadBlockIndex = 1;
533✔
178
    }
179

180
    *pResBlock = &pInfo->blockData[1].data;
243,919,118✔
181
    return code;
243,919,118✔
182
  }
183

184
  if (pIter->pSttBlk == NULL || pInfo->pSchema == NULL) {
×
185
    return code;
×
186
  }
187

188
  updateBlockLoadSlot(pInfo);
3,663,800✔
189
  int64_t st = taosGetTimestampUs();
3,664,685✔
190

191
  SBlockData *pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex].data;
3,664,685✔
192
  code = tsdbSttFileReadBlockDataByColumn(pIter->pReader, pIter->pSttBlk, pBlock, pInfo->pSchema, &pInfo->colIds[1],
3,664,685✔
193
                                          pInfo->numOfCols - 1);
3,664,685✔
194
  if (code != TSDB_CODE_SUCCESS) {
3,665,721!
195
    return code;
×
196
  }
197

198
  double el = (taosGetTimestampUs() - st) / 1000.0;
3,666,423✔
199
  pInfo->cost.blockElapsedTime += el;
3,666,423✔
200
  pInfo->cost.loadBlocks += 1;
3,666,423✔
201

202
  tsdbDebug("read stt block, total load:%" PRId64 ", trigger by uid:%" PRIu64 ", stt-fileVer:%" PRId64
3,666,423✔
203
            ", last block index:%d, entry:%d, rows:%d, uidRange:%" PRId64 "-%" PRId64 " tsRange:%" PRId64 "-%" PRId64
204
            " %p, elapsed time:%.2f ms, %s",
205
            pInfo->cost.loadBlocks, pIter->uid, pIter->cid, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock->nRow,
206
            pIter->pSttBlk->minUid, pIter->pSttBlk->maxUid, pIter->pSttBlk->minKey, pIter->pSttBlk->maxKey, pBlock, el,
207
            idStr);
208

209
  pInfo->blockData[pInfo->currentLoadBlockIndex].sttBlockIndex = pIter->iSttBlk;
3,665,368✔
210
  pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].data.nRow : -1;
3,665,368✔
211

212
  tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockData[0].sttBlockIndex,
3,665,368✔
213
            pInfo->blockData[1].sttBlockIndex, pIter->iRow, idStr);
214

215
  *pResBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex].data;
3,665,205✔
216
  return code;
3,665,205✔
217
}
218

219
// find the earliest block that contains the required records
220
static FORCE_INLINE int32_t findEarliestIndex(int32_t index, uint64_t uid, const SSttBlk *pBlockList, int32_t num,
221
                                              int32_t backward) {
222
  int32_t i = index;
4,190,097✔
223
  int32_t step = backward ? 1 : -1;
4,190,097✔
224
  while (i >= 0 && i < num && uid >= pBlockList[i].minUid && uid <= pBlockList[i].maxUid) {
8,408,263!
225
    i += step;
4,218,166✔
226
  }
227
  return i - step;
4,190,097✔
228
}
229

230
static int32_t binarySearchForStartBlock(SSttBlk *pBlockList, int32_t num, uint64_t uid, int32_t backward) {
11,168,887✔
231
  int32_t midPos = -1;
11,168,887✔
232
  if (num <= 0) {
11,168,887✔
233
    return -1;
2,012,226✔
234
  }
235

236
  int32_t firstPos = 0;
9,156,661✔
237
  int32_t lastPos = num - 1;
9,156,661✔
238

239
  // find the first position which is bigger than the key
240
  if ((uid > pBlockList[lastPos].maxUid) || (uid < pBlockList[firstPos].minUid)) {
9,156,661✔
241
    return -1;
4,966,379✔
242
  }
243

244
  while (1) {
476,641✔
245
    if (uid >= pBlockList[firstPos].minUid && uid <= pBlockList[firstPos].maxUid) {
4,666,923✔
246
      return findEarliestIndex(firstPos, uid, pBlockList, num, backward);
4,080,182✔
247
    }
248

249
    if (uid > pBlockList[lastPos].maxUid || uid < pBlockList[firstPos].minUid) {
586,741!
250
      return -1;
185✔
251
    }
252

253
    int32_t numOfRows = lastPos - firstPos + 1;
586,556✔
254
    midPos = (numOfRows >> 1u) + firstPos;
586,556✔
255

256
    if (uid < pBlockList[midPos].minUid) {
586,556✔
257
      lastPos = midPos - 1;
226,173✔
258
    } else if (uid > pBlockList[midPos].maxUid) {
360,383✔
259
      firstPos = midPos + 1;
250,468✔
260
    } else {
261
      return findEarliestIndex(midPos, uid, pBlockList, num, backward);
109,915✔
262
    }
263
  }
264
}
265

266
static FORCE_INLINE int32_t findEarliestRow(int32_t index, uint64_t uid, const uint64_t *uidList, int32_t num,
267
                                            int32_t backward) {
268
  int32_t i = index;
4,131,717✔
269
  int32_t step = backward ? 1 : -1;
4,131,717✔
270
  while (i >= 0 && i < num && uid == uidList[i]) {
76,414,430!
271
    i += step;
72,282,713✔
272
  }
273
  return i - step;
4,131,717✔
274
}
275

276
static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint64_t uid, int32_t backward) {
4,198,193✔
277
  int32_t firstPos = 0;
4,198,193✔
278
  int32_t lastPos = num - 1;
4,198,193✔
279

280
  // find the first position which is bigger than the key
281
  if ((uid > uidList[lastPos]) || (uid < uidList[firstPos])) {
4,198,193!
282
    return -1;
×
283
  }
284

285
  while (1) {
1,232,373✔
286
    if (uid == uidList[firstPos]) {
5,430,566✔
287
      return findEarliestRow(firstPos, uid, uidList, num, backward);
3,539,407✔
288
    }
289

290
    if (uid > uidList[lastPos] || uid < uidList[firstPos]) {
1,891,159✔
291
      return -1;
66,476✔
292
    }
293

294
    int32_t numOfRows = lastPos - firstPos + 1;
1,824,683✔
295
    int32_t midPos = (numOfRows >> 1u) + firstPos;
1,824,683✔
296

297
    if (uid < uidList[midPos]) {
1,824,683✔
298
      lastPos = midPos - 1;
522,829✔
299
    } else if (uid > uidList[midPos]) {
1,301,854✔
300
      firstPos = midPos + 1;
709,544✔
301
    } else {
302
      return findEarliestRow(midPos, uid, uidList, num, backward);
592,310✔
303
    }
304
  }
305
}
306

307
static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray, SSttBlockLoadInfo *pBlockLoadInfo,
5,019,777✔
308
                                   uint64_t suid) {
309
  void   *px = NULL;
5,019,777✔
310
  int32_t code = TSDB_CODE_SUCCESS;
5,019,777✔
311
  if (TARRAY2_SIZE(pArray) <= 0) {
5,019,777✔
312
    return code;
8,905✔
313
  }
314

315
  SSttBlk *pStart = &pArray->data[0];
5,010,872✔
316
  SSttBlk *pEnd = &pArray->data[TARRAY2_SIZE(pArray) - 1];
5,010,872✔
317

318
  // all identical
319
  if (pStart->suid == pEnd->suid) {
5,010,872✔
320
    if (pStart->suid != suid) {  // no qualified stt block existed
2,194,205✔
321
      taosArrayClear(pBlockLoadInfo->aSttBlk);
898,328✔
322
      pIter->iSttBlk = -1;
898,220✔
323
      return TSDB_CODE_SUCCESS;
898,220✔
324
    } else {  // all blocks are qualified
325
      taosArrayClear(pBlockLoadInfo->aSttBlk);
1,295,877✔
326
      px = taosArrayAddBatch(pBlockLoadInfo->aSttBlk, pArray->data, pArray->size);
1,295,509✔
327
      if (px == NULL) {
1,295,185!
328
        return terrno;
×
329
      }
330
    }
331
  } else {
332
    SArray *pTmp = taosArrayInit(TARRAY2_SIZE(pArray), sizeof(SSttBlk));
2,816,667✔
333
    if (pTmp == NULL) {
2,820,031!
334
      return terrno;
×
335
    }
336

337
    for (int32_t i = 0; i < TARRAY2_SIZE(pArray); ++i) {
9,984,243✔
338
      SSttBlk *p = &pArray->data[i];
8,124,395✔
339
      if (p->suid < suid) {
8,124,395✔
340
        continue;
4,564,660✔
341
      }
342

343
      if (p->suid == suid) {
3,559,735✔
344
        void *px = taosArrayPush(pTmp, p);
2,600,332✔
345
        if (px == NULL) {
2,600,332!
346
          code = terrno;
×
347
          break;
×
348
        }
349
      } else if (p->suid > suid) {
959,674!
350
        break;
960,511✔
351
      }
352
    }
353

354
    taosArrayDestroy(pBlockLoadInfo->aSttBlk);
2,820,359✔
355
    pBlockLoadInfo->aSttBlk = pTmp;
2,821,348✔
356
  }
357

358
  return code;
4,116,533✔
359
}
360

361
static int32_t tValueDupPayload(SValue *pVal) {
15,199,275✔
362
  if (IS_VAR_DATA_TYPE(pVal->type) || pVal->type == TSDB_DATA_TYPE_DECIMAL) {
15,199,275!
363
    char *p = (char *)pVal->pData;
×
364
    char *pBuf = taosMemoryMalloc(pVal->nData);
×
365
    if (pBuf == NULL) {
×
366
      return terrno;
×
367
    }
368

369
    memcpy(pBuf, p, pVal->nData);
×
370
    pVal->pData = (uint8_t *)pBuf;
×
371
  }
372

373
  return TSDB_CODE_SUCCESS;
15,200,580✔
374
}
375

376
static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo,
5,020,667✔
377
                                          TStatisBlkArray *pStatisBlkArray, uint64_t suid, const char *id) {
378
  int32_t code = TSDB_CODE_SUCCESS;
5,020,667✔
379
  int32_t lino = 0;
5,020,667✔
380
  void   *px = NULL;
5,020,667✔
381
  int32_t startIndex = 0;
5,020,667✔
382
  int32_t ret = 0;
5,020,667✔
383

384
  int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray);
5,020,667✔
385
  if (numOfBlocks <= 0) {
5,020,667✔
386
    return code;
8,905✔
387
  }
388

389
  while ((startIndex < numOfBlocks) && (pStatisBlkArray->data[startIndex].maxTbid.suid < suid)) {
5,794,946✔
390
    ++startIndex;
783,184✔
391
  }
392

393
  if (startIndex >= numOfBlocks || pStatisBlkArray->data[startIndex].minTbid.suid > suid) {
5,011,762✔
394
    return 0;
1,126,632✔
395
  }
396

397
  int32_t endIndex = startIndex;
3,885,130✔
398
  while (endIndex < numOfBlocks && pStatisBlkArray->data[endIndex].minTbid.suid <= suid) {
7,772,712✔
399
    ++endIndex;
3,887,582✔
400
  }
401

402
  int32_t num = endIndex - startIndex;
3,885,130✔
403
  pBlockLoadInfo->cost.loadStatisBlocks += num;
3,885,130✔
404

405
  STbStatisBlock block;
406
  code = tStatisBlockInit(&block);
3,885,130✔
407
  QUERY_CHECK_CODE(code, lino, _end);
3,884,194!
408

409
  int64_t st = taosGetTimestampUs();
3,884,922✔
410

411
  for (int32_t k = startIndex; k < endIndex; ++k) {
7,760,763✔
412
    code = tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[k], &block);
3,886,438✔
413
    QUERY_CHECK_CODE(code, lino, _end);
3,888,272!
414

415
    int32_t i = 0;
3,888,272✔
416
    int32_t rows = block.numOfRecords;
3,888,272✔
417
    while (i < rows && ((int64_t *)block.suids.data)[i] != suid) {
8,317,758✔
418
      ++i;
4,429,486✔
419
    }
420

421
    // existed
422
    if (i < rows) {
3,888,272✔
423
      SSttTableRowsInfo *pInfo = &pBlockLoadInfo->info;
3,857,078✔
424

425
      if (pInfo->pUid == NULL) {
3,857,078✔
426
        pInfo->pUid = taosArrayInit(rows, sizeof(int64_t));
3,853,478✔
427
        pInfo->pFirstTs = taosArrayInit(rows, sizeof(int64_t));
3,854,539✔
428
        pInfo->pLastTs = taosArrayInit(rows, sizeof(int64_t));
3,854,906✔
429
        pInfo->pCount = taosArrayInit(rows, sizeof(int64_t));
3,854,972✔
430

431
        pInfo->pFirstKey = taosArrayInit(rows, sizeof(SValue));
3,855,030✔
432
        pInfo->pLastKey = taosArrayInit(rows, sizeof(SValue));
3,854,852✔
433

434
        if (pInfo->pUid == NULL || pInfo->pFirstTs == NULL || pInfo->pLastTs == NULL || pInfo->pCount == NULL ||
3,854,914!
435
            pInfo->pFirstKey == NULL || pInfo->pLastKey == NULL) {
3,854,870!
436
          code = terrno;
649✔
437
          goto _end;
×
438
        }
439
      }
440

441
      if (pStatisBlkArray->data[k].maxTbid.suid == suid) {
3,857,865✔
442
        int32_t size = rows - i;
2,933,404✔
443
        int32_t offset = i * sizeof(int64_t);
2,933,404✔
444

445
        px = taosArrayAddBatch(pInfo->pUid, tBufferGetDataAt(&block.uids, offset), size);
2,933,404✔
446
        TSDB_CHECK_NULL(px, code, lino, _end, terrno);
2,933,299!
447

448
        px = taosArrayAddBatch(pInfo->pFirstTs, tBufferGetDataAt(&block.firstKeyTimestamps, offset), size);
2,933,299✔
449
        TSDB_CHECK_NULL(px, code, lino, _end, terrno);
2,933,062!
450

451
        px = taosArrayAddBatch(pInfo->pLastTs, tBufferGetDataAt(&block.lastKeyTimestamps, offset), size);
2,933,062✔
452
        TSDB_CHECK_NULL(px, code, lino, _end, terrno);
2,932,888!
453

454
        px = taosArrayAddBatch(pInfo->pCount, tBufferGetDataAt(&block.counts, offset), size);
2,932,888✔
455
        TSDB_CHECK_NULL(px, code, lino, _end, terrno);
2,932,497✔
456

457
        if (block.numOfPKs > 0) {
2,932,230✔
458
          SValue vFirst = {0}, vLast = {0};
46,003✔
459
          for (int32_t f = i; f < rows; ++f) {
7,647,284✔
460
            code = tValueColumnGet(&block.firstKeyPKs[0], f, &vFirst);
7,601,235✔
461
            TSDB_CHECK_CODE(code, lino, _end);
7,601,246!
462

463
            code = tValueDupPayload(&vFirst);
7,601,246✔
464
            TSDB_CHECK_CODE(code, lino, _end);
7,601,633!
465

466
            px = taosArrayPush(pInfo->pFirstKey, &vFirst);
7,601,633✔
467
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
7,602,193!
468

469
            // todo add api to clone the original data
470
            code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast);
7,602,193✔
471
            TSDB_CHECK_CODE(code, lino, _end);
7,600,921!
472

473
            code = tValueDupPayload(&vLast);
7,600,921✔
474
            TSDB_CHECK_CODE(code, lino, _end);
7,600,982!
475

476
            px = taosArrayPush(pInfo->pLastKey, &vLast);
7,600,982✔
477
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
7,601,281!
478
          }
479
        } else {
480
          SValue vFirst = {0};
2,886,227✔
481
          for (int32_t j = 0; j < size; ++j) {
18,001,205✔
482
            px = taosArrayPush(pInfo->pFirstKey, &vFirst);
15,127,079✔
483
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
15,121,774!
484

485
            px = taosArrayPush(pInfo->pLastKey, &vFirst);
15,121,774✔
486
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
15,114,978!
487
          }
488
        }
489
      } else {
490
        STbStatisRecord record = {0};
924,461✔
491
        while (i < rows) {
2,193,862✔
492
          code = tStatisBlockGet(&block, i, &record);
2,193,824✔
493
          TSDB_CHECK_CODE(code, lino, _end);
2,193,706!
494

495
          if (record.suid != suid) {
2,193,706✔
496
            break;
924,434✔
497
          }
498

499
          px = taosArrayPush(pInfo->pUid, &record.uid);
1,269,272✔
500
          TSDB_CHECK_NULL(px, code, lino, _end, terrno);
1,269,376!
501

502
          px = taosArrayPush(pInfo->pCount, &record.count);
1,269,376✔
503
          TSDB_CHECK_NULL(px, code, lino, _end, terrno);
1,269,361!
504

505
          px = taosArrayPush(pInfo->pFirstTs, &record.firstKey.ts);
1,269,361✔
506
          TSDB_CHECK_NULL(px, code, lino, _end, terrno);
1,269,393!
507

508
          px = taosArrayPush(pInfo->pLastTs, &record.lastKey.ts);
1,269,393✔
509
          TSDB_CHECK_NULL(px, code, lino, _end, terrno);
1,269,392!
510

511
          if (record.firstKey.numOfPKs > 0) {
1,269,392!
512
            SValue s = record.firstKey.pks[0];
×
513
            code = tValueDupPayload(&s);
×
514
            TSDB_CHECK_CODE(code, lino, _end);
×
515

516
            px = taosArrayPush(pInfo->pFirstKey, &s);
×
517
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
×
518

519
            s = record.lastKey.pks[0];
×
520
            code = tValueDupPayload(&s);
×
521
            TSDB_CHECK_CODE(code, lino, _end);
×
522

523
            px = taosArrayPush(pInfo->pLastKey, &s);
×
524
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
×
525
          } else {
526
            SValue v = {0};
1,269,392✔
527
            px = taosArrayPush(pInfo->pFirstKey, &v);
1,269,392✔
528
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
1,269,331!
529

530
            px = taosArrayPush(pInfo->pLastKey, &v);
1,269,331✔
531
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
1,269,401!
532
          }
533

534
          i += 1;
1,269,401✔
535
        }
536
      }
537
    }
538
  }
539

540
_end:
3,874,325✔
541
  tStatisBlockDestroy(&block);
3,874,325✔
542
  if (code != 0) {
3,886,680!
543
    tsdbError("%s error happens at:%s line number: %d, code:%s", id, __func__, lino, tstrerror(code));
×
544
  } else {
545
    double el = (taosGetTimestampUs() - st) / 1000.0;
3,886,165✔
546
    pBlockLoadInfo->cost.statisElapsedTime += el;
3,886,165✔
547

548
    tsdbDebug("%s load %d statis blocks into buf, elapsed time:%.2fms", id, num, el);
3,886,165✔
549
  }
550
  return code;
3,886,711✔
551
}
552

553
static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *pIter, int64_t suid,
5,021,152✔
554
                                 _load_tomb_fn loadTombFn, void *pReader1, const char *idStr) {
555
  int64_t st = taosGetTimestampUs();
5,022,968✔
556

557
  const TSttBlkArray *pSttBlkArray = NULL;
5,022,968✔
558
  pBlockLoadInfo->sttBlockLoaded = true;
5,022,968✔
559

560
  // load the stt block info for each stt-block
561
  int32_t code = tsdbSttFileReadSttBlk(pIter->pReader, &pSttBlkArray);
5,022,968✔
562
  if (code != TSDB_CODE_SUCCESS) {
5,018,239!
563
    tsdbError("load stt blk failed, code:%s, %s", tstrerror(code), idStr);
×
564
    return code;
×
565
  }
566

567
  // load the stt block info for each stt file block
568
  code = extractSttBlockInfo(pIter, pSttBlkArray, pBlockLoadInfo, suid);
5,018,239✔
569
  if (code != TSDB_CODE_SUCCESS) {
5,022,487!
570
    tsdbError("load stt block info failed, code:%s, %s", tstrerror(code), idStr);
×
571
    return code;
×
572
  }
573

574
  // load stt statistics block for all stt-blocks, to decide if the data of queried table exists in current stt file
575
  TStatisBlkArray *pStatisBlkArray = NULL;
5,022,487✔
576
  code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pStatisBlkArray);
5,022,487✔
577
  if (code != TSDB_CODE_SUCCESS) {
5,022,178!
578
    tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), idStr);
×
579
    return code;
×
580
  }
581

582
  // load statistics block for all tables in current stt file
583
  code = loadSttStatisticsBlockData(pIter->pReader, pIter->pBlockLoadInfo, pStatisBlkArray, suid, idStr);
5,022,178✔
584
  if (code != TSDB_CODE_SUCCESS) {
5,025,036!
585
    tsdbError("failed to load stt statistics block data, code:%s, %s", tstrerror(code), idStr);
×
586
    return code;
×
587
  }
588

589
  code = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo);
5,025,036✔
590

591
  double el = (taosGetTimestampUs() - st) / 1000.0;
5,023,636✔
592
  tsdbDebug("load the stt file blk info completed, elapsed time:%.2fms, %s", el, idStr);
5,023,636✔
593
  return code;
5,023,928✔
594
}
595

596
static int32_t uidComparFn(const void *p1, const void *p2) {
11,453,405✔
597
  const uint64_t *pFirst = p1;
11,453,405✔
598
  const uint64_t *pVal = p2;
11,453,405✔
599

600
  if (*pFirst == *pVal) {
11,453,405✔
601
    return 0;
4,127,528✔
602
  } else {
603
    return *pFirst < *pVal ? -1 : 1;
7,325,877✔
604
  }
605
}
606

607
static void setSttInfoForCurrentTable(SSttBlockLoadInfo *pLoadInfo, uint64_t uid, SSttKeyRange *pRange,
11,173,124✔
608
                                      int64_t *numOfRows) {
609
  if (pRange == NULL || taosArrayGetSize(pLoadInfo->info.pUid) == 0) {
11,173,124!
610
    return;
2,012,288✔
611
  }
612

613
  int32_t index = taosArraySearchIdx(pLoadInfo->info.pUid, &uid, uidComparFn, TD_EQ);
9,160,011✔
614
  if (index >= 0) {
9,159,750✔
615
    pRange->skey.ts = *(int64_t *)taosArrayGet(pLoadInfo->info.pFirstTs, index);
4,127,561✔
616
    pRange->ekey.ts = *(int64_t *)taosArrayGet(pLoadInfo->info.pLastTs, index);
4,126,543✔
617

618
    *numOfRows += *(int64_t *)taosArrayGet(pLoadInfo->info.pCount, index);
4,126,649✔
619

620
    if (pRange->skey.numOfPKs > 0) {
4,126,701✔
621
      memcpy(&pRange->skey.pks[0], taosArrayGet(pLoadInfo->info.pFirstKey, index), sizeof(SValue));
10,667✔
622
      memcpy(&pRange->ekey.pks[0], taosArrayGet(pLoadInfo->info.pLastKey, index), sizeof(SValue));
10,667✔
623
    }
624
  }
625
}
626

627
int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t cid, int8_t backward,
11,172,980✔
628
                        SMergeTreeConf *pConf, SSttBlockLoadInfo *pBlockLoadInfo, SSttKeyRange *pKeyRange,
629
                        int64_t *numOfRows, const char *idStr) {
630
  int32_t code = TSDB_CODE_SUCCESS;
11,172,980✔
631

632
  pIter->uid = pConf->uid;
11,172,980✔
633
  pIter->cid = cid;
11,172,980✔
634
  pIter->backward = backward;
11,172,980✔
635
  pIter->verRange.minVer = pConf->verRange.minVer;
11,172,980✔
636
  pIter->verRange.maxVer = pConf->verRange.maxVer;
11,172,980✔
637
  pIter->timeWindow.skey = pConf->timewindow.skey;
11,172,980✔
638
  pIter->timeWindow.ekey = pConf->timewindow.ekey;
11,172,980✔
639

640
  pIter->pStartRowKey = pConf->pCurRowKey;
11,172,980✔
641
  pIter->pReader = pSttFileReader;
11,172,980✔
642
  pIter->pBlockLoadInfo = pBlockLoadInfo;
11,172,980✔
643

644
  // open stt file failed, ignore and continue
645
  if (pIter->pReader == NULL) {
11,172,980!
646
    tsdbError("stt file reader is null, %s", idStr);
×
647
    pIter->pSttBlk = NULL;
×
648
    pIter->iSttBlk = -1;
×
649
    return TSDB_CODE_SUCCESS;
×
650
  }
651

652
  if (!pBlockLoadInfo->sttBlockLoaded) {
11,172,980✔
653
    code = doLoadSttFilesBlk(pBlockLoadInfo, pIter, pConf->suid, pConf->loadTombFn, pConf->pReader, idStr);
5,022,423✔
654
    if (code != TSDB_CODE_SUCCESS) {
5,023,647!
655
      return code;
×
656
    }
657
  }
658

659
  setSttInfoForCurrentTable(pBlockLoadInfo, pConf->uid, pKeyRange, numOfRows);
11,174,204✔
660

661
  // find the start block, actually we could load the position to avoid repeatly searching for the start position when
662
  // the skey is updated.
663
  size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
11,170,459✔
664
  pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, pConf->uid, backward);
11,170,967✔
665
  if (pIter->iSttBlk != -1) {
11,169,705✔
666
    pIter->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
4,192,904✔
667
    pIter->iRow = (pIter->backward) ? pIter->pSttBlk->nRow : -1;
4,192,450✔
668

669
    if ((!backward) && ((pConf->strictTimeRange && pIter->pSttBlk->minKey >= pIter->timeWindow.ekey) ||
4,192,450!
670
                        (!pConf->strictTimeRange && pIter->pSttBlk->minKey > pIter->timeWindow.ekey))) {
3,587,759!
671
      pIter->pSttBlk = NULL;
507✔
672
    }
673

674
    if (backward && ((pConf->strictTimeRange && pIter->pSttBlk->maxKey <= pIter->timeWindow.skey) ||
4,192,450!
675
                     (!pConf->strictTimeRange && pIter->pSttBlk->maxKey < pIter->timeWindow.skey))) {
604,161✔
676
      pIter->pSttBlk = NULL;
391✔
677
      pIter->ignoreEarlierTs = true;
391✔
678
    }
679
  }
680

681
  return code;
11,169,251✔
682
}
683

684
void tLDataIterClose2(SLDataIter *pIter) {
5,030,107✔
685
  tsdbSttFileReaderClose(&pIter->pReader);
5,030,107✔
686
  pIter->pReader = NULL;
5,031,141✔
687
}
5,031,141✔
688

689
void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
4,242,346✔
690
  int32_t step = pIter->backward ? -1 : 1;
4,242,346✔
691
  int32_t oldIndex = pIter->iSttBlk;
4,242,346✔
692

693
  pIter->iSttBlk += step;
4,242,346✔
694

695
  int32_t index = -1;
4,242,346✔
696
  size_t  size = pIter->pBlockLoadInfo->aSttBlk->size;
4,242,346✔
697
  for (int32_t i = pIter->iSttBlk; i < size && i >= 0; i += step) {
4,242,703!
698
    SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, i);
251,594✔
699
    if ((!pIter->backward) && p->minUid > pIter->uid) {
251,579✔
700
      break;
161,374✔
701
    }
702

703
    if (pIter->backward && p->maxUid < pIter->uid) {
90,205✔
704
      break;
24,255✔
705
    }
706

707
    // check uid firstly
708
    if (p->minUid <= pIter->uid && p->maxUid >= pIter->uid) {
65,950!
709
      if ((!pIter->backward) && p->minKey > pIter->timeWindow.ekey) {
65,959✔
710
        break;
274✔
711
      }
712

713
      if (pIter->backward && p->maxKey < pIter->timeWindow.skey) {
65,685✔
714
        break;
2✔
715
      }
716

717
      // check time range secondly
718
      if (p->minKey <= pIter->timeWindow.ekey && p->maxKey >= pIter->timeWindow.skey) {
65,683✔
719
        if ((!pIter->backward) && p->minVer > pIter->verRange.maxVer) {
65,312!
720
          break;
×
721
        }
722

723
        if (pIter->backward && p->maxVer < pIter->verRange.minVer) {
65,312!
724
          break;
×
725
        }
726

727
        if (p->minVer <= pIter->verRange.maxVer && p->maxVer >= pIter->verRange.minVer) {
65,312!
728
          index = i;
65,317✔
729
          break;
65,317✔
730
        }
731
      }
732
    }
733
  }
734

735
  pIter->pSttBlk = NULL;
4,242,331✔
736
  if (index != -1) {
4,242,331✔
737
    SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, index);
65,317✔
738

739
    pIter->iSttBlk = index;
65,316✔
740
    pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
65,316✔
741
    tsdbDebug("try next stt-file block:%d from %d, trigger by uid:%" PRIu64 ", stt-fileVer:%" PRId64
65,314✔
742
              ", uidRange:%" PRId64 "-%" PRId64 " %s",
743
              pIter->iSttBlk, oldIndex, pIter->uid, pIter->cid, p->minUid, p->maxUid, idStr);
744
  } else {
745
    tsdbDebug("no more last block qualified, uid:%" PRIu64 ", stt-file block:%d, %s", pIter->uid, oldIndex, idStr);
4,177,014✔
746
  }
747
}
4,242,329✔
748

749
static int32_t findNextValidRow(SLDataIter *pIter, const char *idStr) {
378,959,636✔
750
  bool        hasVal = false;
378,959,636✔
751
  int32_t     step = pIter->backward ? -1 : 1;
378,959,636✔
752
  int32_t     i = pIter->iRow;
378,959,636✔
753
  SBlockData *pData = NULL;
378,959,636✔
754

755
  int32_t code = loadLastBlock(pIter, idStr, &pData);
378,959,636✔
756
  if (code) {
375,822,200!
757
    tsdbError("failed to load stt block, code:%s, %s", tstrerror(code), idStr);
×
758
    return code;
×
759
  }
760

761
  // mostly we only need to find the start position for a given table
762
  if ((((i == 0) && (!pIter->backward)) || (i == pData->nRow - 1 && pIter->backward)) && pData->aUid != NULL) {
375,822,200✔
763
    i = binarySearchForStartRowIndex((uint64_t *)pData->aUid, pData->nRow, pIter->uid, pIter->backward);
4,198,109✔
764
    if (i == -1) {
4,199,093!
765
      tsdbDebug("failed to find the data in pBlockData, uid:%" PRIu64 " , %s", pIter->uid, idStr);
×
766
      pIter->iRow = -1;
66,587✔
767
      return code;
66,587✔
768
    }
769
  }
770

771
  for (; i < pData->nRow && i >= 0; i += step) {
385,157,256!
772
    if (pData->aUid != NULL) {
383,256,982✔
773
      if (!pIter->backward) {
381,579,797✔
774
        if (pData->aUid[i] > pIter->uid) {
353,309,784✔
775
          break;
671,336✔
776
        }
777
      } else {
778
        if (pData->aUid[i] < pIter->uid) {
28,270,013✔
779
          break;
44,898✔
780
        }
781
      }
782
    }
783

784
    int64_t ts = pData->aTSKEY[i];
382,540,748✔
785
    if (!pIter->backward) {               // asc
382,540,748✔
786
      if (ts > pIter->timeWindow.ekey) {  // no more data
353,964,454✔
787
        break;
177,259✔
788
      } else {
789
        if (ts < pIter->timeWindow.skey) {
353,787,195✔
790
          continue;
4,028,448✔
791
        }
792

793
        if (ts == pIter->timeWindow.skey && pIter->pStartRowKey->numOfPKs > 0) {
349,758,747!
794
          SRowKey key;
795
          tColRowGetKey(pData, i, &key);
×
796
          int32_t ret = pkCompEx(&key, pIter->pStartRowKey);
×
797
          if (ret < 0) {
×
798
            continue;
×
799
          }
800
        }
801
      }
802
    } else {
803
      if (ts < pIter->timeWindow.skey) {
28,576,294✔
804
        break;
154,720✔
805
      } else {
806
        if (ts > pIter->timeWindow.ekey) {
28,421,574✔
807
          continue;
4,736,303✔
808
        }
809

810
        if (ts == pIter->timeWindow.ekey && pIter->pStartRowKey->numOfPKs > 0) {
23,685,271!
811
          SRowKey key;
812
          tColRowGetKey(pData, i, &key);
×
813
          int32_t ret = pkCompEx(&key, pIter->pStartRowKey);
×
814
          if (ret > 0) {
×
815
            continue;
×
816
          }
817
        }
818
      }
819
    }
820

821
    int64_t ver = pData->aVersion[i];
373,444,018✔
822
    if (ver < pIter->verRange.minVer) {
373,444,018!
823
      continue;
×
824
    }
825

826
    // todo opt handle desc case
827
    if (ver > pIter->verRange.maxVer) {
373,444,018!
UNCOV
828
      continue;
×
829
    }
830

831
    hasVal = true;
373,444,018✔
832
    break;
373,444,018✔
833
  }
834

835
  pIter->iRow = (hasVal) ? i : -1;
376,392,505✔
836
  return code;
376,392,505✔
837
}
838

839
int32_t tLDataIterNextRow(SLDataIter *pIter, const char *idStr, bool *hasNext) {
385,636,133✔
840
  int32_t     step = pIter->backward ? -1 : 1;
385,636,133✔
841
  int32_t     code = 0;
385,636,133✔
842
  int32_t     iBlockL = pIter->iSttBlk;
385,636,133✔
843
  SBlockData *pBlockData = NULL;
385,636,133✔
844
  int32_t     lino = 0;
385,636,133✔
845

846
  *hasNext = false;
385,636,133✔
847

848
  // no qualified last file block in current file, no need to fetch row
849
  if (pIter->pSttBlk == NULL) {
385,636,133✔
850
    return code;
6,979,603✔
851
  }
852

853
  code = loadLastBlock(pIter, idStr, &pBlockData);
378,656,530✔
854
  if (pBlockData == NULL || code != TSDB_CODE_SUCCESS) {
379,085,107!
855
    lino = __LINE__;
×
856
    goto _exit;
×
857
  }
858

859
  pIter->iRow += step;
379,248,397✔
860

861
  while (1) {
64,166✔
862
    bool skipBlock = false;
379,312,563✔
863
    code = findNextValidRow(pIter, idStr);
379,312,563✔
864
    TSDB_CHECK_CODE(code, lino, _exit);
377,446,822!
865

866
    if (pIter->pBlockLoadInfo->checkRemainingRow) {
377,446,822!
867
      skipBlock = true;
×
868
      int16_t *aCols = pIter->pBlockLoadInfo->colIds;
×
869
      int      nCols = pIter->pBlockLoadInfo->numOfCols;
×
870
      bool     isLast = pIter->pBlockLoadInfo->isLast;
×
871
      for (int inputColIndex = 0; inputColIndex < nCols; ++inputColIndex) {
×
872
        for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
×
873
          SColData *pColData = &pBlockData->aColData[colIndex];
×
874
          int16_t   cid = pColData->cid;
×
875

876
          if (cid == aCols[inputColIndex]) {
×
877
            if (isLast && (pColData->flag & HAS_VALUE)) {
×
878
              skipBlock = false;
×
879
              break;
×
880
            } else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
×
881
              skipBlock = false;
×
882
              break;
×
883
            }
884
          }
885
        }
886
      }
887
    }
888

889
    if (skipBlock || pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
377,446,822✔
890
      tLDataIterNextBlock(pIter, idStr);
4,228,408✔
891
      if (pIter->pSttBlk == NULL) {  // no more data
4,240,818✔
892
        goto _exit;
4,176,649✔
893
      }
894
    } else {
895
      break;
896
    }
897

898
    if (iBlockL != pIter->iSttBlk) {
64,169!
899
      code = loadLastBlock(pIter, idStr, &pBlockData);
65,315✔
900
      if ((pBlockData == NULL) || (code != 0)) {
65,304!
901
        lino = __LINE__;
×
902
        goto _exit;
×
903
      }
904

905
      // set start row index
906
      pIter->iRow = pIter->backward ? pBlockData->nRow - 1 : 0;
65,312✔
907
    }
908
  }
909

910
  pIter->rInfo.suid = pBlockData->suid;
373,218,414✔
911
  pIter->rInfo.uid = pBlockData->uid;
373,218,414✔
912
  pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);
373,218,414✔
913

914
_exit:
377,231,765✔
915
  if (code) {
377,231,765!
916
    tsdbError("failed to exec stt-file nextIter, lino:%d, code:%s, %s", lino, tstrerror(code), idStr);
×
917
  }
918

919
  *hasNext = (code == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL);
377,458,027!
920
  return code;
377,458,027✔
921
}
922

923
// SMergeTree =================================================
924
static FORCE_INLINE int32_t tLDataIterCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
41,408,425✔
925
  SLDataIter *pIter1 = (SLDataIter *)(((uint8_t *)p1) - offsetof(SLDataIter, node));
41,515,416✔
926
  SLDataIter *pIter2 = (SLDataIter *)(((uint8_t *)p2) - offsetof(SLDataIter, node));
41,515,416✔
927

928
  SRowKey rkey1 = {0}, rkey2 = {0};
41,515,416✔
929
  tRowGetKeyEx(&pIter1->rInfo.row, &rkey1);
41,515,416!
930
  tRowGetKeyEx(&pIter2->rInfo.row, &rkey2);
41,737,160!
931

932
  int32_t ret = tRowKeyCompare(&rkey1, &rkey2);
41,661,898✔
933
  if (ret < 0) {
41,638,760✔
934
    return -1;
24,575,033✔
935
  } else if (ret > 0) {
17,063,727✔
936
    return 1;
6,846,396✔
937
  } else {
938
    int64_t ver1 = TSDBROW_VERSION(&pIter1->rInfo.row);
10,217,331!
939
    int64_t ver2 = TSDBROW_VERSION(&pIter2->rInfo.row);
10,217,331!
940

941
    if (ver1 < ver2) {
10,217,331✔
942
      return -1;
4,133,691✔
943
    } else if (ver1 > ver2) {
6,083,640!
944
      return 1;
6,523,530✔
945
    } else {
946
      return 0;
×
947
    }
948
  }
949
}
950

951
static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
106,991!
952
  return -1 * tLDataIterCmprFn(p1, p2);
106,991✔
953
}
954

955
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable *pSttDataInfo) {
11,595,866✔
956
  int32_t code = TSDB_CODE_SUCCESS;
11,595,866✔
957

958
  pMTree->pIter = NULL;
11,595,866✔
959
  pMTree->backward = pConf->backward;
11,595,866✔
960
  pMTree->idStr = pConf->idstr;
11,595,866✔
961
  int32_t lino = 0;
11,595,866✔
962

963
  if (!pMTree->backward) {  // asc
11,595,866✔
964
    tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
9,700,044✔
965
  } else {  // desc
966
    tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
1,895,822✔
967
  }
968

969
  pMTree->ignoreEarlierTs = false;
11,597,229✔
970

971
  // no data exists, go to end
972
  int32_t numOfLevels = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->size;
11,597,229✔
973
  if (numOfLevels == 0) {
11,597,229✔
974
    goto _end;
605,926✔
975
  }
976

977
  code = adjustSttDataIters(pConf->pSttFileBlockIterArray, pConf->pCurrentFileset);
10,991,303✔
978
  if (code) {
10,991,814!
979
    goto _end;
×
980
  }
981

982
  for (int32_t j = 0; j < numOfLevels; ++j) {
22,138,205✔
983
    SSttLvl *pSttLevel = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->data[j];
11,147,638✔
984
    SArray  *pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j);
11,147,638✔
985

986
    for (int32_t i = 0; i < TARRAY2_SIZE(pSttLevel->fobjArr); ++i) {  // open all last file
22,317,063✔
987
      SLDataIter *pIter = taosArrayGetP(pList, i);
11,170,672✔
988

989
      SSttFileReader    *pSttFileReader = pIter->pReader;
11,170,895✔
990
      SSttBlockLoadInfo *pLoadInfo = pIter->pBlockLoadInfo;
11,170,895✔
991

992
      // open stt file reader if not opened yet
993
      // if failed to open this stt file, ignore the error and try next one
994
      if (pSttFileReader == NULL) {
11,170,895✔
995
        SSttFileReaderConfig conf = {.tsdb = pConf->pTsdb, .szPage = pConf->pTsdb->pVnode->config.tsdbPageSize};
5,022,047✔
996
        conf.file[0] = *pSttLevel->fobjArr->data[i]->f;
5,022,047✔
997

998
        code = tsdbSttFileReaderOpen(pSttLevel->fobjArr->data[i]->fname, &conf, &pSttFileReader);
5,022,047✔
999
        if (code != TSDB_CODE_SUCCESS) {
5,020,453!
1000
          tsdbError("open stt file reader error. file name %s, code %s, %s", pSttLevel->fobjArr->data[i]->fname,
×
1001
                    tstrerror(code), pMTree->idStr);
1002
        }
1003
      }
1004

1005
      if (pLoadInfo == NULL) {
11,167,527✔
1006
        code = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols, &pLoadInfo);
5,020,863✔
1007
        if (code != TSDB_CODE_SUCCESS) {
5,022,168!
1008
          goto _end;
×
1009
        }
1010
      }
1011

1012
      memset(pIter, 0, sizeof(SLDataIter));
11,168,832✔
1013

1014
      SSttKeyRange range = {.skey.numOfPKs = pConf->pCurRowKey->numOfPKs, .ekey.numOfPKs = pConf->pCurRowKey->numOfPKs};
11,168,832✔
1015
      int64_t      numOfRows = 0;
11,168,832✔
1016
      int64_t      cid = pSttLevel->fobjArr->data[i]->f->cid;
11,168,832✔
1017

1018
      code = tLDataIterOpen2(pIter, pSttFileReader, cid, pMTree->backward, pConf, pLoadInfo, &range, &numOfRows,
11,168,832✔
1019
                             pMTree->idStr);
1020
      if (code != TSDB_CODE_SUCCESS) {
11,168,981!
1021
        goto _end;
×
1022
      }
1023

1024
      bool hasVal = NULL;
11,168,981✔
1025
      code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal);
11,168,981✔
1026
      if (code) {
11,171,889!
1027
        goto _end;
×
1028
      }
1029

1030
      if (hasVal) {
11,171,889✔
1031
        tMergeTreeAddIter(pMTree, pIter);
4,124,264✔
1032

1033
        // let's record the time window for current table of uid in the stt files
1034
        if (pSttDataInfo != NULL && numOfRows > 0) {
4,122,380!
1035
          void *px = taosArrayPush(pSttDataInfo->pKeyRangeList, &range);
4,123,176✔
1036
          QUERY_CHECK_NULL(px, code, lino, _end, terrno);
4,121,957!
1037

1038
          pSttDataInfo->numOfRows += numOfRows;
4,121,957✔
1039
        }
1040
      } else {
1041
        if (!pMTree->ignoreEarlierTs) {
7,047,625✔
1042
          pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs;
7,047,237✔
1043
        }
1044
      }
1045
    }
1046
  }
1047

1048
  return code;
10,990,567✔
1049

1050
_end:
605,926✔
1051
  tMergeTreeClose(pMTree);
605,926✔
1052
  return code;
605,859✔
1053
}
1054

1055
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) {
4,123,617✔
1056
  SRBTreeNode *node = tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter);
4,123,617✔
1057
}
4,122,281✔
1058

1059
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree) { return pMTree->ignoreEarlierTs; }
×
1060

1061
static void tLDataIterPinSttBlock(SLDataIter *pIter, const char *id) {
228,716,730✔
1062
  SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;
228,716,730✔
1063

1064
  if (pInfo->blockData[0].sttBlockIndex == pIter->iSttBlk) {
228,716,730✔
1065
    pInfo->blockData[0].pin = true;
127,945,066✔
1066
    tsdbTrace("pin stt-block, blockIndex:%d, stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id);
127,945,066✔
1067
    return;
127,969,055✔
1068
  }
1069

1070
  if (pInfo->blockData[1].sttBlockIndex == pIter->iSttBlk) {
100,771,664!
1071
    pInfo->blockData[1].pin = true;
104,529,819✔
1072
    tsdbTrace("pin stt-block, blockIndex:%d, stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id);
104,529,819✔
1073
    return;
104,536,331✔
1074
  }
1075

1076
  tsdbError("failed to pin any stt block, sttBlock:%d stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id);
×
1077
}
1078

1079
static void tLDataIterUnpinSttBlock(SLDataIter *pIter, const char *id) {
225,168,733✔
1080
  SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;
225,168,733✔
1081
  if (pInfo->blockData[0].pin) {
225,168,733✔
1082
    pInfo->blockData[0].pin = false;
126,780,514✔
1083
    tsdbTrace("unpin stt-block:%d, stt-fileVer:%" PRId64 " %s", pInfo->blockData[0].sttBlockIndex, pIter->cid, id);
126,780,514✔
1084
    return;
126,797,050✔
1085
  }
1086

1087
  if (pInfo->blockData[1].pin) {
98,388,219!
1088
    pInfo->blockData[1].pin = false;
103,583,777✔
1089
    tsdbTrace("unpin stt-block:%d, stt-fileVer:%" PRId64 " %s", pInfo->blockData[1].sttBlockIndex, pIter->cid, id);
103,583,777✔
1090
    return;
103,539,434✔
1091
  }
1092

1093
  tsdbError("failed to unpin any stt block, sttBlock:%d stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id);
×
1094
}
1095

1096
void tMergeTreePinSttBlock(SMergeTree *pMTree) {
228,890,729✔
1097
  if (pMTree->pIter == NULL) {
228,890,729!
1098
    return;
×
1099
  }
1100

1101
  SLDataIter *pIter = pMTree->pIter;
228,890,729✔
1102
  pMTree->pPinnedBlockIter = pIter;
228,890,729✔
1103
  tLDataIterPinSttBlock(pIter, pMTree->idStr);
228,890,729✔
1104
}
1105

1106
void tMergeTreeUnpinSttBlock(SMergeTree *pMTree) {
225,302,171✔
1107
  if (pMTree->pPinnedBlockIter == NULL) {
225,302,171!
1108
    return;
×
1109
  }
1110

1111
  SLDataIter *pIter = pMTree->pPinnedBlockIter;
225,302,171✔
1112
  pMTree->pPinnedBlockIter = NULL;
225,302,171✔
1113
  tLDataIterUnpinSttBlock(pIter, pMTree->idStr);
225,302,171✔
1114
}
1115

1116
int32_t tMergeTreeNext(SMergeTree *pMTree, bool *pHasNext) {
384,144,820✔
1117
  int32_t code = 0;
384,144,820✔
1118
  if (pHasNext == NULL) {
384,144,820!
1119
    return TSDB_CODE_INVALID_PARA;
×
1120
  }
1121

1122
  *pHasNext = false;
384,144,820✔
1123
  if (pMTree->pIter) {
384,144,820✔
1124
    SLDataIter *pIter = pMTree->pIter;
373,054,727✔
1125
    bool        hasVal = false;
373,054,727✔
1126
    code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal);
373,054,727✔
1127
    if (!hasVal || (code != 0)) {
373,207,970✔
1128
      if (code == TSDB_CODE_FILE_CORRUPTED) {
4,107,171!
1129
        code = 0;  // suppress the file corrupt error to enable all queries within this cluster can run without failed.
×
1130
      }
1131

1132
      pMTree->pIter = NULL;
4,107,171✔
1133
    }
1134

1135
    // compare with min in RB Tree
1136
    pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
373,207,970✔
1137
    if (pMTree->pIter && pIter) {
373,207,970✔
1138
      int32_t c = pMTree->rbt.cmprFn(&pMTree->pIter->node, &pIter->node);
24,342,685✔
1139
      if (c > 0) {
24,310,021✔
1140
        (void)tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
4,096,402✔
1141
        pMTree->pIter = NULL;
3,992,318✔
1142
      } else if (!c) {
20,213,619!
1143
        return TSDB_CODE_INTERNAL_ERROR;
×
1144
      }
1145
    }
1146
  }
1147

1148
  if (pMTree->pIter == NULL) {
384,161,315✔
1149
    pMTree->pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
19,675,796✔
1150
    if (pMTree->pIter) {
19,675,796✔
1151
      tRBTreeDrop(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
8,096,357✔
1152
    }
1153
  }
1154

1155
  *pHasNext = (pMTree->pIter != NULL);
384,363,033✔
1156
  return code;
384,363,033✔
1157
}
1158

1159
void tMergeTreeClose(SMergeTree *pMTree) {
21,975,789✔
1160
  pMTree->pIter = NULL;
21,975,789✔
1161
  pMTree->pPinnedBlockIter = NULL;
21,975,789✔
1162
}
21,975,789✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc