• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5064

17 May 2026 01:15AM UTC coverage: 73.37% (-0.02%) from 73.388%
#5064

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281592 of 383795 relevant lines covered (73.37%)

135078241.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.24
/source/dnode/vnode/src/tsdb/tsdbMergeTree.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tlrucache.h"
17
#include "tsdb.h"
18
#include "tsdbFSet2.h"
19
#include "tsdbMerge.h"
20
#include "tsdbReadUtil.h"
21
#include "tsdbSttFileRW.h"
22
#include "ttypes.h"
23

24
typedef struct SSttStatisCacheKey {
25
  int64_t suid;
26
  int32_t vgId;
27
  int32_t fid;
28
} SSttStatisCacheKey;
29

30
typedef struct SSttStatisCacheValue {
31
  int64_t commitTs;
32
  SArray *pLevel;  // SArray<SArray<SSttTableRowsInfo>>
33
} SSttStatisCacheValue;
34

35
typedef struct SSttStatisFileCacheInfo {
36
  SLRUCache *    pStatisFileCache;
37
  TdThreadMutex  lock;
38
} SSttStatisFileCacheInfo;
39

40
static SSttStatisFileCacheInfo statisCacheInfo;
41
static TdThreadOnce tsCacheInit = PTHREAD_ONCE_INIT;
42

43
static int32_t getSttTableRowsInfo(SSttStatisCacheValue *pValue, int32_t numOfPKs, int32_t levelIdx, int32_t fileIdx,
44
                                   SSttTableRowsInfo *pSttTableRowsInfo);
45
static int32_t buildSttTableRowsInfoKV(SMergeTreeConf *pConf, int32_t vgId, SSttStatisCacheKey *pKey,
46
                                       SSttStatisCacheValue **pValue);
47
static void    clearStatisInfoCache(SLRUCache *pStatisCache, SSttStatisCacheKey *pKey, LRUHandle **pHandle);
48
static int32_t putStatisInfoIntoCache(SLRUCache *pCache, SSttStatisCacheKey *pKey, SSttStatisCacheValue *pValue,
49
                                      const char *id);
50
static int32_t getStatisInfoFromCache(SLRUCache *pCache, SSttStatisCacheKey *pKey, SSttStatisCacheValue **pValue,
51
                                      LRUHandle **pHandle, const char *id);
52
static void    releaseCacheHandle(SLRUCache *pCache, LRUHandle **pHandle, bool lock);
53
static void    freeStatisFileItems(const void *key, size_t keyLen, void *value, void *ud);
54
static int32_t getCacheValueSize(const SSttStatisCacheValue* pValue);
55

56
static void tLDataIterClose2(SLDataIter *pIter);
57

58
// SLDataIter =================================================
59
int32_t tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, SSttBlockLoadInfo **pInfo) {
579,856,058✔
60
  *pInfo = NULL;
579,856,058✔
61

62
  SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(1, sizeof(SSttBlockLoadInfo));
580,166,699✔
63
  if (pLoadInfo == NULL) {
579,532,661✔
64
    return terrno;
×
65
  }
66

67
  pLoadInfo->blockData[0].sttBlockIndex = -1;
579,532,661✔
68
  pLoadInfo->blockData[1].sttBlockIndex = -1;
579,567,063✔
69

70
  pLoadInfo->currentLoadBlockIndex = 1;
579,611,905✔
71

72
  int32_t code = tBlockDataCreate(&pLoadInfo->blockData[0].data);
579,631,751✔
73
  if (code) {
580,272,301✔
74
    taosMemoryFreeClear(pLoadInfo);
×
75
    return code;
×
76
  }
77

78
  code = tBlockDataCreate(&pLoadInfo->blockData[1].data);
580,272,301✔
79
  if (code) {
580,338,406✔
80
    taosMemoryFreeClear(pLoadInfo);
×
81
    return code;
×
82
  }
83

84
  pLoadInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
580,338,406✔
85
  if (pLoadInfo->aSttBlk == NULL) {
580,356,907✔
86
    taosMemoryFreeClear(pLoadInfo);
×
87
    return terrno;
×
88
  }
89

90
  pLoadInfo->pSchema = pSchema;
580,288,862✔
91
  pLoadInfo->colIds = colList;
580,367,379✔
92
  pLoadInfo->numOfCols = numOfCols;
580,318,005✔
93

94
  *pInfo = pLoadInfo;
580,445,004✔
95
  return code;
580,428,058✔
96
}
97

98
static void freeItem(void *pValue) {
2,147,483,647✔
99
  SValue *p = (SValue *)pValue;
2,147,483,647✔
100
  if (IS_VAR_DATA_TYPE(p->type) || p->type == TSDB_DATA_TYPE_DECIMAL) {
2,147,483,647✔
101
    taosMemoryFree(p->pData);
128,369,472✔
102
  }
103
}
2,147,483,647✔
104

105
void destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
580,138,460✔
106
  if (pLoadInfo == NULL) {
580,138,460✔
107
    return;
×
108
  }
109

110
  pLoadInfo->currentLoadBlockIndex = 1;
580,138,460✔
111

112
  SBlockDataInfo *pInfo = &pLoadInfo->blockData[0];
580,458,421✔
113
  tBlockDataDestroy(&pInfo->data);
580,304,512✔
114
  pInfo->sttBlockIndex = -1;
580,284,410✔
115
  pInfo->pin = false;
580,451,317✔
116

117
  pInfo = &pLoadInfo->blockData[1];
580,345,611✔
118
  tBlockDataDestroy(&pInfo->data);
580,159,131✔
119
  pInfo->sttBlockIndex = -1;
580,529,847✔
120
  pInfo->pin = false;
580,548,536✔
121

122
  taosArrayDestroy(pLoadInfo->info.pUid);
580,519,296✔
123
  taosArrayDestroy(pLoadInfo->info.pCount);
580,298,916✔
124
  taosArrayDestroy(pLoadInfo->info.pFirstTs);
580,461,129✔
125
  taosArrayDestroy(pLoadInfo->info.pLastTs);
580,496,764✔
126

127
  taosArrayDestroyEx(pLoadInfo->info.pFirstKey, freeItem);
580,413,117✔
128
  taosArrayDestroyEx(pLoadInfo->info.pLastKey, freeItem);
580,241,496✔
129

130
  pLoadInfo->info.pUid = NULL;
580,251,408✔
131
  pLoadInfo->info.pFirstKey = NULL;
580,487,621✔
132
  pLoadInfo->info.pLastKey = NULL;
580,497,455✔
133
  pLoadInfo->info.pCount = NULL;
580,510,694✔
134
  pLoadInfo->info.pFirstTs = NULL;
580,273,250✔
135
  pLoadInfo->info.pLastTs = NULL;
580,578,971✔
136

137
  taosArrayDestroy(pLoadInfo->aSttBlk);
580,540,200✔
138
  taosMemoryFree(pLoadInfo);
580,418,721✔
139
}
140

141
void destroyLDataIter(SLDataIter *pIter) {
580,390,896✔
142
  tLDataIterClose2(pIter);
580,390,896✔
143
  destroySttBlockLoadInfo(pIter->pBlockLoadInfo);
580,349,903✔
144
  taosMemoryFree(pIter);
580,250,172✔
145
}
580,374,531✔
146

147
void destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost) {
936,119,416✔
148
  if (pLDataIterArray == NULL) {
936,119,416✔
149
    return;
9,940,279✔
150
  }
151

152
  int32_t numOfLevel = taosArrayGetSize(pLDataIterArray);
926,179,137✔
153
  for (int32_t i = 0; i < numOfLevel; ++i) {
1,506,587,964✔
154
    SArray *pList = taosArrayGetP(pLDataIterArray, i);
580,201,376✔
155
    for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) {
1,160,681,656✔
156
      SLDataIter *pIter = taosArrayGetP(pList, j);
580,568,293✔
157
      if (pIter->pBlockLoadInfo != NULL) {
580,551,211✔
158
        SSttBlockLoadCostInfo *pCost = &pIter->pBlockLoadInfo->cost;
580,569,782✔
159
        if (pLoadCost != NULL) {
580,567,539✔
160
          pLoadCost->loadBlocks += pCost->loadBlocks;
579,435,999✔
161
          pLoadCost->loadStatisBlocks += pCost->loadStatisBlocks;
579,459,923✔
162
          pLoadCost->blockElapsedTime += pCost->blockElapsedTime;
579,455,156✔
163
          pLoadCost->statisElapsedTime += pCost->statisElapsedTime;
579,456,042✔
164
        }
165
      }
166

167
      destroyLDataIter(pIter);
580,548,750✔
168
    }
169

170
    taosArrayDestroy(pList);
580,074,083✔
171
  }
172

173
  taosArrayDestroy(pLDataIterArray);
926,386,588✔
174
}
175

176
// choose the unpinned slot to load next data block
177
static void updateBlockLoadSlot(SSttBlockLoadInfo *pLoadInfo) {
277,833,628✔
178
  int32_t nextSlotIndex = pLoadInfo->currentLoadBlockIndex ^ 1;
277,833,628✔
179
  if (pLoadInfo->blockData[nextSlotIndex].pin) {
277,931,110✔
180
    nextSlotIndex = nextSlotIndex ^ 1;
×
181
  }
182

183
  pLoadInfo->currentLoadBlockIndex = nextSlotIndex;
277,828,128✔
184
}
277,838,169✔
185

186
static int32_t loadLastBlock(SLDataIter *pIter, const char *idStr, SBlockData **pResBlock) {
2,147,483,647✔
187
  if (pResBlock != NULL) {
2,147,483,647✔
188
    *pResBlock = NULL;
2,147,483,647✔
189
  }
190

191
  int32_t            code = 0;
2,147,483,647✔
192
  SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;
2,147,483,647✔
193

194
  if (pInfo->blockData[0].sttBlockIndex == pIter->iSttBlk) {
2,147,483,647✔
195
    if (pInfo->currentLoadBlockIndex != 0) {
2,147,483,647✔
196
      tsdbDebug("current load index is set to 0, block index:%d, fileVer:%" PRId64 ", due to uid:%" PRIu64
46,613✔
197
                ", load data, %s",
198
                pIter->iSttBlk, pIter->cid, pIter->uid, idStr);
199
      pInfo->currentLoadBlockIndex = 0;
46,613✔
200
    }
201

202
    *pResBlock = &pInfo->blockData[0].data;
2,147,483,647✔
203
    return code;
2,147,483,647✔
204
  }
205

206
  if (pInfo->blockData[1].sttBlockIndex == pIter->iSttBlk) {
2,147,483,647✔
207
    if (pInfo->currentLoadBlockIndex != 1) {
2,147,483,647✔
208
      tsdbDebug("current load index is set to 1, block index:%d, fileVer:%" PRId64 ", due to uid:%" PRIu64
40,308✔
209
                ", load data, %s",
210
                pIter->iSttBlk, pIter->cid, pIter->uid, idStr);
211
      pInfo->currentLoadBlockIndex = 1;
40,308✔
212
    }
213

214
    *pResBlock = &pInfo->blockData[1].data;
2,147,483,647✔
215
    return code;
2,147,483,647✔
216
  }
217

218
  if (pIter->pSttBlk == NULL || pInfo->pSchema == NULL) {
277,762,004✔
219
    return code;
×
220
  }
221

222
  updateBlockLoadSlot(pInfo);
277,883,091✔
223
  int64_t st = taosGetTimestampUs();
277,917,780✔
224

225
  SBlockData *pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex].data;
277,917,780✔
226
  code = tsdbSttFileReadBlockDataByColumn(pIter->pReader, pIter->pSttBlk, pBlock, pInfo->pSchema, &pInfo->colIds[1],
277,795,106✔
227
                                          pInfo->numOfCols - 1);
277,865,027✔
228
  if (code != TSDB_CODE_SUCCESS) {
277,907,110✔
229
    return code;
×
230
  }
231

232
  double el = (taosGetTimestampUs() - st) / 1000.0;
277,944,625✔
233
  pInfo->cost.blockElapsedTime += el;
277,944,625✔
234
  pInfo->cost.loadBlocks += 1;
277,932,520✔
235

236
  tsdbDebug("read stt block, total load:%" PRId64 ", trigger by uid:%" PRIu64 ", stt-fileVer:%" PRId64
277,943,294✔
237
            ", last block index:%d, entry:%d, rows:%d, uidRange:%" PRId64 "-%" PRId64 " tsRange:%" PRId64 "-%" PRId64
238
            " %p, elapsed time:%.2f ms, %s",
239
            pInfo->cost.loadBlocks, pIter->uid, pIter->cid, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock->nRow,
240
            pIter->pSttBlk->minUid, pIter->pSttBlk->maxUid, pIter->pSttBlk->minKey, pIter->pSttBlk->maxKey, pBlock, el,
241
            idStr);
242

243
  pInfo->blockData[pInfo->currentLoadBlockIndex].sttBlockIndex = pIter->iSttBlk;
277,943,759✔
244
  pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].data.nRow : -1;
277,995,049✔
245

246
  tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockData[0].sttBlockIndex,
277,954,784✔
247
            pInfo->blockData[1].sttBlockIndex, pIter->iRow, idStr);
248

249
  *pResBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex].data;
277,954,784✔
250
  return code;
277,927,183✔
251
}
252

253
// find the earliest block that contains the required records
254
static FORCE_INLINE int32_t findEarliestIndex(int32_t index, uint64_t uid, const SSttBlk *pBlockList, int32_t num,
255
                                              int32_t backward) {
256
  int32_t i = index;
365,123,536✔
257
  int32_t step = backward ? 1 : -1;
365,123,536✔
258
  while (i >= 0 && i < num && uid >= pBlockList[i].minUid && uid <= pBlockList[i].maxUid) {
742,563,313✔
259
    i += step;
377,439,777✔
260
  }
261
  return i - step;
365,073,353✔
262
}
263

264
static int32_t binarySearchForStartBlock(SSttBlk *pBlockList, int32_t num, uint64_t uid, int32_t backward) {
978,305,983✔
265
  int32_t midPos = -1;
978,305,983✔
266
  if (num <= 0) {
978,305,983✔
267
    return -1;
513,370,030✔
268
  }
269

270
  int32_t firstPos = 0;
464,935,953✔
271
  int32_t lastPos = num - 1;
464,935,953✔
272

273
  // find the first position which is bigger than the key
274
  if ((uid > pBlockList[lastPos].maxUid) || (uid < pBlockList[firstPos].minUid)) {
464,935,953✔
275
    return -1;
100,016,735✔
276
  }
277

278
  while (1) {
41,274,055✔
279
    if (uid >= pBlockList[firstPos].minUid && uid <= pBlockList[firstPos].maxUid) {
406,367,187✔
280
      return findEarliestIndex(firstPos, uid, pBlockList, num, backward);
342,535,736✔
281
    }
282

283
    if (uid > pBlockList[lastPos].maxUid || uid < pBlockList[firstPos].minUid) {
63,712,542✔
284
      return -1;
57,794✔
285
    }
286

287
    int32_t numOfRows = lastPos - firstPos + 1;
63,806,610✔
288
    midPos = (numOfRows >> 1u) + firstPos;
63,806,610✔
289

290
    if (uid < pBlockList[midPos].minUid) {
63,806,610✔
291
      lastPos = midPos - 1;
23,474,379✔
292
    } else if (uid > pBlockList[midPos].maxUid) {
40,336,505✔
293
      firstPos = midPos + 1;
17,799,676✔
294
    } else {
295
      return findEarliestIndex(midPos, uid, pBlockList, num, backward);
22,537,617✔
296
    }
297
  }
298
}
299

300
static FORCE_INLINE int32_t findEarliestRow(int32_t index, uint64_t uid, const uint64_t *uidList, int32_t num,
301
                                            int32_t backward) {
302
  int32_t i = index;
357,242,038✔
303
  int32_t step = backward ? 1 : -1;
357,242,038✔
304
  while (i >= 0 && i < num && uid == uidList[i]) {
2,147,483,647✔
305
    i += step;
2,147,483,647✔
306
  }
307
  return i - step;
357,286,080✔
308
}
309

310
static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint64_t uid, int32_t backward) {
357,268,754✔
311
  int32_t firstPos = 0;
357,268,754✔
312
  int32_t lastPos = num - 1;
357,268,754✔
313

314
  // find the first position which is bigger than the key
315
  if ((uid > uidList[lastPos]) || (uid < uidList[firstPos])) {
357,268,754✔
316
    return -1;
×
317
  }
318

319
  while (1) {
146,650,462✔
320
    if (uid == uidList[firstPos]) {
504,020,381✔
321
      return findEarliestRow(firstPos, uid, uidList, num, backward);
217,659,008✔
322
    }
323

324
    if (uid > uidList[lastPos] || uid < uidList[firstPos]) {
286,290,778✔
325
      return -1;
33,334✔
326
    }
327

328
    int32_t numOfRows = lastPos - firstPos + 1;
286,237,864✔
329
    int32_t midPos = (numOfRows >> 1u) + firstPos;
286,237,864✔
330

331
    if (uid < uidList[midPos]) {
286,237,864✔
332
      lastPos = midPos - 1;
48,262,353✔
333
    } else if (uid > uidList[midPos]) {
237,998,831✔
334
      firstPos = midPos + 1;
98,388,109✔
335
    } else {
336
      return findEarliestRow(midPos, uid, uidList, num, backward);
139,627,072✔
337
    }
338
  }
339
}
340

341
static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray, SSttBlockLoadInfo *pBlockLoadInfo,
580,068,650✔
342
                                   uint64_t suid) {
343
  void   *px = NULL;
580,068,650✔
344
  int32_t code = TSDB_CODE_SUCCESS;
580,068,650✔
345
  if (TARRAY2_SIZE(pArray) <= 0) {
580,068,650✔
346
    return code;
161,212✔
347
  }
348

349
  SSttBlk *pStart = &pArray->data[0];
580,145,517✔
350
  SSttBlk *pEnd = &pArray->data[TARRAY2_SIZE(pArray) - 1];
580,249,295✔
351

352
  // all identical
353
  if (pStart->suid == pEnd->suid) {
580,002,251✔
354
    if (pStart->suid != suid) {  // no qualified stt block existed
421,723,129✔
355
      taosArrayClear(pBlockLoadInfo->aSttBlk);
227,322,177✔
356
      pIter->iSttBlk = -1;
227,237,532✔
357
      return TSDB_CODE_SUCCESS;
227,339,339✔
358
    } else {  // all blocks are qualified
359
      taosArrayClear(pBlockLoadInfo->aSttBlk);
194,421,322✔
360
      px = taosArrayAddBatch(pBlockLoadInfo->aSttBlk, pArray->data, pArray->size);
194,467,519✔
361
      if (px == NULL) {
194,505,887✔
362
        return terrno;
×
363
      }
364
    }
365
  } else {
366
    SArray *pTmp = taosArrayInit(TARRAY2_SIZE(pArray), sizeof(SSttBlk));
158,414,740✔
367
    if (pTmp == NULL) {
158,435,451✔
368
      return terrno;
×
369
    }
370

371
    for (int32_t i = 0; i < TARRAY2_SIZE(pArray); ++i) {
1,014,100,036✔
372
      SSttBlk *p = &pArray->data[i];
955,601,137✔
373
      if (p->suid < suid) {
955,599,878✔
374
        continue;
299,116,105✔
375
      }
376

377
      if (p->suid == suid) {
656,513,384✔
378
        void *px = taosArrayPush(pTmp, p);
556,547,092✔
379
        if (px == NULL) {
556,547,092✔
380
          code = terrno;
×
381
          break;
×
382
        }
383
      } else if (p->suid > suid) {
99,973,634✔
384
        break;
99,979,509✔
385
      }
386
    }
387

388
    taosArrayDestroy(pBlockLoadInfo->aSttBlk);
158,424,745✔
389
    pBlockLoadInfo->aSttBlk = pTmp;
158,448,922✔
390
  }
391

392
  return code;
352,900,596✔
393
}
394

395
static int32_t tValueDupPayload(SValue *pVal) {
334,173,096✔
396
  if (IS_VAR_DATA_TYPE(pVal->type) || pVal->type == TSDB_DATA_TYPE_DECIMAL) {
334,173,096✔
397
    char *p = (char *)pVal->pData;
128,325,317✔
398
    char *pBuf = taosMemoryMalloc(pVal->nData);
128,355,409✔
399
    if (pBuf == NULL) {
128,360,471✔
400
      return terrno;
×
401
    }
402

403
    memcpy(pBuf, p, pVal->nData);
128,360,471✔
404
    pVal->pData = (uint8_t *)pBuf;
128,368,072✔
405
  }
406

407
  return TSDB_CODE_SUCCESS;
334,191,345✔
408
}
409

410
static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo,
580,102,093✔
411
                                          TStatisBlkArray *pStatisBlkArray, uint64_t suid, const char *id) {
412
  int32_t code = TSDB_CODE_SUCCESS;
580,102,093✔
413
  int32_t lino = 0;
580,102,093✔
414
  void   *px = NULL;
580,102,093✔
415
  int32_t startIndex = 0;
580,102,093✔
416
  int32_t ret = 0;
580,102,093✔
417

418
  int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray);
580,102,093✔
419
  if (numOfBlocks <= 0) {
580,070,334✔
420
    return code;
161,212✔
421
  }
422

423
  while ((startIndex < numOfBlocks) && (pStatisBlkArray->data[startIndex].maxTbid.suid < suid)) {
776,081,433✔
424
    ++startIndex;
196,172,311✔
425
  }
426

427
  if (startIndex >= numOfBlocks || pStatisBlkArray->data[startIndex].minTbid.suid > suid) {
580,115,838✔
428
    return 0;
271,329,520✔
429
  }
430

431
  int32_t endIndex = startIndex;
308,724,783✔
432
  while (endIndex < numOfBlocks && pStatisBlkArray->data[endIndex].minTbid.suid <= suid) {
622,774,656✔
433
    ++endIndex;
314,049,873✔
434
  }
435

436
  int32_t num = endIndex - startIndex;
308,771,185✔
437
  pBlockLoadInfo->cost.loadStatisBlocks += num;
308,771,185✔
438

439
  STbStatisBlock block;
308,668,776✔
440
  code = tStatisBlockInit(&block);
308,672,681✔
441
  QUERY_CHECK_CODE(code, lino, _end);
308,742,292✔
442

443
  int64_t st = taosGetTimestampUs();
308,722,140✔
444

445
  for (int32_t k = startIndex; k < endIndex; ++k) {
622,796,412✔
446
    code = tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[k], &block);
314,029,472✔
447
    QUERY_CHECK_CODE(code, lino, _end);
314,110,789✔
448

449
    int32_t i = 0;
314,110,789✔
450
    int32_t rows = block.numOfRecords;
314,110,789✔
451
    while (i < rows && ((int64_t *)block.suids.data)[i] != suid) {
479,050,070✔
452
      ++i;
164,939,281✔
453
    }
454

455
    // existed
456
    if (i < rows) {
314,104,293✔
457
      SSttTableRowsInfo *pInfo = &pBlockLoadInfo->info;
297,684,844✔
458

459
      if (pInfo->pUid == NULL) {
297,703,047✔
460
        pInfo->pUid = taosArrayInit(rows, sizeof(int64_t));
292,333,463✔
461
        pInfo->pFirstTs = taosArrayInit(rows, sizeof(int64_t));
292,312,275✔
462
        pInfo->pLastTs = taosArrayInit(rows, sizeof(int64_t));
292,329,689✔
463
        pInfo->pCount = taosArrayInit(rows, sizeof(int64_t));
292,332,478✔
464

465
        pInfo->pFirstKey = taosArrayInit(rows, sizeof(SValue));
292,294,680✔
466
        pInfo->pLastKey = taosArrayInit(rows, sizeof(SValue));
292,326,601✔
467

468
        if (pInfo->pUid == NULL || pInfo->pFirstTs == NULL || pInfo->pLastTs == NULL || pInfo->pCount == NULL ||
292,302,764✔
469
            pInfo->pFirstKey == NULL || pInfo->pLastKey == NULL) {
292,346,581✔
470
          code = terrno;
568✔
471
          goto _end;
×
472
        }
473

474
        pInfo->memSize = sizeof(SSttTableRowsInfo) + sizeof(SArray) * 6;
292,301,558✔
475
      }
476

477
      if (pStatisBlkArray->data[k].maxTbid.suid == suid) {
297,683,490✔
478
        int32_t size = rows - i;
257,525,176✔
479
        int32_t offset = i * sizeof(int64_t);
257,525,176✔
480

481
        px = taosArrayAddBatch(pInfo->pUid, tBufferGetDataAt(&block.uids, offset), size);
257,525,176✔
482
        TSDB_CHECK_NULL(px, code, lino, _end, terrno);
257,537,591✔
483

484
        px = taosArrayAddBatch(pInfo->pFirstTs, tBufferGetDataAt(&block.firstKeyTimestamps, offset), size);
257,537,591✔
485
        TSDB_CHECK_NULL(px, code, lino, _end, terrno);
257,504,486✔
486

487
        px = taosArrayAddBatch(pInfo->pLastTs, tBufferGetDataAt(&block.lastKeyTimestamps, offset), size);
257,504,486✔
488
        TSDB_CHECK_NULL(px, code, lino, _end, terrno);
257,517,783✔
489

490
        px = taosArrayAddBatch(pInfo->pCount, tBufferGetDataAt(&block.counts, offset), size);
257,517,783✔
491
        TSDB_CHECK_NULL(px, code, lino, _end, terrno);
257,553,847✔
492

493
        pInfo->memSize += size * sizeof(int64_t) * 4;
257,553,847✔
494
        pInfo->memSize += size * sizeof(SValue) * 2;
257,518,571✔
495

496
        if (block.numOfPKs > 0) {
257,449,264✔
497
          SValue vFirst = {0}, vLast = {0};
77,697,714✔
498
          for (int32_t f = i; f < rows; ++f) {
226,996,486✔
499
            code = tValueColumnGet(&block.firstKeyPKs[0], f, &vFirst);
149,284,260✔
500
            TSDB_CHECK_CODE(code, lino, _end);
149,291,654✔
501

502
            code = tValueDupPayload(&vFirst);
149,291,654✔
503
            TSDB_CHECK_CODE(code, lino, _end);
149,294,108✔
504

505
            px = taosArrayPush(pInfo->pFirstKey, &vFirst);
149,294,108✔
506
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
149,322,353✔
507

508
            // todo add api to clone the original data
509
            code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast);
149,322,353✔
510
            TSDB_CHECK_CODE(code, lino, _end);
149,329,227✔
511

512
            code = tValueDupPayload(&vLast);
149,329,227✔
513
            TSDB_CHECK_CODE(code, lino, _end);
149,313,369✔
514

515
            px = taosArrayPush(pInfo->pLastKey, &vLast);
149,313,369✔
516
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
149,329,266✔
517

518
            pInfo->memSize += (IS_VAR_DATA_TYPE(vFirst.type) || vFirst.type == TSDB_DATA_TYPE_DECIMAL)? (vFirst.nData + vLast.nData):0;
149,329,266✔
519
          }
520
        } else {
521
          // SValue vFirst = {0}; is equal array init with zeroed data
522
          px = taosArrayReserve(pInfo->pFirstKey, size);
179,751,550✔
523
          TSDB_CHECK_NULL(px, code, lino, _end, terrno);
179,811,713✔
524
          px = taosArrayReserve(pInfo->pLastKey, size);
179,811,713✔
525
          TSDB_CHECK_NULL(px, code, lino, _end, terrno);
179,814,961✔
526
        }
527
      } else {
528
        STbStatisRecord record = {0};
40,138,894✔
529
        while (i < rows) {
189,030,059✔
530
          code = tStatisBlockGet(&block, i, &record);
189,038,573✔
531
          TSDB_CHECK_CODE(code, lino, _end);
189,024,454✔
532

533
          if (record.suid != suid) {
189,024,454✔
534
            break;
40,160,358✔
535
          }
536

537
          px = taosArrayPush(pInfo->pUid, &record.uid);
148,864,096✔
538
          TSDB_CHECK_NULL(px, code, lino, _end, terrno);
148,890,233✔
539

540
          px = taosArrayPush(pInfo->pCount, &record.count);
148,890,233✔
541
          TSDB_CHECK_NULL(px, code, lino, _end, terrno);
148,884,031✔
542

543
          px = taosArrayPush(pInfo->pFirstTs, &record.firstKey.ts);
148,884,031✔
544
          TSDB_CHECK_NULL(px, code, lino, _end, terrno);
148,901,808✔
545

546
          px = taosArrayPush(pInfo->pLastTs, &record.lastKey.ts);
148,901,808✔
547
          TSDB_CHECK_NULL(px, code, lino, _end, terrno);
148,901,032✔
548

549
          pInfo->memSize += (sizeof(int64_t) * 4 + sizeof(SValue) * 2);
148,901,032✔
550

551
          if (record.firstKey.numOfPKs > 0) {
148,880,988✔
552
            SValue first = record.firstKey.pks[0];
17,786,420✔
553
            code = tValueDupPayload(&first);
17,787,830✔
554
            TSDB_CHECK_CODE(code, lino, _end);
17,788,513✔
555

556
            px = taosArrayPush(pInfo->pFirstKey, &first);
17,788,513✔
557
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
17,789,218✔
558

559
            SValue last = record.lastKey.pks[0];
17,789,218✔
560
            code = tValueDupPayload(&last);
17,786,398✔
561
            TSDB_CHECK_CODE(code, lino, _end);
17,789,218✔
562

563
            px = taosArrayPush(pInfo->pLastKey, &last);
17,789,218✔
564
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
17,787,808✔
565

566
            pInfo->memSize += (IS_VAR_DATA_TYPE(first.type) || first.type == TSDB_DATA_TYPE_DECIMAL)? (first.nData + last.nData):0;
17,787,808✔
567
          } else {
568
            // append empty value, SValue v = {0}; 
569
            px = taosArrayReserve(pInfo->pFirstKey, 1);
131,094,568✔
570
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
131,095,302✔
571

572
            px = taosArrayReserve(pInfo->pLastKey, 1);
131,095,302✔
573
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
131,097,618✔
574
          }
575

576
          i += 1;
148,885,426✔
577
        }
578
      }
579
    }
580
  }
581

582
_end:
308,802,511✔
583
  tStatisBlockDestroy(&block);
308,708,680✔
584
  if (code != 0) {
308,727,886✔
585
    tsdbError("%s error happens at:%s line number: %d, code:%s", id, __func__, lino, tstrerror(code));
×
586
  } else {
587
    double el = (taosGetTimestampUs() - st) / 1000.0;
308,686,992✔
588
    pBlockLoadInfo->cost.statisElapsedTime += el;
308,686,992✔
589

590
    tsdbDebug("%s load %d statis blocks into buf, elapsed time:%.2fms", id, num, el);
308,698,412✔
591
  }
592
  return code;
308,700,870✔
593
}
594

595
static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *pIter, int64_t suid,
579,875,522✔
596
                                 _load_tomb_fn loadTombFn, void *pReader1, const char *idStr, bool loadFromDisk) {
597
  int64_t st = taosGetTimestampUs();
580,119,624✔
598

599
  const TSttBlkArray *pSttBlkArray = NULL;
580,119,624✔
600
  pBlockLoadInfo->sttBlockLoaded = true;
579,972,549✔
601

602
  // load the stt block info for each stt-block
603
  int32_t code = tsdbSttFileReadSttBlk(pIter->pReader, &pSttBlkArray);
580,336,128✔
604
  if (code != TSDB_CODE_SUCCESS) {
580,403,597✔
605
    tsdbError("load stt blk failed, code:%s, %s", tstrerror(code), idStr);
×
606
    return code;
×
607
  }
608

609
  // load the stt block info for each stt file block
610
  code = extractSttBlockInfo(pIter, pSttBlkArray, pBlockLoadInfo, suid);
580,403,597✔
611
  if (code != TSDB_CODE_SUCCESS) {
580,208,144✔
612
    tsdbError("load stt block info failed, code:%s, %s", tstrerror(code), idStr);
29✔
613
    return code;
×
614
  }
615

616
  if (loadFromDisk) {
580,218,688✔
617
    // load stt statistics block for all stt-blocks, to decide if the data of queried table exists in current stt file
618
    TStatisBlkArray *pStatisBlkArray = NULL;
580,157,248✔
619
    code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pStatisBlkArray);
580,100,316✔
620
    if (code != TSDB_CODE_SUCCESS) {
580,344,778✔
621
      tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), idStr);
×
622
      return code;
×
623
    }
624

625
    // load statistics block for all tables in current stt file
626
    code = loadSttStatisticsBlockData(pIter->pReader, pIter->pBlockLoadInfo, pStatisBlkArray, suid, idStr);
580,344,778✔
627
    if (code != TSDB_CODE_SUCCESS) {
580,042,298✔
628
      tsdbError("failed to load stt statistics block data, code:%s, %s", tstrerror(code), idStr);
×
629
      return code;
×
630
    }
631
  } else {
632
    tsdbDebug("stt block statis info loaded from cache, %s", idStr);
61,440✔
633
  }
634

635
  code = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo);
580,421,908✔
636

637
  double el = (taosGetTimestampUs() - st) / 1000.0;
580,411,444✔
638
  tsdbDebug("load the stt file blk info completed, elapsed time:%.2fms, %s", el, idStr);
580,411,444✔
639
  return code;
580,353,668✔
640
}
641

642
static int32_t uidComparFn(const void *p1, const void *p2) {
795,962,752✔
643
  const uint64_t *pFirst = p1;
795,962,752✔
644
  const uint64_t *pVal = p2;
795,962,752✔
645

646
  if (*pFirst == *pVal) {
795,962,752✔
647
    return 0;
365,150,645✔
648
  } else {
649
    return *pFirst < *pVal ? -1 : 1;
431,000,044✔
650
  }
651
}
652

653
static void setSttInfoForCurrentTable(SSttBlockLoadInfo *pLoadInfo, uint64_t uid, SSttKeyRange *pRange,
978,529,911✔
654
                                      int64_t *numOfRows) {
655
  if (pRange == NULL || taosArrayGetSize(pLoadInfo->info.pUid) == 0) {
978,529,911✔
656
    return;
513,534,188✔
657
  }
658

659
  int32_t index = taosArraySearchIdx(pLoadInfo->info.pUid, &uid, uidComparFn, TD_EQ);
465,168,088✔
660
  if (index >= 0) {
465,150,810✔
661
    pRange->skey.ts = *(int64_t *)taosArrayGet(pLoadInfo->info.pFirstTs, index);
365,116,741✔
662
    pRange->ekey.ts = *(int64_t *)taosArrayGet(pLoadInfo->info.pLastTs, index);
365,107,829✔
663

664
    *numOfRows += *(int64_t *)taosArrayGet(pLoadInfo->info.pCount, index);
365,120,610✔
665

666
    if (pRange->skey.numOfPKs > 0) {
365,105,280✔
667
      memcpy(&pRange->skey.pks[0], taosArrayGet(pLoadInfo->info.pFirstKey, index), sizeof(SValue));
148,547,787✔
668
      memcpy(&pRange->ekey.pks[0], taosArrayGet(pLoadInfo->info.pLastKey, index), sizeof(SValue));
148,557,681✔
669
    }
670
  }
671
}
672

673
int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t cid, int8_t backward,
978,575,226✔
674
                        SMergeTreeConf *pConf, SSttBlockLoadInfo *pBlockLoadInfo, SSttKeyRange *pKeyRange,
675
                        int64_t *numOfRows, const char *idStr, bool loadFromDisk) {
676
  int32_t code = TSDB_CODE_SUCCESS;
978,575,226✔
677

678
  pIter->uid = pConf->uid;
978,575,226✔
679
  pIter->cid = cid;
978,784,269✔
680
  pIter->backward = backward;
979,014,029✔
681
  pIter->verRange.minVer = pConf->verRange.minVer;
978,962,194✔
682
  pIter->verRange.maxVer = pConf->verRange.maxVer;
978,584,318✔
683
  pIter->timeWindow.skey = pConf->timewindow.skey;
978,929,086✔
684
  pIter->timeWindow.ekey = pConf->timewindow.ekey;
978,501,662✔
685

686
  pIter->pStartRowKey = pConf->pCurRowKey;
978,923,183✔
687
  pIter->pReader = pSttFileReader;
978,447,939✔
688
  pIter->pBlockLoadInfo = pBlockLoadInfo;
978,979,285✔
689

690
  // open stt file failed, ignore and continue
691
  if (pIter->pReader == NULL) {
978,889,818✔
692
    tsdbError("stt file reader is null, %s", idStr);
×
693
    pIter->pSttBlk = NULL;
×
694
    pIter->iSttBlk = -1;
×
695
    return TSDB_CODE_SUCCESS;
×
696
  }
697

698
  if (!pBlockLoadInfo->sttBlockLoaded) {
978,564,147✔
699
      code = doLoadSttFilesBlk(pBlockLoadInfo, pIter, pConf->suid, pConf->loadTombFn, pConf->pReader, idStr, loadFromDisk);
579,921,313✔
700
  }
701

702
  setSttInfoForCurrentTable(pBlockLoadInfo, pConf->uid, pKeyRange, numOfRows);
978,748,444✔
703

704
  // find the start block, actually we could load the position to avoid repeatly searching for the start position when
705
  // the skey is updated.
706
  size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
978,598,343✔
707
  pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, pConf->uid, backward);
978,759,632✔
708
  if (pIter->iSttBlk != -1) {
978,636,886✔
709
    pIter->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
365,174,760✔
710
    pIter->iRow = (pIter->backward) ? pIter->pSttBlk->nRow : -1;
365,090,631✔
711

712
    if ((!backward) && ((pConf->strictTimeRange && pIter->pSttBlk->minKey >= pIter->timeWindow.ekey) ||
365,147,916✔
713
                        (!pConf->strictTimeRange && pIter->pSttBlk->minKey > pIter->timeWindow.ekey))) {
300,728,242✔
714
      pIter->pSttBlk = NULL;
2,640,044✔
715
    }
716

717
    if (backward && ((pConf->strictTimeRange && pIter->pSttBlk->maxKey <= pIter->timeWindow.skey) ||
365,074,475✔
718
                     (!pConf->strictTimeRange && pIter->pSttBlk->maxKey < pIter->timeWindow.skey))) {
64,335,212✔
719
      pIter->pSttBlk = NULL;
129,629✔
720
      pIter->ignoreEarlierTs = true;
198,445✔
721
    }
722
  }
723

724
  return code;
978,292,025✔
725
}
726

727
void tLDataIterClose2(SLDataIter *pIter) {
580,350,645✔
728
  tsdbSttFileReaderClose(&pIter->pReader);
580,350,645✔
729
  pIter->pReader = NULL;
580,337,451✔
730
}
580,387,375✔
731

732
void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
400,221,169✔
733
  int32_t step = pIter->backward ? -1 : 1;
400,221,169✔
734
  int32_t oldIndex = pIter->iSttBlk;
400,282,198✔
735

736
  pIter->iSttBlk += step;
400,316,662✔
737

738
  int32_t index = -1;
400,278,616✔
739
  size_t  size = pIter->pBlockLoadInfo->aSttBlk->size;
400,278,616✔
740
  for (int32_t i = pIter->iSttBlk; i < size && i >= 0; i += step) {
401,389,525✔
741
    SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, i);
81,444,027✔
742
    if ((!pIter->backward) && p->minUid > pIter->uid) {
81,442,127✔
743
      break;
32,447,200✔
744
    }
745

746
    if (pIter->backward && p->maxUid < pIter->uid) {
48,999,578✔
747
      break;
1,181,836✔
748
    }
749

750
    // check uid firstly
751
    if (p->minUid <= pIter->uid && p->maxUid >= pIter->uid) {
47,815,229✔
752
      if ((!pIter->backward) && p->minKey > pIter->timeWindow.ekey) {
47,815,229✔
753
        break;
2,933,582✔
754
      }
755

756
      if (pIter->backward && p->maxKey < pIter->timeWindow.skey) {
44,881,647✔
757
        break;
754✔
758
      }
759

760
      // check time range secondly
761
      if (p->minKey <= pIter->timeWindow.ekey && p->maxKey >= pIter->timeWindow.skey) {
44,880,256✔
762
        if ((!pIter->backward) && p->minVer > pIter->verRange.maxVer) {
43,784,095✔
763
          break;
×
764
        }
765

766
        if (pIter->backward && p->maxVer < pIter->verRange.minVer) {
43,784,091✔
767
          break;
×
768
        }
769

770
        if (p->minVer <= pIter->verRange.maxVer && p->maxVer >= pIter->verRange.minVer) {
43,784,095✔
771
          index = i;
43,783,421✔
772
          break;
43,783,421✔
773
        }
774
      }
775
    }
776
  }
777

778
  pIter->pSttBlk = NULL;
400,297,199✔
779
  if (index != -1) {
400,194,127✔
780
    SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, index);
43,784,095✔
781

782
    pIter->iSttBlk = index;
43,784,095✔
783
    pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
43,784,095✔
784
    tsdbDebug("try next stt-file block:%d from %d, trigger by uid:%" PRIu64 ", stt-fileVer:%" PRId64
43,783,486✔
785
              ", uidRange:%" PRId64 "-%" PRId64 " %s",
786
              pIter->iSttBlk, oldIndex, pIter->uid, pIter->cid, p->minUid, p->maxUid, idStr);
787
  } else {
788
    tsdbDebug("no more last block qualified, uid:%" PRIu64 ", stt-file block:%d, %s", pIter->uid, oldIndex, idStr);
356,410,032✔
789
  }
790
}
400,193,518✔
791

792
static int32_t findNextValidRow(SLDataIter *pIter, const char *idStr) {
2,147,483,647✔
793
  bool        hasVal = false;
2,147,483,647✔
794
  int32_t     step = pIter->backward ? -1 : 1;
2,147,483,647✔
795
  int32_t     i = pIter->iRow;
2,147,483,647✔
796
  SBlockData *pData = NULL;
2,147,483,647✔
797

798
  int32_t code = loadLastBlock(pIter, idStr, &pData);
2,147,483,647✔
799
  if (code) {
2,147,483,647✔
800
    tsdbError("failed to load stt block, code:%s, %s", tstrerror(code), idStr);
×
801
    return code;
×
802
  }
803

804
  // mostly we only need to find the start position for a given table
805
  if ((((i == 0) && (!pIter->backward)) || (i == pData->nRow - 1 && pIter->backward)) && pData->aUid != NULL) {
2,147,483,647✔
806
    i = binarySearchForStartRowIndex((uint64_t *)pData->aUid, pData->nRow, pIter->uid, pIter->backward);
357,354,912✔
807
    if (i == -1) {
357,265,901✔
808
      tsdbDebug("failed to find the data in pBlockData, uid:%" PRIu64 " , %s", pIter->uid, idStr);
33,334✔
809
      pIter->iRow = -1;
33,334✔
810
      return code;
33,334✔
811
    }
812
  }
813

814
  for (; i < pData->nRow && i >= 0; i += step) {
2,147,483,647✔
815
    if (pData->aUid != NULL) {
2,147,483,647✔
816
      if (!pIter->backward) {
2,147,483,647✔
817
        if (pData->aUid[i] > pIter->uid) {
2,147,483,647✔
818
          break;
115,033,013✔
819
        }
820
      } else {
821
        if (pData->aUid[i] < pIter->uid) {
2,147,483,647✔
822
          break;
25,966,615✔
823
        }
824
      }
825
    }
826

827
    int64_t ts = pData->aTSKEY[i];
2,147,483,647✔
828
    if (!pIter->backward) {               // asc
2,147,483,647✔
829
      if (ts > pIter->timeWindow.ekey) {  // no more data
2,147,483,647✔
830
        break;
4,303,844✔
831
      } else {
832
        if (ts < pIter->timeWindow.skey) {
2,147,483,647✔
833
          continue;
2,147,483,647✔
834
        }
835

836
        if (ts == pIter->timeWindow.skey && pIter->pStartRowKey->numOfPKs > 0) {
2,147,483,647✔
837
          SRowKey key;
35,440,040✔
838
          tColRowGetKey(pData, i, &key);
35,440,040✔
839
          int32_t ret = pkCompEx(&key, pIter->pStartRowKey);
35,440,040✔
840
          if (ret < 0) {
35,440,040✔
841
            continue;
×
842
          }
843
        }
844
      }
845
    } else {
846
      if (ts < pIter->timeWindow.skey) {
2,147,483,647✔
847
        break;
915,568✔
848
      } else {
849
        if (ts > pIter->timeWindow.ekey) {
2,147,483,647✔
850
          continue;
56,689,718✔
851
        }
852

853
        if (ts == pIter->timeWindow.ekey && pIter->pStartRowKey->numOfPKs > 0) {
2,147,483,647✔
854
          SRowKey key;
×
855
          tColRowGetKey(pData, i, &key);
×
856
          int32_t ret = pkCompEx(&key, pIter->pStartRowKey);
×
857
          if (ret > 0) {
×
858
            continue;
×
859
          }
860
        }
861
      }
862
    }
863

864
    int64_t ver = pData->aVersion[i];
2,147,483,647✔
865
    if (ver < pIter->verRange.minVer) {
2,147,483,647✔
866
      continue;
×
867
    }
868

869
    // todo opt handle desc case
870
    if (ver > pIter->verRange.maxVer) {
2,147,483,647✔
871
      continue;
1,416✔
872
    }
873

874
    hasVal = true;
2,147,483,647✔
875
    break;
2,147,483,647✔
876
  }
877

878
  pIter->iRow = (hasVal) ? i : -1;
2,147,483,647✔
879
  return code;
2,147,483,647✔
880
}
881

882
int32_t tLDataIterNextRow(SLDataIter *pIter, const char *idStr, bool *hasNext) {
2,147,483,647✔
883
  int32_t     step = pIter->backward ? -1 : 1;
2,147,483,647✔
884
  int32_t     code = 0;
2,147,483,647✔
885
  int32_t     iBlockL = pIter->iSttBlk;
2,147,483,647✔
886
  SBlockData *pBlockData = NULL;
2,147,483,647✔
887
  int32_t     lino = 0;
2,147,483,647✔
888

889
  *hasNext = false;
2,147,483,647✔
890

891
  // no qualified last file block in current file, no need to fetch row
892
  if (pIter->pSttBlk == NULL) {
2,147,483,647✔
893
    return code;
616,688,908✔
894
  }
895

896
  code = loadLastBlock(pIter, idStr, &pBlockData);
2,147,483,647✔
897
  if (pBlockData == NULL || code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
898
    lino = __LINE__;
×
899
    goto _exit;
×
900
  }
901

902
  pIter->iRow += step;
2,147,483,647✔
903

904
  while (1) {
43,782,821✔
905
    bool skipBlock = false;
2,147,483,647✔
906
    code = findNextValidRow(pIter, idStr);
2,147,483,647✔
907
    TSDB_CHECK_CODE(code, lino, _exit);
2,147,483,647✔
908

909
    if (pIter->pBlockLoadInfo->checkRemainingRow) {
2,147,483,647✔
910
      skipBlock = true;
×
911
      int16_t *aCols = pIter->pBlockLoadInfo->colIds;
×
912
      int      nCols = pIter->pBlockLoadInfo->numOfCols;
×
913
      bool     isLast = pIter->pBlockLoadInfo->isLast;
×
914
      for (int inputColIndex = 0; inputColIndex < nCols; ++inputColIndex) {
×
915
        for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
×
916
          SColData *pColData = &pBlockData->aColData[colIndex];
×
917
          int16_t   cid = pColData->cid;
×
918

919
          if (cid == aCols[inputColIndex]) {
×
920
            if (isLast && (pColData->flag & HAS_VALUE)) {
×
921
              skipBlock = false;
×
922
              break;
×
923
            } else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
×
924
              skipBlock = false;
×
925
              break;
×
926
            }
927
          }
928
        }
929
      }
930
    }
931

932
    if (skipBlock || pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
2,147,483,647✔
933
      tLDataIterNextBlock(pIter, idStr);
545,959,352✔
934
      if (pIter->pSttBlk == NULL) {  // no more data
400,188,524✔
935
        goto _exit;
356,505,957✔
936
      }
937
    } else {
938
      break;
939
    }
940

941
    if (iBlockL != pIter->iSttBlk) {
43,783,490✔
942
      code = loadLastBlock(pIter, idStr, &pBlockData);
43,783,490✔
943
      if ((pBlockData == NULL) || (code != 0)) {
43,782,821✔
944
        lino = __LINE__;
×
945
        goto _exit;
×
946
      }
947

948
      // set start row index
949
      pIter->iRow = pIter->backward ? pBlockData->nRow - 1 : 0;
43,783,490✔
950
    }
951
  }
952

953
  pIter->rInfo.suid = pBlockData->suid;
2,147,483,647✔
954
  pIter->rInfo.uid = pBlockData->uid;
2,147,483,647✔
955
  pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);
2,147,483,647✔
956

957
_exit:
2,147,483,647✔
958
  if (code) {
2,147,483,647✔
959
    tsdbError("failed to exec stt-file nextIter, lino:%d, code:%s, %s", lino, tstrerror(code), idStr);
×
960
  }
961

962
  *hasNext = (code == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL);
2,147,483,647✔
963
  return code;
2,147,483,647✔
964
}
965

966
// SMergeTree =================================================
967
static FORCE_INLINE int32_t tLDataIterCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
2,147,483,647✔
968
  SLDataIter *pIter1 = (SLDataIter *)(((uint8_t *)p1) - offsetof(SLDataIter, node));
2,147,483,647✔
969
  SLDataIter *pIter2 = (SLDataIter *)(((uint8_t *)p2) - offsetof(SLDataIter, node));
2,147,483,647✔
970

971
  SRowKey rkey1 = {0}, rkey2 = {0};
2,147,483,647✔
972
  tRowGetKeyEx(&pIter1->rInfo.row, &rkey1);
2,147,483,647✔
973
  tRowGetKeyEx(&pIter2->rInfo.row, &rkey2);
2,147,483,647✔
974

975
  int32_t ret = tRowKeyCompare(&rkey1, &rkey2);
2,147,483,647✔
976
  if (ret < 0) {
2,147,483,647✔
977
    return -1;
2,147,483,647✔
978
  } else if (ret > 0) {
2,147,483,647✔
979
    return 1;
2,147,483,647✔
980
  } else {
981
    int64_t ver1 = TSDBROW_VERSION(&pIter1->rInfo.row);
2,147,483,647✔
982
    int64_t ver2 = TSDBROW_VERSION(&pIter2->rInfo.row);
2,147,483,647✔
983

984
    if (ver1 < ver2) {
2,147,483,647✔
985
      return -1;
1,714,253,033✔
986
    } else if (ver1 > ver2) {
2,147,483,647✔
987
      return 1;
2,147,483,647✔
988
    } else {
989
      return 0;
×
990
    }
991
  }
992
}
993

994
static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
616,768,241✔
995
  return -1 * tLDataIterCmprFn(p1, p2);
616,768,241✔
996
}
997

998
static void clearTableRowsInfoCache(void) {
171,600✔
999
  int32_t items = (statisCacheInfo.pStatisFileCache != NULL)? taosLRUCacheGetElems(statisCacheInfo.pStatisFileCache):0;
171,600✔
1000
  tsdbInfo("start to free %d items in statisCache", items);
171,600✔
1001

1002
  taosLRUCacheCleanup(statisCacheInfo.pStatisFileCache);
171,600✔
1003
  (void)taosThreadMutexDestroy(&statisCacheInfo.lock);
171,600✔
1004
}
171,600✔
1005

1006
// init the statis file cache, 10MiB by default
1007
static void initTableRowsInfoCache(void) {
171,600✔
1008
  statisCacheInfo.pStatisFileCache = taosLRUCacheInit(40 * 1024 * 1024, -1, 0.5);
171,600✔
1009
  (void)taosThreadMutexInit(&statisCacheInfo.lock, NULL);
171,600✔
1010
  (void) atexit(clearTableRowsInfoCache);
171,600✔
1011
}
171,600✔
1012

1013
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable *pSttDataInfo) {
1,016,626,165✔
1014
  int32_t               code = TSDB_CODE_SUCCESS;
1,016,626,165✔
1015
  STFileSet *           pFset = (STFileSet *)pConf->pCurrentFileset;
1,016,626,165✔
1016
  bool                  loadStatisFromDisk = true;
1,016,865,096✔
1017
  int32_t               lino = 0;
1,016,865,096✔
1018
  int32_t               numOfLevels = pFset->lvlArr->size;
1,016,865,096✔
1019
  SSttStatisCacheValue* pValue = NULL;
1,016,930,053✔
1020
  LRUHandle*            pHandle = NULL;
1,016,956,249✔
1021
  SSttStatisCacheKey    key = {.suid = pConf->suid, .fid = pFset->fid, .vgId = TD_VID(pConf->pTsdb->pVnode)};
1,016,955,414✔
1022

1023
  (void)taosThreadOnce(&tsCacheInit, initTableRowsInfoCache);
1,016,947,110✔
1024

1025
  pMTree->pIter = NULL;
1,016,763,658✔
1026
  pMTree->backward = pConf->backward;
1,016,834,568✔
1027
  pMTree->idStr = pConf->idstr;
1,016,682,726✔
1028

1029
  if (!pMTree->backward) {  // asc
1,016,674,400✔
1030
    tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
794,785,944✔
1031
  } else {  // desc
1032
    tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
221,922,466✔
1033
  }
1034

1035
  pMTree->ignoreEarlierTs = false;
1,016,406,577✔
1036

1037
  // no data exists, go to end
1038
  if (numOfLevels == 0) {
1,016,617,530✔
1039
    goto _end;
73,772,286✔
1040
  }
1041

1042
  code = adjustSttDataIters(pConf->pSttFileBlockIterArray, pConf->pCurrentFileset);
942,845,244✔
1043
  if (code) {
942,801,939✔
1044
    goto _end;
×
1045
  }
1046

1047
  if (pConf->cacheStatis) {
942,801,939✔
1048
    int32_t ret = getStatisInfoFromCache(statisCacheInfo.pStatisFileCache, &key, &pValue, &pHandle, pConf->idstr);
184,065✔
1049
    if (ret == TSDB_CODE_SUCCESS) {  // use cached statis info
184,065✔
1050
      if (pValue->commitTs == pFset->lastCommit) {
183,720✔
1051
        loadStatisFromDisk = false;
183,720✔
1052
      } else {  // release the handle ref, and then remove it from lru cache
1053
        int64_t ts = pValue->commitTs;
×
1054
        clearStatisInfoCache(statisCacheInfo.pStatisFileCache, &key, &pHandle);
×
1055
        tsdbInfo(
×
1056
            "cache expired since new commit occurs, remove the cache and load from disk, vgId:%d, fid:%d, suid:%" PRId64
1057
            ", commitTs:%" PRId64 ", new commitTs:%" PRId64,
1058
            key.vgId, key.fid, key.suid, ts, pFset->lastCommit);
1059
      }
1060
    }
1061
  }
1062

1063
  for (int32_t j = 0; j < numOfLevels; ++j) {
1,921,020,981✔
1064
    SSttLvl *pSttLevel = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->data[j];
977,841,358✔
1065
    SArray  *pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j);
978,192,177✔
1066

1067
    for (int32_t i = 0; i < TARRAY2_SIZE(pSttLevel->fobjArr); ++i) {  // open all last file
1,957,266,663✔
1068
      SLDataIter *pIter = taosArrayGetP(pList, i);
978,710,007✔
1069

1070
      SSttFileReader    *pSttFileReader = pIter->pReader;
978,750,997✔
1071
      SSttBlockLoadInfo *pLoadInfo = pIter->pBlockLoadInfo;
978,398,324✔
1072

1073
      // open stt file reader if not opened yet
1074
      // if failed to open this stt file, ignore the error and try next one
1075
      if (pSttFileReader == NULL) {
978,735,194✔
1076
        int32_t pgSize = pConf->pTsdb->pVnode->config.tsdbPageSize;
580,332,359✔
1077

1078
        SSttFileReaderConfig conf = {
1,161,077,428✔
1079
            .tsdb = pConf->pTsdb, .szPage = pgSize, .file[0] = *pSttLevel->fobjArr->data[i]->f};
580,408,245✔
1080

1081
        code = tsdbSttFileReaderOpen(pSttLevel->fobjArr->data[i]->fname, &conf, &pSttFileReader);
580,575,786✔
1082
        if (code != TSDB_CODE_SUCCESS) {
579,806,983✔
1083
          tsdbError("open stt file reader error. file name %s, code %s, %s", pSttLevel->fobjArr->data[i]->fname,
×
1084
                    tstrerror(code), pMTree->idStr);
1085
        }
1086
      }
1087

1088
      if (pLoadInfo == NULL) {
978,122,028✔
1089
        code = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols, &pLoadInfo);
579,858,083✔
1090
        if (code != TSDB_CODE_SUCCESS) {
580,379,153✔
1091
          goto _end;
×
1092
        }
1093
      }
1094

1095
      if (!loadStatisFromDisk && (pLoadInfo->info.pCount == NULL)) {
978,643,098✔
1096
          code = getSttTableRowsInfo(pValue, pConf->pCurRowKey->numOfPKs, j, i, &pLoadInfo->info);
61,725✔
1097
          if (code != TSDB_CODE_SUCCESS) {
61,725✔
1098
            loadStatisFromDisk = true;  // failed to get statis info from cache, load it from stt file
×
1099
          }
1100
      }
1101

1102
      memset(pIter, 0, sizeof(SLDataIter));
978,643,098✔
1103

1104
      SSttKeyRange range = {.skey.numOfPKs = pConf->pCurRowKey->numOfPKs, .ekey.numOfPKs = pConf->pCurRowKey->numOfPKs};
978,643,098✔
1105
      int64_t      numOfRows = 0;
979,004,458✔
1106
      int64_t      cid = pSttLevel->fobjArr->data[i]->f->cid;
978,929,531✔
1107

1108
      code = tLDataIterOpen2(pIter, pSttFileReader, cid, pMTree->backward, pConf, pLoadInfo, &range, &numOfRows,
979,035,629✔
1109
                             pMTree->idStr, loadStatisFromDisk);
1110
      if (code != TSDB_CODE_SUCCESS) {
978,363,565✔
1111
        goto _end;
×
1112
      }
1113

1114
      bool hasVal = NULL;
978,363,565✔
1115
      code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal);
978,597,867✔
1116
      if (code) {
978,331,639✔
1117
        goto _end;
×
1118
      }
1119

1120
      if (hasVal) {
978,331,639✔
1121
        tMergeTreeAddIter(pMTree, pIter);
359,983,833✔
1122

1123
        // let's record the time window for current table of uid in the stt files
1124
        if (pSttDataInfo != NULL && numOfRows > 0) {
359,998,371✔
1125
          void *px = taosArrayPush(pSttDataInfo->pKeyRangeList, &range);
359,928,231✔
1126
          QUERY_CHECK_NULL(px, code, lino, _end, terrno);
360,002,515✔
1127

1128
          pSttDataInfo->numOfRows += numOfRows;
360,002,515✔
1129
        }
1130
      } else {
1131
        if (!pMTree->ignoreEarlierTs) {
618,347,806✔
1132
          pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs;
618,266,502✔
1133
        }
1134
      }
1135
    }
1136
  }
1137

1138
  if (pConf->cacheStatis && loadStatisFromDisk) {
943,179,623✔
1139
    SSttStatisCacheKey    k = {0};
345✔
1140
    SSttStatisCacheValue *pVal = NULL;
345✔
1141

1142
    code = buildSttTableRowsInfoKV(pConf, TD_VID(pConf->pTsdb->pVnode), &k, &pVal);
345✔
1143
    if (code == TSDB_CODE_SUCCESS) {
345✔
1144
      code = putStatisInfoIntoCache(statisCacheInfo.pStatisFileCache, &k, pVal, pConf->idstr);
345✔
1145
    }
1146
  }
1147

1148
  if (pHandle != NULL && pConf->cacheStatis) {
942,701,096✔
1149
    releaseCacheHandle(statisCacheInfo.pStatisFileCache, &pHandle, true);
183,720✔
1150
  }
1151

1152
  return code;
942,760,114✔
1153

1154
_end:
73,650,896✔
1155
  if (pHandle != NULL && pConf->cacheStatis) {
73,773,942✔
1156
    releaseCacheHandle(statisCacheInfo.pStatisFileCache, &pHandle, true);
×
1157
  }
1158

1159
  tMergeTreeClose(pMTree);
73,773,942✔
1160
  return code;
73,776,729✔
1161
}
1162

1163
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) {
359,958,089✔
1164
  SRBTreeNode *node = tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter);
359,958,089✔
1165
}
359,998,238✔
1166

1167
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree) { return pMTree->ignoreEarlierTs; }
×
1168

1169
static void tLDataIterPinSttBlock(SLDataIter *pIter, const char *id) {
2,147,483,647✔
1170
  SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;
2,147,483,647✔
1171

1172
  if (pInfo->blockData[0].sttBlockIndex == pIter->iSttBlk) {
2,147,483,647✔
1173
    pInfo->blockData[0].pin = true;
2,147,483,647✔
1174
    tsdbTrace("pin stt-block, blockIndex:%d, stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id);
2,147,483,647✔
1175
    return;
2,147,483,647✔
1176
  }
1177

1178
  if (pInfo->blockData[1].sttBlockIndex == pIter->iSttBlk) {
2,147,483,647✔
1179
    pInfo->blockData[1].pin = true;
2,147,483,647✔
1180
    tsdbTrace("pin stt-block, blockIndex:%d, stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id);
2,147,483,647✔
1181
    return;
2,147,483,647✔
1182
  }
1183

1184
  tsdbError("failed to pin any stt block, sttBlock:%d stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id);
×
1185
}
1186

1187
static void tLDataIterUnpinSttBlock(SLDataIter *pIter, const char *id) {
2,147,483,647✔
1188
  SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;
2,147,483,647✔
1189
  if (pInfo->blockData[0].pin) {
2,147,483,647✔
1190
    pInfo->blockData[0].pin = false;
2,147,483,647✔
1191
    tsdbTrace("unpin stt-block:%d, stt-fileVer:%" PRId64 " %s", pInfo->blockData[0].sttBlockIndex, pIter->cid, id);
2,147,483,647✔
1192
    return;
2,147,483,647✔
1193
  }
1194

1195
  if (pInfo->blockData[1].pin) {
2,147,483,647✔
1196
    pInfo->blockData[1].pin = false;
2,147,483,647✔
1197
    tsdbTrace("unpin stt-block:%d, stt-fileVer:%" PRId64 " %s", pInfo->blockData[1].sttBlockIndex, pIter->cid, id);
2,147,483,647✔
1198
    return;
2,147,483,647✔
1199
  }
1200

1201
  tsdbError("failed to unpin any stt block, sttBlock:%d stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id);
×
1202
}
1203

1204
void tMergeTreePinSttBlock(SMergeTree *pMTree) {
2,147,483,647✔
1205
  if (pMTree->pIter == NULL) {
2,147,483,647✔
1206
    return;
×
1207
  }
1208

1209
  SLDataIter *pIter = pMTree->pIter;
2,147,483,647✔
1210
  pMTree->pPinnedBlockIter = pIter;
2,147,483,647✔
1211
  tLDataIterPinSttBlock(pIter, pMTree->idStr);
2,147,483,647✔
1212
}
1213

1214
void tMergeTreeUnpinSttBlock(SMergeTree *pMTree) {
2,147,483,647✔
1215
  if (pMTree->pPinnedBlockIter == NULL) {
2,147,483,647✔
1216
    return;
×
1217
  }
1218

1219
  SLDataIter *pIter = pMTree->pPinnedBlockIter;
2,147,483,647✔
1220
  pMTree->pPinnedBlockIter = NULL;
2,147,483,647✔
1221
  tLDataIterUnpinSttBlock(pIter, pMTree->idStr);
2,147,483,647✔
1222
}
1223

1224
int32_t tMergeTreeNext(SMergeTree *pMTree, bool *pHasNext) {
2,147,483,647✔
1225
  int32_t code = 0;
2,147,483,647✔
1226
  if (pHasNext == NULL) {
2,147,483,647✔
1227
    return TSDB_CODE_INVALID_PARA;
×
1228
  }
1229

1230
  *pHasNext = false;
2,147,483,647✔
1231
  while (pMTree->pIter) {
2,147,483,647✔
1232
    SLDataIter *pIter = pMTree->pIter;
2,147,483,647✔
1233
    bool        hasVal = false;
2,147,483,647✔
1234
    code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal);
2,147,483,647✔
1235
    if (!hasVal || (code != 0)) {
2,147,483,647✔
1236
      if (code == TSDB_CODE_FILE_CORRUPTED) {
194,886,922✔
1237
        code = 0;  // suppress the file corrupt error to enable all queries within this cluster can run without failed.
×
1238
      }
1239

1240
      pMTree->pIter = NULL;
194,886,922✔
1241
    }
1242

1243
    // compare with min in RB Tree
1244
    pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
2,147,483,647✔
1245
    if (pMTree->pIter && pIter) {
2,147,483,647✔
1246
      int32_t c = pMTree->rbt.cmprFn(&pMTree->pIter->node, &pIter->node);
2,147,483,647✔
1247
      if (c > 0) {
2,147,483,647✔
1248
        (void)tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
1,676,417,041✔
1249
        pMTree->pIter = NULL;
1,676,573,825✔
1250
      } else if (!c) {
2,137,839,773✔
1251
        continue;
×
1252
      }
1253
    }
1254

1255
    break;
2,147,483,647✔
1256
  }
1257

1258
  if (pMTree->pIter == NULL) {
2,147,483,647✔
1259
    pMTree->pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
2,147,483,647✔
1260
    if (pMTree->pIter) {
2,147,483,647✔
1261
      tRBTreeDrop(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
2,031,450,353✔
1262
    }
1263
  }
1264

1265
  *pHasNext = (pMTree->pIter != NULL);
2,147,483,647✔
1266
  return code;
2,147,483,647✔
1267
}
1268

1269
void tMergeTreeClose(SMergeTree *pMTree) {
1,747,633,134✔
1270
  pMTree->pIter = NULL;
1,747,633,134✔
1271
  pMTree->pPinnedBlockIter = NULL;
1,748,008,350✔
1272
}
1,747,736,359✔
1273

1274
int32_t buildSttTableRowsInfoKV(SMergeTreeConf *pConf, int32_t vgId, SSttStatisCacheKey *pKey,
345✔
1275
                                SSttStatisCacheValue **pValue) {
1276
  *pValue = taosMemoryCalloc(1, sizeof(SSttStatisCacheValue));
345✔
1277
  if (*pValue == NULL) {
345✔
1278
    return terrno;
×
1279
  }
1280

1281
  memset(pKey, 0, sizeof(SSttStatisCacheKey));
345✔
1282

1283
  STFileSet *pFset = (STFileSet *)pConf->pCurrentFileset;
345✔
1284

1285
  pKey->suid = pConf->suid;
345✔
1286
  pKey->vgId = vgId;
345✔
1287
  pKey->fid = pFset->fid;
345✔
1288

1289
  (*pValue)->commitTs = pFset->lastCommit;
345✔
1290

1291
  int32_t numOfLevels = TARRAY2_SIZE(pFset->lvlArr);
345✔
1292

1293
  (*pValue)->pLevel = taosArrayInit(numOfLevels, sizeof(void *));
345✔
1294
  if ((*pValue)->pLevel == NULL) {
345✔
1295
    return terrno;
×
1296
  }
1297

1298
  for (int32_t j = 0; j < numOfLevels; ++j) {
690✔
1299
    SSttLvl *pSttLevel = pFset->lvlArr->data[j];
345✔
1300
    SArray * pIterList = taosArrayGetP(pConf->pSttFileBlockIterArray, j);
345✔
1301
    if (pIterList == NULL) {
345✔
1302
      return terrno;
×
1303
    }
1304

1305
    SArray *pRowsInfoArr = taosArrayInit(TARRAY2_SIZE(pSttLevel->fobjArr), sizeof(SSttTableRowsInfo));
345✔
1306
    if (pRowsInfoArr == NULL) {
345✔
1307
      return terrno;
×
1308
    }
1309

1310
    for (int32_t i = 0; i < TARRAY2_SIZE(pSttLevel->fobjArr); ++i) {  // open all stt file
690✔
1311
      SLDataIter *       pIter = taosArrayGetP(pIterList, i);
345✔
1312
      SSttBlockLoadInfo *pLoadInfo = pIter->pBlockLoadInfo;
345✔
1313

1314
      void *px = taosArrayPush(pRowsInfoArr, &pLoadInfo->info);
345✔
1315
      if (px == NULL) {
345✔
1316
        return terrno;
×
1317
      }
1318

1319
      memset(&pLoadInfo->info, 0, sizeof(SSttTableRowsInfo));
345✔
1320
    }
1321

1322
    void *px = taosArrayPush((*pValue)->pLevel, &pRowsInfoArr);
345✔
1323
    if (px == NULL) {
345✔
1324
      return terrno;
×
1325
    }
1326
  }
1327

1328
  // todo handle memory failure
1329

1330
  return TSDB_CODE_SUCCESS;
345✔
1331
}
1332

1333
int32_t getStatisInfoFromCache(SLRUCache *pCache, SSttStatisCacheKey *pKey, SSttStatisCacheValue **pValue,
184,065✔
1334
                               LRUHandle **pHandle, const char *id) {
1335
  *pValue = NULL;
184,065✔
1336
  *pHandle = NULL;
184,065✔
1337

1338
  (void)taosThreadMutexLock(&statisCacheInfo.lock);
184,065✔
1339
  LRUHandle *pItemHandle = taosLRUCacheLookup(pCache, pKey, sizeof(SSttStatisCacheKey));
184,065✔
1340
  if (pItemHandle == NULL) {
184,065✔
1341
    (void)taosThreadMutexUnlock(&statisCacheInfo.lock);
345✔
1342
    return TSDB_CODE_NOT_FOUND;
345✔
1343
  }
1344

1345
  void *p = taosLRUCacheValue(pCache, pItemHandle);
183,720✔
1346

1347
  *pValue = p;
183,720✔
1348
  *pHandle = pItemHandle;
183,720✔
1349

1350
  tsdbDebug("get statis info from cache suid:%" PRId64 ", vgId:%d, fid:%d, %s, commitTs:%" PRId64, pKey->suid,
183,720✔
1351
            pKey->vgId, pKey->fid, id, (*pValue)->commitTs);
1352

1353
  // (*pEntry)->hitTimes += 1;
1354
  (void)taosThreadMutexUnlock(&statisCacheInfo.lock);
183,720✔
1355
  return TSDB_CODE_SUCCESS;
183,720✔
1356
}
1357

1358
void releaseCacheHandle(SLRUCache *pCache, LRUHandle **pHandle, bool lock) {
183,720✔
1359
  if (lock) {
183,720✔
1360
    (void)taosThreadMutexLock(&statisCacheInfo.lock);
183,720✔
1361
  }
1362

1363
  bool ret = taosLRUCacheRelease(pCache, *pHandle, false);
183,720✔
1364
  *pHandle = NULL;
183,720✔
1365

1366
  if (lock) {
183,720✔
1367
    (void)taosThreadMutexUnlock(&statisCacheInfo.lock);
183,720✔
1368
  }
1369
}
183,720✔
1370

1371
void freeStatisFileItems(const void* key, size_t keyLen, void* value, void* ud) {
345✔
1372
  (void)ud;
1373

1374
  if (value == NULL) {
345✔
1375
    return;
×
1376
  }
1377

1378
  SSttStatisCacheKey *  pKey = (SSttStatisCacheKey *)key;
345✔
1379
  SSttStatisCacheValue *pVal = value;
345✔
1380

1381
  for(int32_t i = 0; i < taosArrayGetSize(pVal->pLevel); ++i) {
690✔
1382
    SArray* pInfos = taosArrayGetP(pVal->pLevel, i);
345✔
1383

1384
    for(int32_t j = 0; j < taosArrayGetSize(pInfos); ++j) {
690✔
1385
      SSttTableRowsInfo* p = taosArrayGet(pInfos, j);
345✔
1386
      taosArrayDestroy(p->pCount);
345✔
1387
      taosArrayDestroyEx(p->pFirstKey, freeItem);
345✔
1388
      taosArrayDestroyEx(p->pLastKey, freeItem);
345✔
1389
      taosArrayDestroy(p->pFirstTs);
345✔
1390
      taosArrayDestroy(p->pLastTs);
345✔
1391
      taosArrayDestroy(p->pUid);
345✔
1392
    }
1393

1394
    taosArrayDestroy(pInfos);
345✔
1395
  }
1396

1397
  taosArrayDestroy(pVal->pLevel);
345✔
1398
  taosMemoryFree(pVal);
345✔
1399
}
1400

1401
int32_t putStatisInfoIntoCache(SLRUCache *pCache, SSttStatisCacheKey *pKey, SSttStatisCacheValue *pValue,
345✔
1402
                               const char *id) {
1403
  (void)taosThreadMutexLock(&statisCacheInfo.lock);
345✔
1404

1405
  LRUStatus status = taosLRUCacheInsert(pCache, pKey, sizeof(SSttStatisCacheKey), pValue, getCacheValueSize(pValue),
345✔
1406
                                        freeStatisFileItems, NULL, NULL, TAOS_LRU_PRIORITY_LOW, NULL);
1407
  if (status != TAOS_LRU_STATUS_OK) {
345✔
1408
    if (status == TAOS_LRU_STATUS_FAIL) {
×
1409
      tsdbError("%s failed to insert items into statis cache, status:%d", id, status);
×
1410
      freeStatisFileItems(NULL, 0, pValue, NULL);
×
1411
    }
1412
  } else {
1413
    int32_t total = taosLRUCacheGetElems(pCache);
345✔
1414
    tsdbDebug("%s put statis info into cache, total:%d suid:%" PRId64 ", vgId:%d, fid:%d", id, total, pKey->suid,
345✔
1415
              pKey->vgId, pKey->fid);
1416
  }
1417

1418
  (void)taosThreadMutexUnlock(&statisCacheInfo.lock);
345✔
1419
  return TSDB_CODE_SUCCESS;
345✔
1420
}
1421

1422
void clearStatisInfoCache(SLRUCache *pStatisCache, SSttStatisCacheKey *pKey, LRUHandle** pHandle) {
×
1423
  (void)taosThreadMutexLock(&statisCacheInfo.lock);
×
1424
  releaseCacheHandle(statisCacheInfo.pStatisFileCache, pHandle, false);
×
1425
  taosLRUCacheErase(pStatisCache, pKey, sizeof(SSttStatisCacheKey));
×
1426
  (void)taosThreadMutexUnlock(&statisCacheInfo.lock);
×
1427
}
×
1428

1429
static int32_t dupPlayload(SValue *p){
×
1430
  if (p != NULL){
×
1431
    int32_t code = tValueDupPayload(p);
×
1432
    if (code != TSDB_CODE_SUCCESS) {
×
1433
      return code;
×
1434
    }
1435
  }
1436
  return 0;
×
1437
}
1438

1439
int32_t sttRowInfoDeepCopy(SSttTableRowsInfo *pDst, SSttTableRowsInfo *pInfo, int32_t numOfPKs) {
61,725✔
1440
  int32_t code = 0;
61,725✔
1441

1442
  pDst->pCount = taosArrayDup(pInfo->pCount, NULL);
61,725✔
1443
  pDst->pFirstKey = taosArrayDup(pInfo->pFirstKey, NULL);
61,725✔
1444
  pDst->pLastKey = taosArrayDup(pInfo->pLastKey, NULL);
61,725✔
1445
  pDst->pFirstTs = taosArrayDup(pInfo->pFirstTs, NULL);
61,725✔
1446
  pDst->pLastTs = taosArrayDup(pInfo->pLastTs, NULL);
61,725✔
1447
  pDst->pUid = taosArrayDup(pInfo->pUid, NULL);
61,725✔
1448

1449
  pDst->memSize = pInfo->memSize;
61,725✔
1450
  
1451
  if (numOfPKs > 0) {
61,725✔
1452
    int32_t len = taosArrayGetSize(pDst->pCount);
×
1453
    for (int32_t i = 0; i < len; ++i) {
×
1454
      SValue *p1 = (SValue *)taosArrayGet(pDst->pFirstKey, i);
×
1455
      if (p1 == NULL) {
×
1456
        return terrno;
×
1457
      }
1458
      code = tValueDupPayload(p1);
×
1459
      if (code != TSDB_CODE_SUCCESS) {
×
1460
        return code;
×
1461
      }
1462
      SValue *p2 = (SValue *)taosArrayGet(pDst->pLastKey, i);
×
1463
      if (p2 == NULL) {
×
1464
        return terrno;
×
1465
      }
1466
      code = tValueDupPayload(p2);
×
1467
      if (code != TSDB_CODE_SUCCESS) {
×
1468
        return code;
×
1469
      }
1470
    }
1471
  }
1472

1473
  return TSDB_CODE_SUCCESS;
61,725✔
1474
}
1475

1476
int32_t getSttTableRowsInfo(SSttStatisCacheValue *pValue, int32_t numOfPKs, int32_t levelIdx, int32_t fileIdx,
61,725✔
1477
                            SSttTableRowsInfo *pRowInfo) {
1478
  if (levelIdx >= taosArrayGetSize(pValue->pLevel)) {
61,725✔
1479
    return TSDB_CODE_INVALID_PARA;
×
1480
  }
1481

1482
  SArray *pRowsInfoArr = *(SArray **)taosArrayGet(pValue->pLevel, levelIdx);
61,725✔
1483
  if (pRowsInfoArr == NULL) {
61,725✔
1484
    return TSDB_CODE_INVALID_PARA;
×
1485
  }
1486

1487
  if (fileIdx >= taosArrayGetSize(pRowsInfoArr)) {
61,725✔
1488
    return TSDB_CODE_INVALID_PARA;
×
1489
  }
1490

1491
  void* p = (SSttTableRowsInfo *)taosArrayGet(pRowsInfoArr, fileIdx);
61,725✔
1492
  if (p == NULL) {
61,725✔
1493
    return TSDB_CODE_INVALID_PARA;
×
1494
  }
1495

1496
  return sttRowInfoDeepCopy(pRowInfo, p, numOfPKs);
61,725✔
1497
}
1498

1499
int32_t getCacheValueSize(const SSttStatisCacheValue *pValue) {
345✔
1500
  int32_t size = sizeof(SSttStatisCacheValue) + sizeof(SArray);
345✔
1501
  for (int32_t i = 0; i < taosArrayGetSize(pValue->pLevel); ++i) {
690✔
1502
    SArray *pRowsInfoArr = *(SArray **)taosArrayGet(pValue->pLevel, i);
345✔
1503
    
1504
    size += sizeof(SArray);
345✔
1505
    for (int32_t j = 0; j < taosArrayGetSize(pRowsInfoArr); ++j) {
690✔
1506
      SSttTableRowsInfo *p = (SSttTableRowsInfo *)taosArrayGet(pRowsInfoArr, j);
345✔
1507
      size += p->memSize;
345✔
1508
    }
1509
  }
1510

1511
  return size;
345✔
1512
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc