• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4847

11 Nov 2025 05:50AM UTC coverage: 62.651% (+0.3%) from 62.306%
#4847

push

travis-ci

web-flow
Merge e78cd6509 into 47a2ea7a0

542 of 650 new or added lines in 16 files covered. (83.38%)

1515 existing lines in 91 files now uncovered.

113826 of 181682 relevant lines covered (62.65%)

113230552.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.08
/source/dnode/vnode/src/tsdb/tsdbMergeTree.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tlrucache.h"
17
#include "tsdb.h"
18
#include "tsdbFSet2.h"
19
#include "tsdbMerge.h"
20
#include "tsdbReadUtil.h"
21
#include "tsdbSttFileRW.h"
22

23
// Lookup key of the stt statistics cache: one entry per (suid, vgId, fid).
typedef struct SSttStatisCacheKey {
  int64_t suid;  // presumably the super table uid -- TODO confirm against the cache users
  int32_t vgId;  // presumably the vnode group id -- TODO confirm
  int32_t fid;   // presumably the file set id -- TODO confirm
} SSttStatisCacheKey;
159,434,335✔
28

29
typedef struct SSttStatisCacheValue {
159,464,525✔
30
  int64_t commitTs;
159,373,786✔
NEW
31
  SArray *pLevel;  // SArray<SArray<SSttTableRowsInfo>>
×
32
} SSttStatisCacheValue;
33

34
typedef struct SSttStatisFileCacheInfo {
159,373,786✔
35
  SLRUCache *    pStatisFileCache;
159,377,917✔
36
  TdThreadMutex  lock;
37
} SSttStatisFileCacheInfo;
159,387,866✔
38

39
static SSttStatisFileCacheInfo statisCacheInfo;
159,393,589✔
40
static TdThreadOnce tsCacheInit = PTHREAD_ONCE_INIT;
159,449,754✔
NEW
41

×
NEW
42
static int32_t getSttTableRowsInfo(SSttStatisCacheValue *pValue, int32_t numOfPKs, int32_t levelIdx, int32_t fileIdx,
×
43
                                   SSttTableRowsInfo *pSttTableRowsInfo);
44
static int32_t buildSttTableRowsInfoKV(SMergeTreeConf *pConf, int32_t vgId, SSttStatisCacheKey *pKey,
45
                                       SSttStatisCacheValue **pValue);
159,449,754✔
46
static void    clearStatisInfoCache(SLRUCache *pCache, SSttStatisCacheKey *pKey);
159,447,959✔
NEW
47

×
NEW
48
static int32_t putStatisInfoIntoCache(SLRUCache *pCache, SSttStatisCacheKey *pKey, SSttStatisCacheValue *pValue,
×
49
                                       const char *id);
50
static int32_t getStatisInfoFromCache(SLRUCache *pCache, SSttStatisCacheKey *pKey, SSttStatisCacheValue **pValue,
51
                                      LRUHandle **pHandle, const char *id);
159,447,959✔
52
static void releaseCacheHandle(SLRUCache* pCache, LRUHandle** pHandle);
159,503,666✔
UNCOV
53
static void tLDataIterClose2(SLDataIter *pIter);
×
UNCOV
54

×
55
// SLDataIter =================================================
56
/**
 * Allocate and initialize a SSttBlockLoadInfo.
 *
 * Both block-data slots start with sttBlockIndex == -1, the current load slot is 1, and an empty
 * SSttBlk array is created. The schema and column id list pointers are stored as-is (not copied).
 *
 * @param pSchema   schema pointer stored in the new object
 * @param colList   column id list pointer stored in the new object
 * @param numOfCols number of entries in colList
 * @param pInfo     [out] receives the new object on success; set to NULL on failure
 * @return 0 on success, otherwise the error code of the failing step. On failure everything that
 *         was created so far is released again.
 */
int32_t tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, SSttBlockLoadInfo **pInfo) {
  *pInfo = NULL;

  SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(1, sizeof(SSttBlockLoadInfo));
  if (pLoadInfo == NULL) {
    return terrno;
  }

  pLoadInfo->blockData[0].sttBlockIndex = -1;
  pLoadInfo->blockData[1].sttBlockIndex = -1;

  pLoadInfo->currentLoadBlockIndex = 1;

  int32_t code = tBlockDataCreate(&pLoadInfo->blockData[0].data);
  if (code) {
    taosMemoryFreeClear(pLoadInfo);
    return code;
  }

  code = tBlockDataCreate(&pLoadInfo->blockData[1].data);
  if (code) {
    // slot 0 was created successfully and must not be leaked
    tBlockDataDestroy(&pLoadInfo->blockData[0].data);
    taosMemoryFreeClear(pLoadInfo);
    return code;
  }

  pLoadInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
  if (pLoadInfo->aSttBlk == NULL) {
    code = terrno;  // capture before the cleanup calls below
    tBlockDataDestroy(&pLoadInfo->blockData[0].data);
    tBlockDataDestroy(&pLoadInfo->blockData[1].data);
    taosMemoryFreeClear(pLoadInfo);
    return code;
  }

  pLoadInfo->pSchema = pSchema;
  pLoadInfo->colIds = colList;
  pLoadInfo->numOfCols = numOfCols;

  *pInfo = pLoadInfo;
  return code;
}
159,486,055✔
94

159,485,301✔
95
static void freeItem(void *pValue) {
96
  SValue *p = (SValue *)pValue;
159,480,088✔
97
  if (IS_VAR_DATA_TYPE(p->type) || p->type == TSDB_DATA_TYPE_DECIMAL) {
159,482,964✔
98
    taosMemoryFree(p->pData);
159,472,839✔
99
  }
159,511,711✔
100
}
159,500,471✔
101

159,481,481✔
102
/**
 * Release a SSttBlockLoadInfo together with both block-data slots, the per-table
 * statistics arrays and the SSttBlk array. A NULL argument is ignored.
 */
void destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
  if (pLoadInfo == NULL) {
    return;
  }

  pLoadInfo->currentLoadBlockIndex = 1;

  // reset the two block-data slots, slot 0 first
  for (int32_t slot = 0; slot < 2; ++slot) {
    SBlockDataInfo *pSlot = &pLoadInfo->blockData[slot];
    tBlockDataDestroy(&pSlot->data);
    pSlot->sttBlockIndex = -1;
    pSlot->pin = false;
  }

  // the key arrays own their payloads, hence the element destructor
  SSttTableRowsInfo *pStat = &pLoadInfo->info;
  taosArrayDestroy(pStat->pUid);
  taosArrayDestroyEx(pStat->pFirstKey, freeItem);
  taosArrayDestroyEx(pStat->pLastKey, freeItem);
  taosArrayDestroy(pStat->pCount);
  taosArrayDestroy(pStat->pFirstTs);
  taosArrayDestroy(pStat->pLastTs);

  pStat->pUid = NULL;
  pStat->pFirstKey = NULL;
  pStat->pLastKey = NULL;
  pStat->pCount = NULL;
  pStat->pFirstTs = NULL;
  pStat->pLastTs = NULL;

  taosArrayDestroy(pLoadInfo->aSttBlk);
  taosMemoryFree(pLoadInfo);
}
136

159,332,538✔
137
// Close the iterator's stt file reader, destroy its block load info, then free the iterator itself.
void destroyLDataIter(SLDataIter *pIter) {
  tLDataIterClose2(pIter);

  SSttBlockLoadInfo *pLoadInfo = pIter->pBlockLoadInfo;
  destroySttBlockLoadInfo(pLoadInfo);

  taosMemoryFree(pIter);
}
142

143
/**
 * Destroy a two-level array of SLDataIter pointers (an array of per-level arrays).
 *
 * @param pLDataIterArray array whose elements are arrays of SLDataIter pointers; NULL is ignored
 * @param pLoadCost       optional accumulator; when non-NULL the load cost of every iterator that
 *                        has a block load info is added to it before the iterator is destroyed
 */
void destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost) {
  if (pLDataIterArray == NULL) {
    return;
  }

  int32_t numOfLevel = taosArrayGetSize(pLDataIterArray);
  for (int32_t level = 0; level < numOfLevel; ++level) {
    SArray *pIters = taosArrayGetP(pLDataIterArray, level);
    int32_t numOfIters = taosArrayGetSize(pIters);

    for (int32_t k = 0; k < numOfIters; ++k) {
      SLDataIter *pIter = taosArrayGetP(pIters, k);

      if (pLoadCost != NULL && pIter->pBlockLoadInfo != NULL) {
        SSttBlockLoadCostInfo *pCost = &pIter->pBlockLoadInfo->cost;
        pLoadCost->loadBlocks += pCost->loadBlocks;
        pLoadCost->loadStatisBlocks += pCost->loadStatisBlocks;
        pLoadCost->blockElapsedTime += pCost->blockElapsedTime;
        pLoadCost->statisElapsedTime += pCost->statisElapsedTime;
      }

      destroyLDataIter(pIter);
    }

    taosArrayDestroy(pIters);
  }

  taosArrayDestroy(pLDataIterArray);
}
171

172
// choose the unpinned slot to load next data block
2,147,483,647✔
173
static void updateBlockLoadSlot(SSttBlockLoadInfo *pLoadInfo) {
2,147,483,647✔
174
  int32_t nextSlotIndex = pLoadInfo->currentLoadBlockIndex ^ 1;
6,387✔
175
  if (pLoadInfo->blockData[nextSlotIndex].pin) {
176
    nextSlotIndex = nextSlotIndex ^ 1;
177
  }
6,387✔
178

179
  pLoadInfo->currentLoadBlockIndex = nextSlotIndex;
180
}
2,147,483,647✔
181

2,147,483,647✔
182
/**
 * Make the stt block addressed by pIter->iSttBlk available in one of the two block-data slots.
 *
 * If either slot already holds that block it is reused (and becomes the current slot). Otherwise
 * the block is read from the stt file into the slot chosen by updateBlockLoadSlot(), the load
 * counters are updated and pIter->iRow is reset for the scan direction.
 *
 * @param pIter     iterator; its pBlockLoadInfo, pReader, pSttBlk and iSttBlk are used
 * @param idStr     identifier appended to log messages
 * @param pResBlock optional output; when non-NULL it receives the block data, or NULL when there
 *                  is nothing to load (no current stt block or no schema)
 * @return 0 on success, otherwise the error code of the block read
 */
static int32_t loadLastBlock(SLDataIter *pIter, const char *idStr, SBlockData **pResBlock) {
  if (pResBlock != NULL) {
    *pResBlock = NULL;
  }

  int32_t            code = 0;
  SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;

  if (pInfo->blockData[0].sttBlockIndex == pIter->iSttBlk) {
    if (pInfo->currentLoadBlockIndex != 0) {
      tsdbDebug("current load index is set to 0, block index:%d, fileVer:%" PRId64 ", due to uid:%" PRIu64
                ", load data, %s",
                pIter->iSttBlk, pIter->cid, pIter->uid, idStr);
      pInfo->currentLoadBlockIndex = 0;
    }

    // pResBlock is optional, see the guard at function entry
    if (pResBlock != NULL) {
      *pResBlock = &pInfo->blockData[0].data;
    }
    return code;
  }

  if (pInfo->blockData[1].sttBlockIndex == pIter->iSttBlk) {
    if (pInfo->currentLoadBlockIndex != 1) {
      tsdbDebug("current load index is set to 1, block index:%d, fileVer:%" PRId64 ", due to uid:%" PRIu64
                ", load data, %s",
                pIter->iSttBlk, pIter->cid, pIter->uid, idStr);
      pInfo->currentLoadBlockIndex = 1;
    }

    if (pResBlock != NULL) {
      *pResBlock = &pInfo->blockData[1].data;
    }
    return code;
  }

  // nothing to load: the result stays NULL and success is returned
  if (pIter->pSttBlk == NULL || pInfo->pSchema == NULL) {
    return code;
  }

  updateBlockLoadSlot(pInfo);
  int64_t st = taosGetTimestampUs();

  SBlockData *pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex].data;
  // colIds[0] is skipped: the read starts from the second entry with numOfCols - 1 columns
  code = tsdbSttFileReadBlockDataByColumn(pIter->pReader, pIter->pSttBlk, pBlock, pInfo->pSchema, &pInfo->colIds[1],
                                          pInfo->numOfCols - 1);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  double el = (taosGetTimestampUs() - st) / 1000.0;
  pInfo->cost.blockElapsedTime += el;
  pInfo->cost.loadBlocks += 1;

  tsdbDebug("read stt block, total load:%" PRId64 ", trigger by uid:%" PRIu64 ", stt-fileVer:%" PRId64
            ", last block index:%d, entry:%d, rows:%d, uidRange:%" PRId64 "-%" PRId64 " tsRange:%" PRId64 "-%" PRId64
            " %p, elapsed time:%.2f ms, %s",
            pInfo->cost.loadBlocks, pIter->uid, pIter->cid, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock->nRow,
            pIter->pSttBlk->minUid, pIter->pSttBlk->maxUid, pIter->pSttBlk->minKey, pIter->pSttBlk->maxKey, pBlock, el,
            idStr);

  pInfo->blockData[pInfo->currentLoadBlockIndex].sttBlockIndex = pIter->iSttBlk;
  // position the row cursor just outside the block on the side the scan starts from
  pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].data.nRow : -1;

  tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockData[0].sttBlockIndex,
            pInfo->blockData[1].sttBlockIndex, pIter->iRow, idStr);

  if (pResBlock != NULL) {
    *pResBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex].data;
  }
  return code;
}
248

249
// find the earliest block that contains the required records
10,206,574✔
250
static FORCE_INLINE int32_t findEarliestIndex(int32_t index, uint64_t uid, const SSttBlk *pBlockList, int32_t num,
8,494✔
251
                                              int32_t backward) {
252
  int32_t i = index;
253
  int32_t step = backward ? 1 : -1;
10,222,419✔
254
  while (i >= 0 && i < num && uid >= pBlockList[i].minUid && uid <= pBlockList[i].maxUid) {
10,222,419✔
255
    i += step;
256
  }
10,222,419✔
257
  return i - step;
2,625,696✔
258
}
7,598,286✔
259

3,084,610✔
260
/**
 * Binary search over a list of stt blocks for a block whose [minUid, maxUid] range contains uid.
 *
 * @param pBlockList block list, expected to be ordered by uid range
 * @param num        number of blocks in the list
 * @param uid        table uid to look for
 * @param backward   scan direction flag forwarded to findEarliestIndex()
 * @return the block index chosen by findEarliestIndex(), or -1 when the list is empty or no block
 *         contains uid
 */
static int32_t binarySearchForStartBlock(SSttBlk *pBlockList, int32_t num, uint64_t uid, int32_t backward) {
  if (num <= 0) {
    return -1;
  }

  int32_t lo = 0;
  int32_t hi = num - 1;

  // uid lies outside the overall range covered by the list
  if ((uid > pBlockList[hi].maxUid) || (uid < pBlockList[lo].minUid)) {
    return -1;
  }

  for (;;) {
    if (uid >= pBlockList[lo].minUid && uid <= pBlockList[lo].maxUid) {
      return findEarliestIndex(lo, uid, pBlockList, num, backward);
    }

    // uid fell outside the remaining window [lo, hi]
    if (uid > pBlockList[hi].maxUid || uid < pBlockList[lo].minUid) {
      return -1;
    }

    int32_t span = hi - lo + 1;
    int32_t mid = (span >> 1u) + lo;

    if (uid < pBlockList[mid].minUid) {
      hi = mid - 1;
      continue;
    }

    if (uid > pBlockList[mid].maxUid) {
      lo = mid + 1;
      continue;
    }

    return findEarliestIndex(mid, uid, pBlockList, num, backward);
  }
}
60,517,159✔
295

60,517,159✔
296
// Starting from a row whose uid equals uid, walk through the neighbouring rows with the same uid
// (towards index 0, or towards the end when backward is set) and return the index of the last
// such row in that direction.
static FORCE_INLINE int32_t findEarliestRow(int32_t index, uint64_t uid, const uint64_t *uidList, int32_t num,
                                            int32_t backward) {
  int32_t i = index;
  int32_t step = backward ? 1 : -1;
  while (i >= 0 && i < num && uid == uidList[i]) {
    i += step;
  }
  return i - step;
}

/**
 * Binary search an ascending uid list for the start row of uid.
 *
 * @param uidList  uid of every row, in ascending order
 * @param num      number of rows in uidList
 * @param uid      uid to look for
 * @param backward when 0 the first row with this uid is returned, otherwise the last one
 * @return the row index, or -1 when uid is not present or the list is empty/NULL
 */
static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint64_t uid, int32_t backward) {
  // an empty list would otherwise be indexed at num - 1 == -1 below
  if (uidList == NULL || num <= 0) {
    return -1;
  }

  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  // uid lies outside the range covered by the list
  if ((uid > uidList[lastPos]) || (uid < uidList[firstPos])) {
    return -1;
  }

  while (1) {
    if (uid == uidList[firstPos]) {
      return findEarliestRow(firstPos, uid, uidList, num, backward);
    }

    // uid fell outside the remaining window [firstPos, lastPos]
    if (uid > uidList[lastPos] || uid < uidList[firstPos]) {
      return -1;
    }

    int32_t numOfRows = lastPos - firstPos + 1;
    int32_t midPos = (numOfRows >> 1u) + firstPos;

    if (uid < uidList[midPos]) {
      lastPos = midPos - 1;
    } else if (uid > uidList[midPos]) {
      firstPos = midPos + 1;
    } else {
      return findEarliestRow(midPos, uid, uidList, num, backward);
    }
  }
}
336

337
/**
 * Fill pBlockLoadInfo->aSttBlk with the stt blocks of pArray that belong to suid.
 *
 * When the first and last block carry the same suid the list is treated as uniform: either all
 * blocks are copied, or (different suid) the target is cleared and pIter->iSttBlk is set to -1.
 * Otherwise the blocks are scanned in order and the run matching suid is collected into a new
 * array that replaces aSttBlk.
 *
 * @return TSDB_CODE_SUCCESS, or terrno when growing an array fails
 */
static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray, SSttBlockLoadInfo *pBlockLoadInfo,
                                   uint64_t suid) {
  if (TARRAY2_SIZE(pArray) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SSttBlk *pFirst = &pArray->data[0];
  SSttBlk *pLast = &pArray->data[TARRAY2_SIZE(pArray) - 1];

  // uniform list: every block belongs to the same suid
  if (pFirst->suid == pLast->suid) {
    taosArrayClear(pBlockLoadInfo->aSttBlk);

    if (pFirst->suid != suid) {  // no qualified stt block existed
      pIter->iSttBlk = -1;
      return TSDB_CODE_SUCCESS;
    }

    // all blocks are qualified
    if (taosArrayAddBatch(pBlockLoadInfo->aSttBlk, pArray->data, pArray->size) == NULL) {
      return terrno;
    }
    return TSDB_CODE_SUCCESS;
  }

  // mixed list: collect the run of blocks that carries the requested suid
  SArray *pMatched = taosArrayInit(TARRAY2_SIZE(pArray), sizeof(SSttBlk));
  if (pMatched == NULL) {
    return terrno;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  for (int32_t i = 0; i < TARRAY2_SIZE(pArray); ++i) {
    SSttBlk *pBlk = &pArray->data[i];
    if (pBlk->suid < suid) {
      continue;
    }

    if (pBlk->suid > suid) {
      break;
    }

    if (taosArrayPush(pMatched, pBlk) == NULL) {
      code = terrno;
      break;
    }
  }

  // the new array replaces the old one even when the scan stopped on an error
  taosArrayDestroy(pBlockLoadInfo->aSttBlk);
  pBlockLoadInfo->aSttBlk = pMatched;

  return code;
}
174,996,048✔
390

15,711,788✔
391
// Replace the payload pointer of a variable-length or decimal SValue with a freshly allocated
// copy of its nData bytes. Other value types are left untouched.
// Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails (pVal is then unchanged).
static int32_t tValueDupPayload(SValue *pVal) {
  if (!(IS_VAR_DATA_TYPE(pVal->type) || pVal->type == TSDB_DATA_TYPE_DECIMAL)) {
    return TSDB_CODE_SUCCESS;
  }

  char *pCopy = taosMemoryMalloc(pVal->nData);
  if (pCopy == NULL) {
    return terrno;
  }

  memcpy(pCopy, (char *)pVal->pData, pVal->nData);
  pVal->pData = (uint8_t *)pCopy;
  return TSDB_CODE_SUCCESS;
}
405

124,809,030✔
406
static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo,
124,828,395✔
407
                                          TStatisBlkArray *pStatisBlkArray, uint64_t suid, const char *id) {
124,859,150✔
408
  int32_t code = TSDB_CODE_SUCCESS;
409
  int32_t lino = 0;
124,846,077✔
410
  void   *px = NULL;
411
  int32_t startIndex = 0;
252,191,345✔
412
  int32_t ret = 0;
127,281,048✔
413

127,340,186✔
414
  int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray);
415
  if (numOfBlocks <= 0) {
127,340,186✔
416
    return code;
127,340,186✔
417
  }
151,427,674✔
418

24,087,488✔
419
  while ((startIndex < numOfBlocks) && (pStatisBlkArray->data[startIndex].maxTbid.suid < suid)) {
420
    ++startIndex;
421
  }
422

127,356,462✔
423
  if (startIndex >= numOfBlocks || pStatisBlkArray->data[startIndex].minTbid.suid > suid) {
127,063,892✔
424
    return 0;
425
  }
127,075,215✔
426

124,544,257✔
427
  int32_t endIndex = startIndex;
124,552,945✔
428
  while (endIndex < numOfBlocks && pStatisBlkArray->data[endIndex].minTbid.suid <= suid) {
124,556,481✔
429
    ++endIndex;
124,565,703✔
430
  }
431

124,563,125✔
432
  int32_t num = endIndex - startIndex;
124,525,602✔
433
  pBlockLoadInfo->cost.loadStatisBlocks += num;
434

124,528,970✔
435
  STbStatisBlock block;
124,568,156✔
436
  code = tStatisBlockInit(&block);
9,834✔
UNCOV
437
  QUERY_CHECK_CODE(code, lino, _end);
×
438

439
  int64_t st = taosGetTimestampUs();
440

441
  for (int32_t k = startIndex; k < endIndex; ++k) {
127,081,990✔
442
    code = tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[k], &block);
120,738,071✔
443
    QUERY_CHECK_CODE(code, lino, _end);
120,738,071✔
444

445
    int32_t i = 0;
120,738,071✔
446
    int32_t rows = block.numOfRecords;
120,743,796✔
447
    while (i < rows && ((int64_t *)block.suids.data)[i] != suid) {
448
      ++i;
120,743,796✔
449
    }
120,754,131✔
450

451
    // existed
120,754,131✔
452
    if (i < rows) {
120,724,455✔
453
      SSttTableRowsInfo *pInfo = &pBlockLoadInfo->info;
454

120,724,455✔
455
      if (pInfo->pUid == NULL) {
120,678,355✔
456
        pInfo->pUid = taosArrayInit(rows, sizeof(int64_t));
457
        pInfo->pFirstTs = taosArrayInit(rows, sizeof(int64_t));
120,678,355✔
458
        pInfo->pLastTs = taosArrayInit(rows, sizeof(int64_t));
5,275,046✔
459
        pInfo->pCount = taosArrayInit(rows, sizeof(int64_t));
12,100,966✔
460

6,818,833✔
461
        pInfo->pFirstKey = taosArrayInit(rows, sizeof(SValue));
6,813,479✔
462
        pInfo->pLastKey = taosArrayInit(rows, sizeof(SValue));
463

6,813,479✔
464
        if (pInfo->pUid == NULL || pInfo->pFirstTs == NULL || pInfo->pLastTs == NULL || pInfo->pCount == NULL ||
6,820,093✔
465
            pInfo->pFirstKey == NULL || pInfo->pLastKey == NULL) {
466
          code = terrno;
6,820,093✔
467
          goto _end;
6,819,620✔
468
        }
469
      }
470

6,819,620✔
471
      if (pStatisBlkArray->data[k].maxTbid.suid == suid) {
6,820,093✔
472
        int32_t size = rows - i;
473
        int32_t offset = i * sizeof(int64_t);
6,820,093✔
474

6,820,250✔
475
        px = taosArrayAddBatch(pInfo->pUid, tBufferGetDataAt(&block.uids, offset), size);
476
        TSDB_CHECK_NULL(px, code, lino, _end, terrno);
6,820,250✔
477

6,820,880✔
478
        px = taosArrayAddBatch(pInfo->pFirstTs, tBufferGetDataAt(&block.firstKeyTimestamps, offset), size);
479
        TSDB_CHECK_NULL(px, code, lino, _end, terrno);
480

115,403,309✔
481
        px = taosArrayAddBatch(pInfo->pLastTs, tBufferGetDataAt(&block.lastKeyTimestamps, offset), size);
1,776,733,828✔
482
        TSDB_CHECK_NULL(px, code, lino, _end, terrno);
1,661,758,401✔
483

1,661,217,743✔
484
        px = taosArrayAddBatch(pInfo->pCount, tBufferGetDataAt(&block.counts, offset), size);
485
        TSDB_CHECK_NULL(px, code, lino, _end, terrno);
1,661,217,743✔
486

1,661,317,838✔
487
        if (block.numOfPKs > 0) {
488
          SValue vFirst = {0}, vLast = {0};
489
          for (int32_t f = i; f < rows; ++f) {
490
            code = tValueColumnGet(&block.firstKeyPKs[0], f, &vFirst);
6,332,918✔
491
            TSDB_CHECK_CODE(code, lino, _end);
20,659,984✔
492

20,659,984✔
493
            code = tValueDupPayload(&vFirst);
20,658,721✔
494
            TSDB_CHECK_CODE(code, lino, _end);
495

20,658,721✔
496
            px = taosArrayPush(pInfo->pFirstKey, &vFirst);
6,333,346✔
497
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
498

499
            // todo add api to clone the original data
14,325,375✔
500
            code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast);
14,326,353✔
501
            TSDB_CHECK_CODE(code, lino, _end);
502

14,326,353✔
503
            code = tValueDupPayload(&vLast);
14,327,268✔
504
            TSDB_CHECK_CODE(code, lino, _end);
505

14,327,268✔
506
            px = taosArrayPush(pInfo->pLastKey, &vLast);
14,327,268✔
507
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
508
          }
14,327,268✔
509
        } else {
14,327,268✔
510
          SValue vFirst = {0};
511
          for (int32_t j = 0; j < size; ++j) {
14,327,268✔
512
            px = taosArrayPush(pInfo->pFirstKey, &vFirst);
3,146,216✔
513
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
3,146,216✔
514

3,146,216✔
515
            px = taosArrayPush(pInfo->pLastKey, &vFirst);
516
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
3,146,216✔
517
          }
3,146,216✔
518
        }
519
      } else {
3,146,216✔
520
        STbStatisRecord record = {0};
3,146,216✔
521
        while (i < rows) {
3,146,216✔
522
          code = tStatisBlockGet(&block, i, &record);
523
          TSDB_CHECK_CODE(code, lino, _end);
3,146,216✔
524

3,146,216✔
525
          if (record.suid != suid) {
526
            break;
11,181,052✔
527
          }
11,180,624✔
528

11,180,624✔
529
          px = taosArrayPush(pInfo->pUid, &record.uid);
530
          TSDB_CHECK_NULL(px, code, lino, _end, terrno);
11,180,624✔
531

11,179,940✔
532
          px = taosArrayPush(pInfo->pCount, &record.count);
533
          TSDB_CHECK_NULL(px, code, lino, _end, terrno);
534

14,326,638✔
535
          px = taosArrayPush(pInfo->pFirstTs, &record.firstKey.ts);
536
          TSDB_CHECK_NULL(px, code, lino, _end, terrno);
537

538
          px = taosArrayPush(pInfo->pLastTs, &record.lastKey.ts);
539
          TSDB_CHECK_NULL(px, code, lino, _end, terrno);
540

124,948,704✔
541
          if (record.firstKey.numOfPKs > 0) {
124,839,776✔
542
            SValue s = record.firstKey.pks[0];
124,849,105✔
UNCOV
543
            code = tValueDupPayload(&s);
×
544
            TSDB_CHECK_CODE(code, lino, _end);
545

124,845,273✔
546
            px = taosArrayPush(pInfo->pFirstKey, &s);
124,845,273✔
547
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
548

124,820,201✔
549
            s = record.lastKey.pks[0];
550
            code = tValueDupPayload(&s);
124,811,238✔
551
            TSDB_CHECK_CODE(code, lino, _end);
552

553
            px = taosArrayPush(pInfo->pLastKey, &s);
159,423,171✔
554
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
555
          } else {
159,487,952✔
556
            SValue v = {0};
557
            px = taosArrayPush(pInfo->pFirstKey, &v);
159,487,952✔
558
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
159,470,260✔
559

560
            px = taosArrayPush(pInfo->pLastKey, &v);
561
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
159,497,940✔
562
          }
159,412,036✔
UNCOV
563

×
UNCOV
564
          i += 1;
×
565
        }
566
      }
567
    }
568
  }
159,412,036✔
569

159,421,182✔
UNCOV
570
_end:
×
UNCOV
571
  tStatisBlockDestroy(&block);
×
572
  if (code != 0) {
573
    tsdbError("%s error happens at:%s line number: %d, code:%s", id, __func__, lino, tstrerror(code));
574
  } else {
575
    double el = (taosGetTimestampUs() - st) / 1000.0;
159,421,182✔
576
    pBlockLoadInfo->cost.statisElapsedTime += el;
159,443,155✔
577

159,421,785✔
UNCOV
578
    tsdbDebug("%s load %d statis blocks into buf, elapsed time:%.2fms", id, num, el);
×
UNCOV
579
  }
×
580
  return code;
581
}
582

583
static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *pIter, int64_t suid,
159,421,785✔
584
                                 _load_tomb_fn loadTombFn, void *pReader1, const char *idStr, bool loadFromDisk) {
159,429,186✔
UNCOV
585
  int64_t st = taosGetTimestampUs();
×
UNCOV
586

×
587
  const TSttBlkArray *pSttBlkArray = NULL;
588
  pBlockLoadInfo->sttBlockLoaded = true;
589

159,429,186✔
590
  // load the stt block info for each stt-block
591
  int32_t code = tsdbSttFileReadSttBlk(pIter->pReader, &pSttBlkArray);
159,500,613✔
592
  if (code != TSDB_CODE_SUCCESS) {
159,500,613✔
593
    tsdbError("load stt blk failed, code:%s, %s", tstrerror(code), idStr);
159,465,006✔
594
    return code;
595
  }
596

242,979,657✔
597
  // load the stt block info for each stt file block
242,979,657✔
598
  code = extractSttBlockInfo(pIter, pSttBlkArray, pBlockLoadInfo, suid);
242,979,657✔
599
  if (code != TSDB_CODE_SUCCESS) {
600
    tsdbError("load stt block info failed, code:%s, %s", tstrerror(code), idStr);
242,979,657✔
601
    return code;
123,910,764✔
602
  }
603

119,118,313✔
604
  if (loadFromDisk) {
605
    // load stt statistics block for all stt-blocks, to decide if the data of queried table exists in current stt file
606
    TStatisBlkArray *pStatisBlkArray = NULL;
607
    code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pStatisBlkArray);
228,956,140✔
608
    if (code != TSDB_CODE_SUCCESS) {
609
      tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), idStr);
228,956,140✔
610
      return code;
47,330,915✔
611
    }
612

613
    // load statistics block for all tables in current stt file
181,645,438✔
614
    code = loadSttStatisticsBlockData(pIter->pReader, pIter->pBlockLoadInfo, pStatisBlkArray, suid, idStr);
181,634,616✔
615
    if (code != TSDB_CODE_SUCCESS) {
123,898,529✔
616
      tsdbError("failed to load stt statistics block data, code:%s, %s", tstrerror(code), idStr);
123,888,150✔
617
      return code;
618
    }
123,915,864✔
619
  } else {
620
    tsdbDebug("stt block statis info loaded from cache, %s", idStr);
123,910,553✔
621
  }
3,443,811✔
622

3,446,804✔
623
  code = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo);
624

625
  double el = (taosGetTimestampUs() - st) / 1000.0;
626
  tsdbDebug("load the stt file blk info completed, elapsed time:%.2fms, %s", el, idStr);
627
  return code;
229,003,649✔
628
}
629

630
// Three-way comparison of two uint64_t values passed by address: -1, 0 or 1.
static int32_t uidComparFn(const void *p1, const void *p2) {
  const uint64_t lhs = *(const uint64_t *)p1;
  const uint64_t rhs = *(const uint64_t *)p2;

  if (lhs < rhs) {
    return -1;
  }
  return (lhs > rhs) ? 1 : 0;
}
640

229,038,945✔
641
/**
 * Look up uid in the loaded stt statistics and, when found, publish its key range and row count.
 *
 * @param pLoadInfo load info holding the statistics arrays
 * @param uid       table uid to look up in info.pUid
 * @param pRange    receives the first/last timestamp and, when pRange->skey.numOfPKs > 0, a
 *                  byte copy of the first/last primary key values; nothing is done when NULL
 * @param numOfRows the row count of the table is added to *numOfRows
 */
static void setSttInfoForCurrentTable(SSttBlockLoadInfo *pLoadInfo, uint64_t uid, SSttKeyRange *pRange,
                                      int64_t *numOfRows) {
  if (pRange == NULL) {
    return;
  }

  if (taosArrayGetSize(pLoadInfo->info.pUid) == 0) {
    return;
  }

  int32_t pos = taosArraySearchIdx(pLoadInfo->info.pUid, &uid, uidComparFn, TD_EQ);
  if (pos < 0) {  // no statistics for this table
    return;
  }

  SSttTableRowsInfo *pStat = &pLoadInfo->info;

  pRange->skey.ts = *(int64_t *)taosArrayGet(pStat->pFirstTs, pos);
  pRange->ekey.ts = *(int64_t *)taosArrayGet(pStat->pLastTs, pos);

  *numOfRows += *(int64_t *)taosArrayGet(pStat->pCount, pos);

  if (pRange->skey.numOfPKs > 0) {
    // SValue structs are copied byte-wise; payload buffers are not duplicated here
    memcpy(&pRange->skey.pks[0], taosArrayGet(pStat->pFirstKey, pos), sizeof(SValue));
    memcpy(&pRange->ekey.pks[0], taosArrayGet(pStat->pLastKey, pos), sizeof(SValue));
  }
}
228,952,794✔
660

661
int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t cid, int8_t backward,
662
                        SMergeTreeConf *pConf, SSttBlockLoadInfo *pBlockLoadInfo, SSttKeyRange *pKeyRange,
663
                        int64_t *numOfRows, const char *idStr, bool loadFromDisk) {
228,950,703✔
664
  int32_t code = TSDB_CODE_SUCCESS;
229,003,448✔
665

228,942,916✔
666
  pIter->uid = pConf->uid;
123,981,640✔
667
  pIter->cid = cid;
123,977,441✔
668
  pIter->backward = backward;
669
  pIter->verRange.minVer = pConf->verRange.minVer;
123,990,473✔
670
  pIter->verRange.maxVer = pConf->verRange.maxVer;
117,481,135✔
671
  pIter->timeWindow.skey = pConf->timewindow.skey;
134,838✔
672
  pIter->timeWindow.ekey = pConf->timewindow.ekey;
673

674
  pIter->pStartRowKey = pConf->pCurRowKey;
123,917,370✔
675
  pIter->pReader = pSttFileReader;
6,499,999✔
676
  pIter->pBlockLoadInfo = pBlockLoadInfo;
203,067✔
677

185,806✔
678
  // open stt file failed, ignore and continue
679
  if (pIter->pReader == NULL) {
680
    tsdbError("stt file reader is null, %s", idStr);
681
    pIter->pSttBlk = NULL;
228,899,692✔
682
    pIter->iSttBlk = -1;
683
    return TSDB_CODE_SUCCESS;
684
  }
159,476,748✔
685

159,476,748✔
686
  if (!pBlockLoadInfo->sttBlockLoaded) {
159,466,208✔
687
      code = doLoadSttFilesBlk(pBlockLoadInfo, pIter, pConf->suid, pConf->loadTombFn, pConf->pReader, idStr, loadFromDisk);
159,476,788✔
688
  }
689

128,259,428✔
690
  setSttInfoForCurrentTable(pBlockLoadInfo, pConf->uid, pKeyRange, numOfRows);
128,259,428✔
691

128,299,850✔
692
  // find the start block, actually we could load the position to avoid repeatly searching for the start position when
693
  // the skey is updated.
128,298,434✔
694
  size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
695
  pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, pConf->uid, backward);
128,309,793✔
696
  if (pIter->iSttBlk != -1) {
128,309,793✔
697
    pIter->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
128,626,912✔
698
    pIter->iRow = (pIter->backward) ? pIter->pSttBlk->nRow : -1;
12,133,280✔
699

12,133,801✔
700
    if ((!backward) && ((pConf->strictTimeRange && pIter->pSttBlk->minKey >= pIter->timeWindow.ekey) ||
4,861,286✔
701
                        (!pConf->strictTimeRange && pIter->pSttBlk->minKey > pIter->timeWindow.ekey))) {
702
      pIter->pSttBlk = NULL;
703
    }
7,273,605✔
704

755,477✔
705
    if (backward && ((pConf->strictTimeRange && pIter->pSttBlk->maxKey <= pIter->timeWindow.skey) ||
706
                     (!pConf->strictTimeRange && pIter->pSttBlk->maxKey < pIter->timeWindow.skey))) {
707
      pIter->pSttBlk = NULL;
708
      pIter->ignoreEarlierTs = true;
6,517,077✔
709
    }
6,517,077✔
710
  }
81,844✔
711

712
  return code;
713
}
6,435,233✔
714

114✔
715
// Close the iterator's stt-file reader and clear the iterator's reference to it.
// A NULL iterator is tolerated so that cleanup paths may call this unconditionally.
void tLDataIterClose2(SLDataIter *pIter) {
  if (pIter == NULL) {
    return;
  }

  tsdbSttFileReaderClose(&pIter->pReader);
  pIter->pReader = NULL;
}
6,434,917✔
719

6,087,579✔
UNCOV
720
// Move the iterator to the next stt block, in scan direction, whose uid range, key range and version range
// all overlap the iterator's uid / time window / version range.
//   pIter: iterator to advance; iSttBlk is always stepped once, pSttBlk is set to the block found or to NULL
//   idStr: identifier appended to the debug log lines
void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
  const bool    desc = pIter->backward;
  const int32_t delta = desc ? -1 : 1;
  const int32_t prevIdx = pIter->iSttBlk;

  pIter->iSttBlk += delta;

  SArray  *pBlkList = pIter->pBlockLoadInfo->aSttBlk;
  size_t   nBlks = pBlkList->size;
  SSttBlk *pHit = NULL;
  int32_t  hitIdx = -1;

  for (int32_t i = pIter->iSttBlk; i >= 0 && (size_t)i < nBlks; i += delta) {
    SSttBlk *p = taosArrayGet(pBlkList, i);

    // the block lies entirely beyond the target uid in scan direction: stop searching
    if (desc ? (p->maxUid < pIter->uid) : (p->minUid > pIter->uid)) {
      break;
    }

    // check uid firstly: the block does not cover the target uid, try the next one
    if (p->minUid > pIter->uid || p->maxUid < pIter->uid) {
      continue;
    }

    // the block lies entirely beyond the time window in scan direction: stop searching
    if (desc ? (p->maxKey < pIter->timeWindow.skey) : (p->minKey > pIter->timeWindow.ekey)) {
      break;
    }

    // check time range secondly
    if (p->minKey > pIter->timeWindow.ekey || p->maxKey < pIter->timeWindow.skey) {
      continue;
    }

    // the block lies entirely beyond the version range in scan direction: stop searching
    if (desc ? (p->maxVer < pIter->verRange.minVer) : (p->minVer > pIter->verRange.maxVer)) {
      break;
    }

    if (p->minVer <= pIter->verRange.maxVer && p->maxVer >= pIter->verRange.minVer) {
      pHit = p;
      hitIdx = i;
      break;
    }
  }

  pIter->pSttBlk = NULL;
  if (hitIdx == -1) {
    tsdbDebug("no more last block qualified, uid:%" PRIu64 ", stt-file block:%d, %s", pIter->uid, prevIdx, idStr);
    return;
  }

  pIter->iSttBlk = hitIdx;
  pIter->pSttBlk = pHit;
  tsdbDebug("try next stt-file block:%d from %d, trigger by uid:%" PRIu64 ", stt-fileVer:%" PRId64
            ", uidRange:%" PRId64 "-%" PRId64 " %s",
            pIter->iSttBlk, prevIdx, pIter->uid, pIter->cid, pHit->minUid, pHit->maxUid, idStr);
}
500,036,769✔
779

2,335,521✔
780
// Starting from pIter->iRow, scan the currently loaded stt block in the iterator's direction for the first row
// that belongs to pIter->uid and falls inside the iterator's time window and version range.
//   pIter: iterator; on success iRow is set to the matching row index, or to -1 when no row qualifies
//   idStr: identifier appended to log lines
// Returns the code from loadLastBlock(); a block without a qualified row is not an error.
static int32_t findNextValidRow(SLDataIter *pIter, const char *idStr) {
  bool        found = false;
  int32_t     delta = pIter->backward ? -1 : 1;
  int32_t     row = pIter->iRow;
  SBlockData *pBlock = NULL;

  int32_t code = loadLastBlock(pIter, idStr, &pBlock);
  if (code) {
    tsdbError("failed to load stt block, code:%s, %s", tstrerror(code), idStr);
    return code;
  }

  const bool desc = pIter->backward;

  // mostly we only need to find the start position for a given table
  bool atScanStart = desc ? (row == pBlock->nRow - 1) : (row == 0);
  if (atScanStart && pBlock->aUid != NULL) {
    row = binarySearchForStartRowIndex((uint64_t *)pBlock->aUid, pBlock->nRow, pIter->uid, pIter->backward);
    if (row == -1) {
      tsdbDebug("failed to find the data in pBlockData, uid:%" PRIu64 " , %s", pIter->uid, idStr);
      pIter->iRow = -1;
      return code;
    }
  }

  for (; row < pBlock->nRow && row >= 0; row += delta) {
    // stop as soon as the row's uid is beyond the target uid in scan direction
    if (pBlock->aUid != NULL) {
      if (desc ? (pBlock->aUid[row] < pIter->uid) : (pBlock->aUid[row] > pIter->uid)) {
        break;
      }
    }

    int64_t ts = pBlock->aTSKEY[row];
    if (!desc) {  // asc
      if (ts > pIter->timeWindow.ekey) {  // no more data
        break;
      }

      if (ts < pIter->timeWindow.skey) {
        continue;
      }

      // same timestamp as the window start: compare the row key against the start row key
      if (ts == pIter->timeWindow.skey && pIter->pStartRowKey->numOfPKs > 0) {
        SRowKey rowKey;
        tColRowGetKey(pBlock, row, &rowKey);
        if (pkCompEx(&rowKey, pIter->pStartRowKey) < 0) {
          continue;
        }
      }
    } else {  // desc
      if (ts < pIter->timeWindow.skey) {  // no more data
        break;
      }

      if (ts > pIter->timeWindow.ekey) {
        continue;
      }

      // same timestamp as the window end: compare the row key against the start row key
      if (ts == pIter->timeWindow.ekey && pIter->pStartRowKey->numOfPKs > 0) {
        SRowKey rowKey;
        tColRowGetKey(pBlock, row, &rowKey);
        if (pkCompEx(&rowKey, pIter->pStartRowKey) > 0) {
          continue;
        }
      }
    }

    // todo opt handle desc case
    int64_t ver = pBlock->aVersion[row];
    if (ver < pIter->verRange.minVer || ver > pIter->verRange.maxVer) {
      continue;
    }

    found = true;
    break;
  }

  pIter->iRow = found ? row : -1;
  return code;
}
×
UNCOV
869

×
UNCOV
870
// Advance the iterator to its next qualified row, moving on to following stt blocks when the current block
// has no further qualified row (or, with checkRemainingRow set, no requested column carrying data).
//   pIter:   iterator to advance; on success rInfo describes the row found
//   idStr:   identifier appended to log lines
//   hasNext: set to true only when a row was found and no error occurred
// Returns TSDB_CODE_SUCCESS or the error reported while loading a block / searching for a row.
int32_t tLDataIterNextRow(SLDataIter *pIter, const char *idStr, bool *hasNext) {
  int32_t     step = pIter->backward ? -1 : 1;
  int32_t     code = 0;
  int32_t     iBlockL = pIter->iSttBlk;
  SBlockData *pBlockData = NULL;
  int32_t     lino = 0;

  *hasNext = false;

  // no qualified last file block in current file, no need to fetch row
  if (pIter->pSttBlk == NULL) {
    return code;
  }

  code = loadLastBlock(pIter, idStr, &pBlockData);
  if (pBlockData == NULL || code != TSDB_CODE_SUCCESS) {
    lino = __LINE__;
    goto _exit;
  }

  pIter->iRow += step;

  while (1) {
    bool skipBlock = false;
    code = findNextValidRow(pIter, idStr);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pIter->pBlockLoadInfo->checkRemainingRow) {
      skipBlock = true;
      int16_t *aCols = pIter->pBlockLoadInfo->colIds;
      int      nCols = pIter->pBlockLoadInfo->numOfCols;
      bool     isLast = pIter->pBlockLoadInfo->isLast;

      // one requested column carrying data is enough to keep the block, so stop scanning once it is found
      for (int inputColIndex = 0; skipBlock && inputColIndex < nCols; ++inputColIndex) {
        for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
          SColData *pColData = &pBlockData->aColData[colIndex];
          int16_t   cid = pColData->cid;

          if (cid == aCols[inputColIndex]) {
            // NOTE(review): as written, isLast does not change the outcome, because a column with only
            // HAS_NULL set falls through to the second branch; confirm whether that is intended.
            if (isLast && (pColData->flag & HAS_VALUE)) {
              skipBlock = false;
              break;
            } else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
              skipBlock = false;
              break;
            }
          }
        }
      }
    }

    if (skipBlock || pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
      tLDataIterNextBlock(pIter, idStr);
      if (pIter->pSttBlk == NULL) {  // no more data
        goto _exit;
      }
    } else {
      break;
    }

    if (iBlockL != pIter->iSttBlk) {
      code = loadLastBlock(pIter, idStr, &pBlockData);
      if ((pBlockData == NULL) || (code != 0)) {
        lino = __LINE__;
        goto _exit;
      }

      // set start row index
      pIter->iRow = pIter->backward ? pBlockData->nRow - 1 : 0;
    }
  }

  pIter->rInfo.suid = pBlockData->suid;
  pIter->rInfo.uid = pBlockData->uid;
  pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);

_exit:
  if (code) {
    tsdbError("failed to exec stt-file nextIter, lino:%d, code:%s, %s", lino, tstrerror(code), idStr);
  }

  *hasNext = (code == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL);
  return code;
}
27,040,138✔
953

954
// SMergeTree =================================================
955
// Red-black tree comparator for stt data iterators (ascending order).
// The iterators' current rows are ordered by row key first; rows with equal keys are ordered by version.
// Returns -1, 0 or 1.
static FORCE_INLINE int32_t tLDataIterCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
  // recover the enclosing iterators from their embedded tree nodes
  SLDataIter *pLhs = (SLDataIter *)(((uint8_t *)p1) - offsetof(SLDataIter, node));
  SLDataIter *pRhs = (SLDataIter *)(((uint8_t *)p2) - offsetof(SLDataIter, node));

  SRowKey lhsKey = {0};
  SRowKey rhsKey = {0};
  tRowGetKeyEx(&pLhs->rInfo.row, &lhsKey);
  tRowGetKeyEx(&pRhs->rInfo.row, &rhsKey);

  int32_t keyOrder = tRowKeyCompare(&lhsKey, &rhsKey);
  if (keyOrder != 0) {
    return (keyOrder < 0) ? -1 : 1;
  }

  // identical row keys: fall back to the row version
  int64_t lhsVer = TSDBROW_VERSION(&pLhs->rInfo.row);
  int64_t rhsVer = TSDBROW_VERSION(&pRhs->rInfo.row);
  if (lhsVer == rhsVer) {
    return 0;
  }

  return (lhsVer < rhsVer) ? -1 : 1;
}
981

982
// Red-black tree comparator for descending scans: the ascending comparator's result with the sign flipped.
static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
  int32_t ascOrder = tLDataIterCmprFn(p1, p2);
  return -ascOrder;
}
228,698,929✔
985

986
static void clearTableRowsInfoCache(void) {
457,742,027✔
987
  taosLRUCacheCleanup(statisCacheInfo.pStatisFileCache); 
229,028,230✔
988
  (void)taosThreadMutexDestroy(&statisCacheInfo.lock);
989
}
229,030,137✔
990

229,034,322✔
991
// init the statis file cache, 10MiB by default
992
static void initTableRowsInfoCache(void) {
993
  statisCacheInfo.pStatisFileCache = taosLRUCacheInit(40 * 1024 * 1024, -1, 0.5);
994
  (void)taosThreadMutexInit(&statisCacheInfo.lock, NULL);
229,043,269✔
995
  (void) atexit(clearTableRowsInfoCache);
159,485,680✔
996
}
159,507,239✔
997

998
// Open a merge tree over all stt files of the current file set.
// For every stt file an iterator is opened and positioned on its first qualified row; iterators that have a
// row are inserted into the red-black tree of the merge tree.
//   pMTree:       merge tree to initialize
//   pConf:        open configuration (file set, table uid/suid, scan direction, statis cache switch, ...)
//   pSttDataInfo: optional; when not NULL, receives the key range and row count found in each stt file
// Returns TSDB_CODE_SUCCESS or the first error hit; on the error path the merge tree is closed again.
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable *pSttDataInfo) {
  int32_t               code = TSDB_CODE_SUCCESS;
  STFileSet *           pFset = (STFileSet *)pConf->pCurrentFileset;
  bool                  loadStatisFromDisk = true;
  int32_t               lino = 0;
  int32_t               numOfLevels = pFset->lvlArr->size;
  SSttStatisCacheValue* pValue = NULL;
  LRUHandle*            pHandle = NULL;
  SSttStatisCacheKey    key = {.suid = pConf->suid, .fid = pFset->fid, .vgId = TD_VID(pConf->pTsdb->pVnode)};

  (void)taosThreadOnce(&tsCacheInit, initTableRowsInfoCache);

  pMTree->pIter = NULL;
  pMTree->backward = pConf->backward;
  pMTree->idStr = pConf->idstr;

  if (!pMTree->backward) {  // asc
    tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
  } else {  // desc
    tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
  }

  pMTree->ignoreEarlierTs = false;

  // no data exists, go to end
  if (numOfLevels == 0) {
    goto _end;
  }

  code = adjustSttDataIters(pConf->pSttFileBlockIterArray, pConf->pCurrentFileset);
  if (code) {
    goto _end;
  }

  if (pConf->cacheStatis) {
    int32_t ret = getStatisInfoFromCache(statisCacheInfo.pStatisFileCache, &key, &pValue, &pHandle, pConf->idstr);
    if (ret == TSDB_CODE_SUCCESS) {  // use cached statis info
      if (pValue->commitTs == pFset->lastCommit) {
        loadStatisFromDisk = false;
      } else {  // release the handle ref, and then remove it from lru cache
        // read the stale commit timestamp first: the cached value must not be touched once it is erased
        int64_t staleCommitTs = pValue->commitTs;

        releaseCacheHandle(statisCacheInfo.pStatisFileCache, &pHandle);
        clearStatisInfoCache(statisCacheInfo.pStatisFileCache, &key);
        pValue = NULL;

        tsdbInfo(
            "cache expired since new commit occurs, remove the cache and load from disk, vgId:%d, fid:%d, suid:%" PRId64
            ", commitTs:%" PRId64 ", new commitTs:%" PRId64,
            key.vgId, key.fid, key.suid, staleCommitTs, pFset->lastCommit);
      }
    }
  }

  for (int32_t j = 0; j < numOfLevels; ++j) {
    SSttLvl *pSttLevel = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->data[j];
    SArray  *pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j);

    for (int32_t i = 0; i < TARRAY2_SIZE(pSttLevel->fobjArr); ++i) {  // open all last file
      SLDataIter *pIter = taosArrayGetP(pList, i);

      SSttFileReader    *pSttFileReader = pIter->pReader;
      SSttBlockLoadInfo *pLoadInfo = pIter->pBlockLoadInfo;

      // open stt file reader if not opened yet
      // if failed to open this stt file, ignore the error and try next one
      if (pSttFileReader == NULL) {
        SSttFileReaderConfig conf = {.tsdb = pConf->pTsdb, .szPage = pConf->pTsdb->pVnode->config.tsdbPageSize};
        conf.file[0] = *pSttLevel->fobjArr->data[i]->f;

        code = tsdbSttFileReaderOpen(pSttLevel->fobjArr->data[i]->fname, &conf, &pSttFileReader);
        if (code != TSDB_CODE_SUCCESS) {
          tsdbError("open stt file reader error. file name %s, code %s, %s", pSttLevel->fobjArr->data[i]->fname,
                    tstrerror(code), pMTree->idStr);
        }
      }

      if (pLoadInfo == NULL) {
        code = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols, &pLoadInfo);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      if (!loadStatisFromDisk && (pLoadInfo->info.pCount == NULL)) {
        code = getSttTableRowsInfo(pValue, pConf->pCurRowKey->numOfPKs, j, i, &pLoadInfo->info);
        if (code != TSDB_CODE_SUCCESS) {
          loadStatisFromDisk = true;  // failed to get statis info from cache, load it from stt file
        }
      }

      // the reader and load info were saved in locals above; tLDataIterOpen2 is handed them again
      memset(pIter, 0, sizeof(SLDataIter));

      SSttKeyRange range = {.skey.numOfPKs = pConf->pCurRowKey->numOfPKs, .ekey.numOfPKs = pConf->pCurRowKey->numOfPKs};
      int64_t      numOfRows = 0;
      int64_t      cid = pSttLevel->fobjArr->data[i]->f->cid;

      code = tLDataIterOpen2(pIter, pSttFileReader, cid, pMTree->backward, pConf, pLoadInfo, &range, &numOfRows,
                             pMTree->idStr, loadStatisFromDisk);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      bool hasVal = false;
      code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal);
      if (code) {
        goto _end;
      }

      if (hasVal) {
        tMergeTreeAddIter(pMTree, pIter);

        // let's record the time window for current table of uid in the stt files
        if (pSttDataInfo != NULL && numOfRows > 0) {
          void *px = taosArrayPush(pSttDataInfo->pKeyRangeList, &range);
          QUERY_CHECK_NULL(px, code, lino, _end, terrno);

          pSttDataInfo->numOfRows += numOfRows;
        }
      } else {
        if (!pMTree->ignoreEarlierTs) {
          pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs;
        }
      }
    }
  }

  // the cached statis value is no longer needed: drop the reference taken by the cache lookup so that the
  // entry is not kept referenced after this function returns
  if (pHandle != NULL && pConf->cacheStatis) {
    releaseCacheHandle(statisCacheInfo.pStatisFileCache, &pHandle);
  }

  if (pConf->cacheStatis && loadStatisFromDisk) {
    SSttStatisCacheKey    k = {0};
    SSttStatisCacheValue *pVal = NULL;

    code = buildSttTableRowsInfoKV(pConf, TD_VID(pConf->pTsdb->pVnode), &k, &pVal);
    if (code == TSDB_CODE_SUCCESS) {
      code = putStatisInfoIntoCache(statisCacheInfo.pStatisFileCache, &k, pVal, pConf->idstr);
    }
  }

  return code;

_end:
  if (pHandle != NULL && pConf->cacheStatis) {
    releaseCacheHandle(statisCacheInfo.pStatisFileCache, &pHandle);
  }

  tMergeTreeClose(pMTree);
  return code;
}
1,193,243,606✔
1141

1,193,261,680✔
1142
// Insert the iterator into the merge tree's red-black tree; the iterator is passed as its tree node.
// The value returned by tRBTreePut() is intentionally ignored, as in tMergeTreeNext().
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) {
  (void)tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter);
}
1145

1146
// Return the merge tree's ignoreEarlierTs flag.
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree) {
  return pMTree->ignoreEarlierTs;
}
1147

2,147,483,647✔
1148
// Mark the block-data slot that holds the iterator's current stt block as pinned.
// The two slots of the load info are checked in order; an error is logged when neither holds the block.
static void tLDataIterPinSttBlock(SLDataIter *pIter, const char *id) {
  SSttBlockLoadInfo *pLoadInfo = pIter->pBlockLoadInfo;

  for (int32_t slot = 0; slot < 2; ++slot) {
    if (pLoadInfo->blockData[slot].sttBlockIndex != pIter->iSttBlk) {
      continue;
    }

    pLoadInfo->blockData[slot].pin = true;
    tsdbTrace("pin stt-block, blockIndex:%d, stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id);
    return;
  }

  tsdbError("failed to pin any stt block, sttBlock:%d stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id);
}
517,503,601✔
1165

1166
// Clear the pin flag of the first pinned block-data slot of the iterator's load info.
// The two slots are checked in order; an error is logged when neither of them is pinned.
static void tLDataIterUnpinSttBlock(SLDataIter *pIter, const char *id) {
  SSttBlockLoadInfo *pLoadInfo = pIter->pBlockLoadInfo;

  for (int32_t slot = 0; slot < 2; ++slot) {
    if (!pLoadInfo->blockData[slot].pin) {
      continue;
    }

    pLoadInfo->blockData[slot].pin = false;
    tsdbTrace("unpin stt-block:%d, stt-fileVer:%" PRId64 " %s", pLoadInfo->blockData[slot].sttBlockIndex, pIter->cid, id);
    return;
  }

  tsdbError("failed to unpin any stt block, sttBlock:%d stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id);
}
1182

1183
// Pin the stt block of the merge tree's current iterator and remember that iterator for the later unpin.
// Does nothing when the merge tree has no current iterator.
void tMergeTreePinSttBlock(SMergeTree *pMTree) {
  SLDataIter *pCurrent = pMTree->pIter;

  if (pCurrent != NULL) {
    pMTree->pPinnedBlockIter = pCurrent;
    tLDataIterPinSttBlock(pCurrent, pMTree->idStr);
  }
}
1192

1193
// Unpin the stt block pinned through tMergeTreePinSttBlock() and forget the remembered iterator.
// Does nothing when no iterator is currently remembered as pinned.
void tMergeTreeUnpinSttBlock(SMergeTree *pMTree) {
  SLDataIter *pPinned = pMTree->pPinnedBlockIter;

  if (pPinned != NULL) {
    pMTree->pPinnedBlockIter = NULL;
    tLDataIterUnpinSttBlock(pPinned, pMTree->idStr);
  }
}
1202

1203
// Advance the merge tree to its next row.
// The current iterator is stepped forward and compared with the smallest iterator in the red-black tree:
//   - when it compares greater, it is put back into the tree and the tree minimum becomes current;
//   - when it compares equal, it is stepped again;
//   - when it is exhausted or fails, it is dropped and the tree minimum becomes current.
//   pMTree:   merge tree to advance
//   pHasNext: receives whether a current iterator exists afterwards; must not be NULL
// Returns TSDB_CODE_INVALID_PARA for a NULL pHasNext, otherwise the code of the last iterator step, with
// TSDB_CODE_FILE_CORRUPTED mapped to 0.
int32_t tMergeTreeNext(SMergeTree *pMTree, bool *pHasNext) {
  if (pHasNext == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  *pHasNext = false;

  bool stepAgain = (pMTree->pIter != NULL);
  while (stepAgain) {
    stepAgain = false;

    bool hasVal = false;
    code = tLDataIterNextRow(pMTree->pIter, pMTree->idStr, &hasVal);
    if (code != 0 || !hasVal) {
      if (code == TSDB_CODE_FILE_CORRUPTED) {
        code = 0;  // suppress the file corrupt error to enable all queries within this cluster can run without failed.
      }

      pMTree->pIter = NULL;
    }

    // compare with min in RB Tree
    SLDataIter *pMin = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    if (pMTree->pIter != NULL && pMin != NULL) {
      int32_t c = pMTree->rbt.cmprFn(&pMTree->pIter->node, &pMin->node);
      if (c > 0) {
        (void)tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
        pMTree->pIter = NULL;
      } else if (c == 0) {
        stepAgain = true;  // the current iterator is still set here, step it once more
      }
    }
  }

  // no current iterator left: take the smallest one out of the tree, if any
  if (pMTree->pIter == NULL) {
    pMTree->pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    if (pMTree->pIter != NULL) {
      tRBTreeDrop(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
    }
  }

  *pHasNext = (pMTree->pIter != NULL);
  return code;
}
1247

1248
// Reset the merge tree's current iterator and pinned-block iterator references.
// Nothing is freed here; only the two pointers are cleared.
void tMergeTreeClose(SMergeTree *pMTree) {
  pMTree->pPinnedBlockIter = NULL;
  pMTree->pIter = NULL;
}
1252

1253
int32_t buildSttTableRowsInfoKV(SMergeTreeConf *pConf, int32_t vgId, SSttStatisCacheKey *pKey,
1254
                                SSttStatisCacheValue **pValue) {
1255
  *pValue = taosMemoryCalloc(1, sizeof(SSttStatisCacheValue));
1256
  if (*pValue == NULL) {
1257
    return terrno;
1258
  }
1259

1260
  memset(pKey, 0, sizeof(SSttStatisCacheKey));
1261

1262
  STFileSet *pFset = (STFileSet *)pConf->pCurrentFileset;
1263

1264
  pKey->suid = pConf->suid;
1265
  pKey->vgId = vgId;
1266
  pKey->fid = pFset->fid;
1267

1268
  (*pValue)->commitTs = pFset->lastCommit;
1269

1270
  int32_t numOfLevels = TARRAY2_SIZE(pFset->lvlArr);
1271

1272
  (*pValue)->pLevel = taosArrayInit(numOfLevels, sizeof(void *));
1273
  if ((*pValue)->pLevel == NULL) {
1274
    return terrno;
1275
  }
1276

1277
  for (int32_t j = 0; j < numOfLevels; ++j) {
1278
    SSttLvl *pSttLevel = pFset->lvlArr->data[j];
1279
    SArray * pIterList = taosArrayGetP(pConf->pSttFileBlockIterArray, j);
1280
    if (pIterList == NULL) {
1281
      return terrno;
1282
    }
1283

1284
    SArray *pRowsInfoArr = taosArrayInit(TARRAY2_SIZE(pSttLevel->fobjArr), sizeof(SSttTableRowsInfo));
1285
    if (pRowsInfoArr == NULL) {
1286
      return terrno;
1287
    }
1288

1289
    for (int32_t i = 0; i < TARRAY2_SIZE(pSttLevel->fobjArr); ++i) {  // open all stt file
1290
      SLDataIter *       pIter = taosArrayGetP(pIterList, i);
1291
      SSttBlockLoadInfo *pLoadInfo = pIter->pBlockLoadInfo;
1292

1293
      void *px = taosArrayPush(pRowsInfoArr, &pLoadInfo->info);
1294
      if (px == NULL) {
1295
        return terrno;
1296
      }
1297

1298
      memset(&pLoadInfo->info, 0, sizeof(SSttTableRowsInfo));
1299
    }
1300

1301
    void *px = taosArrayPush((*pValue)->pLevel, &pRowsInfoArr);
1302
    if (px == NULL) {
1303
      return terrno;
1304
    }
1305
  }
1306

1307
  // todo handle memory failure
1308

1309
  return TSDB_CODE_SUCCESS;
1310
}
1311

1312
int32_t getStatisInfoFromCache(SLRUCache *pCache, SSttStatisCacheKey *pKey, SSttStatisCacheValue **pValue,
1313
                               LRUHandle **pHandle, const char *id) {
1314
  *pValue = NULL;
1315
  *pHandle = NULL;
1316

1317
  (void)taosThreadMutexLock(&statisCacheInfo.lock);
1318
  LRUHandle *pItemHandle = taosLRUCacheLookup(pCache, pKey, sizeof(SSttStatisCacheKey));
1319
  if (pItemHandle == NULL) {
1320
    (void)taosThreadMutexUnlock(&statisCacheInfo.lock);
1321
    return TSDB_CODE_NOT_FOUND;
1322
  }
1323

1324
  void *p = taosLRUCacheValue(pCache, pItemHandle);
1325

1326
  *pValue = p;
1327
  *pHandle = pItemHandle;
1328

1329
  tsdbDebug("get statis info from cache suid:%" PRId64 ", vgId:%d, fid:%d, %s, commitTs:%" PRId64, pKey->suid,
1330
            pKey->vgId, pKey->fid, id, (*pValue)->commitTs);
1331

1332
  // (*pEntry)->hitTimes += 1;
1333
  (void)taosThreadMutexUnlock(&statisCacheInfo.lock);
1334
  return TSDB_CODE_SUCCESS;
1335
}
1336

1337
// Release a handle obtained from the statis LRU cache, under the statis cache lock, and reset the caller's
// handle pointer to NULL. The result of taosLRUCacheRelease() is intentionally ignored.
void releaseCacheHandle(SLRUCache* pCache, LRUHandle** pHandle) {
  (void)taosThreadMutexLock(&statisCacheInfo.lock);
  (void)taosLRUCacheRelease(pCache, *pHandle, false);
  *pHandle = NULL;
  (void)taosThreadMutexUnlock(&statisCacheInfo.lock);
}
1343

1344
static void freeStatisFileItems(const void* key, size_t keyLen, void* value, void* ud) {
1345
  (void)ud;
1346

1347
  if (value == NULL) {
1348
    return;
1349
  }
1350

1351
  SSttStatisCacheValue* pVal = value;
1352

1353
  for(int32_t i = 0; i < taosArrayGetSize(pVal->pLevel); ++i) {
1354
    SArray* pInfos = taosArrayGetP(pVal->pLevel, i);
1355

1356
    for(int32_t j = 0; j < taosArrayGetSize(pInfos); ++j) {
1357
      SSttTableRowsInfo* p = taosArrayGet(pInfos, j);
1358
      taosArrayDestroy(p->pCount);
1359
      taosArrayDestroy(p->pFirstKey);
1360
      taosArrayDestroy(p->pLastKey);
1361
      taosArrayDestroy(p->pFirstTs);
1362
      taosArrayDestroy(p->pLastTs);
1363
      taosArrayDestroy(p->pUid);
1364
    }
1365

1366
    taosArrayDestroy(pInfos);
1367
  }
1368

1369
  taosArrayDestroy(pVal->pLevel);
1370
  taosMemoryFree(pVal);
1371
}
1372

1373
// Insert a statis value into the LRU cache under the statis cache lock, with freeStatisFileItems as its
// deleter and low priority, then log the number of cached elements.
//   pCache: LRU cache to insert into
//   pKey:   key of the entry
//   pValue: value of the entry
//   id:     identifier appended to the debug log line
// Always returns TSDB_CODE_SUCCESS; the result of the insertion itself is ignored.
int32_t putStatisInfoIntoCache(SLRUCache *pCache, SSttStatisCacheKey *pKey, SSttStatisCacheValue *pValue,
                               const char *id) {
  (void)taosThreadMutexLock(&statisCacheInfo.lock);

  (void)taosLRUCacheInsert(pCache, pKey, sizeof(SSttStatisCacheKey), pValue, sizeof(SSttStatisCacheValue),
                           freeStatisFileItems, NULL, NULL, TAOS_LRU_PRIORITY_LOW, NULL);

  int32_t numOfElems = taosLRUCacheGetElems(pCache);
  tsdbDebug("put statis info into cache, total:%d suid:%" PRId64 ", vgId:%d, fid:%d, %s", numOfElems, pKey->suid, pKey->vgId,
            pKey->fid, id);

  (void)taosThreadMutexUnlock(&statisCacheInfo.lock);
  return TSDB_CODE_SUCCESS;
}
1387

1388
// Erase the entry stored under pKey from the statis LRU cache while holding the statis cache lock.
void clearStatisInfoCache(SLRUCache *pStatisCache, SSttStatisCacheKey *pKey) {
  const size_t keyLen = sizeof(SSttStatisCacheKey);

  (void)taosThreadMutexLock(&statisCacheInfo.lock);
  taosLRUCacheErase(pStatisCache, pKey, keyLen);
  (void)taosThreadMutexUnlock(&statisCacheInfo.lock);
}
1393

1394
// Duplicate the payload of a value through tValueDupPayload(); a NULL value is a no-op.
// Returns 0 on success (or for NULL input), otherwise the error code of tValueDupPayload().
static int32_t dupPlayload(SValue *p){
  if (p == NULL) {
    return 0;
  }

  int32_t code = tValueDupPayload(p);
  return (code != TSDB_CODE_SUCCESS) ? code : 0;
}
1403

1404
// Copy the table-rows statis info pInfo into pDst.
// The six arrays are duplicated with taosArrayDup(); when numOfPKs > 0 the payload of every first/last key
// value is duplicated as well through tValueDupPayload().
//   pDst:     destination; its six array pointers are overwritten
//   pInfo:    source info
//   numOfPKs: number of primary keys; > 0 enables the payload duplication
// Returns TSDB_CODE_SUCCESS, or an error code; on error the arrays created here are destroyed and the six
// pointers of pDst are reset to NULL.
int32_t sttRowInfoDeepCopy(SSttTableRowsInfo *pDst, SSttTableRowsInfo *pInfo, int32_t numOfPKs) {
  int32_t code = TSDB_CODE_SUCCESS;

  pDst->pCount = taosArrayDup(pInfo->pCount, NULL);
  pDst->pFirstKey = taosArrayDup(pInfo->pFirstKey, NULL);
  pDst->pLastKey = taosArrayDup(pInfo->pLastKey, NULL);
  pDst->pFirstTs = taosArrayDup(pInfo->pFirstTs, NULL);
  pDst->pLastTs = taosArrayDup(pInfo->pLastTs, NULL);
  pDst->pUid = taosArrayDup(pInfo->pUid, NULL);

  // a NULL copy of a non-NULL source means the duplication failed
  // NOTE(review): presumes taosArrayDup() sets terrno on failure; confirm against its implementation
  if ((pInfo->pCount != NULL && pDst->pCount == NULL) || (pInfo->pFirstKey != NULL && pDst->pFirstKey == NULL) ||
      (pInfo->pLastKey != NULL && pDst->pLastKey == NULL) || (pInfo->pFirstTs != NULL && pDst->pFirstTs == NULL) ||
      (pInfo->pLastTs != NULL && pDst->pLastTs == NULL) || (pInfo->pUid != NULL && pDst->pUid == NULL)) {
    code = terrno;
    goto _err;
  }

  if (numOfPKs > 0) {
    int32_t len = taosArrayGetSize(pDst->pCount);
    for (int32_t i = 0; i < len; ++i) {
      SValue *p1 = (SValue *)taosArrayGet(pDst->pFirstKey, i);
      if (p1 == NULL) {
        code = terrno;
        goto _err;
      }

      code = tValueDupPayload(p1);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }

      SValue *p2 = (SValue *)taosArrayGet(pDst->pLastKey, i);
      if (p2 == NULL) {
        code = terrno;
        goto _err;
      }

      code = tValueDupPayload(p2);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  }

  return TSDB_CODE_SUCCESS;

_err:
  // never report success after a failure, even if terrno was not set by the failing call
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_INVALID_PARA;
  }

  // todo: payloads already duplicated before the failure are not released here
  taosArrayDestroy(pDst->pCount);
  taosArrayDestroy(pDst->pFirstKey);
  taosArrayDestroy(pDst->pLastKey);
  taosArrayDestroy(pDst->pFirstTs);
  taosArrayDestroy(pDst->pLastTs);
  taosArrayDestroy(pDst->pUid);

  pDst->pCount = NULL;
  pDst->pFirstKey = NULL;
  pDst->pLastKey = NULL;
  pDst->pFirstTs = NULL;
  pDst->pLastTs = NULL;
  pDst->pUid = NULL;

  return code;
}
1436

1437
int32_t getSttTableRowsInfo(SSttStatisCacheValue *pValue, int32_t numOfPKs, int32_t levelIdx, int32_t fileIdx,
1438
                            SSttTableRowsInfo *pRowInfo) {
1439
  if (levelIdx >= taosArrayGetSize(pValue->pLevel)) {
1440
    return TSDB_CODE_INVALID_PARA;
1441
  }
1442

1443
  SArray *pRowsInfoArr = *(SArray **)taosArrayGet(pValue->pLevel, levelIdx);
1444
  if (pRowsInfoArr == NULL) {
1445
    return TSDB_CODE_INVALID_PARA;
1446
  }
1447

1448
  if (fileIdx >= taosArrayGetSize(pRowsInfoArr)) {
1449
    return TSDB_CODE_INVALID_PARA;
1450
  }
1451

1452
  void* p = (SSttTableRowsInfo *)taosArrayGet(pRowsInfoArr, fileIdx);
1453
  if (p == NULL) {
1454
    return TSDB_CODE_INVALID_PARA;
1455
  }
1456

1457
  return sttRowInfoDeepCopy(pRowInfo, p, numOfPKs);
1458
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc