• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4952

06 Feb 2026 07:29AM UTC coverage: 66.869% (-0.02%) from 66.887%
#4952

push

travis-ci

web-flow
merge: from main to 3.0 #34521

758 of 1081 new or added lines in 28 files covered. (70.12%)

6496 existing lines in 142 files now uncovered.

205752 of 307696 relevant lines covered (66.87%)

126028909.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.33
/source/util/src/tpagedbuf.c
1
#define _DEFAULT_SOURCE
2
#include "tpagedbuf.h"
3
#include "taoserror.h"
4
#include "tcompression.h"
5
#include "tlog.h"
6
#include "tsimplehash.h"
7

8
// Start of the payload area: the page buffer begins with a POINTER_BYTES-wide slot, the payload follows it.
#define GET_PAYLOAD_DATA(_p) ((char*)(_p)->pData + POINTER_BYTES)
// A page is considered resident in memory while it still references a data buffer.
#define BUF_PAGE_IN_MEM(_p) ((_p)->pData != NULL)
// Only drops the reference to the data buffer; the buffer itself is not released by this macro.
#define CLEAR_BUF_PAGE_IN_MEM_FLAG(_p) ((_p)->pData = NULL)
// A non-negative offset means the page owns a position in the disk file.
#define HAS_DATA_IN_DISK(_p) ((_p)->offset >= 0)
// True once the LRU list holds at least inMemPages entries.
#define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages)
13

14
// A byte range inside the disk file. The same layout is used for the entries of the free-area array (pFree).
typedef struct SPageDiskInfo {
  int64_t offset;  // start position of the range in the file
  int32_t length;  // number of bytes in the range
} SPageDiskInfo, SFreeListItem;
18

19
struct SPageInfo {
20
  SListNode* pn;  // point to list node struct. it is NULL when the page is evicted from the in-memory buffer
21
  void*      pData;
22
  int64_t    offset;
23
  int32_t    pageId;
24
  int32_t    length : 29;
25
  bool       used : 1;   // set current page is in used
26
  bool       dirty : 1;  // set current buffer page is dirty or not
27
};
28

29
struct SDiskbasedBuf {
30
  int32_t    numOfPages;
31
  int64_t    totalBufSize;
32
  uint64_t   fileSize;  // disk file size
33
  TdFilePtr  pFile;
34
  int32_t    allocateId;  // allocated page id
35
  char*      path;        // file path
36
  char*      prefix;      // file name prefix
37
  int32_t    pageSize;    // current used page size
38
  int32_t    inMemPages;  // numOfPages that are allocated in memory
39
  SList*     freePgList;  // free page list
40
  SArray*    pIdList;     // page id list
41
  SSHashObj* all;
42
  SList*     lruList;
43
  void*      emptyDummyIdList;  // dummy id list
44
  void*      assistBuf;         // assistant buffer for compress/decompress data
45
  SArray*    pFree;             // free area in file
46
  bool       comp;              // compressed before flushed to disk
47
  uint64_t   nextPos;           // next page flush position
48

49
  char*               id;           // for debug purpose
50
  bool                printStatis;  // Print statistics info when closing this buffer.
51
  SDiskbasedBufStatis statis;
52
};
53

54
/**
 * Opens the file that backs the buffer and stores the handle in pBuf->pFile.
 * The file path is generated from pBuf->prefix the first time it is needed.
 *
 * @param pBuf paged buffer whose file is opened
 * @return TSDB_CODE_SUCCESS, or terrno when duplicating the path or opening the file fails
 */
static int32_t createDiskFile(SDiskbasedBuf* pBuf) {
  if (NULL == pBuf->path) {
    char tmpPath[PATH_MAX] = {0};
    taosGetTmpfilePath(pBuf->prefix, "paged-buf", tmpPath);

    pBuf->path = taosStrdup(tmpPath);
    if (NULL == pBuf->path) {
      return terrno;
    }
  }

  pBuf->pFile =
      taosOpenFile(pBuf->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);

  return (pBuf->pFile != NULL) ? TSDB_CODE_SUCCESS : terrno;
}
72

73
/**
 * Compresses a page in place when compression is enabled for the buffer.
 *
 * @param data    page payload; overwritten with the compressed bytes on success
 * @param srcSize number of bytes to compress
 * @param dst     receives the resulting size: srcSize when compression is disabled, otherwise the value
 *                returned by tsCompressString (the caller treats a negative value as failure)
 * @param pBuf    paged buffer providing the comp flag and the scratch buffer
 * @return data
 */
static char* doCompressData(void* data, int32_t srcSize, int32_t* dst, SDiskbasedBuf* pBuf) {
  if (!pBuf->comp) {  // compression is off: the page is written as is
    *dst = srcSize;
    return data;
  }

  *dst = tsCompressString(data, srcSize, 1, pBuf->assistBuf, srcSize, ONE_STAGE_COMP, NULL, 0);

  // A negative result converted to size_t would make memcpy copy a huge range, so copy only on success
  // and leave the error reporting to the caller, which checks *dst.
  if (*dst > 0) {
    memcpy(data, pBuf->assistBuf, *dst);
  }
  return data;
}
84

85
/**
 * Decompresses a page in place when compression is enabled for the buffer.
 *
 * @param data    page payload read from disk; overwritten with the decompressed bytes
 * @param srcSize number of bytes that were read
 * @param dst     receives srcSize when compression is disabled, otherwise the result of tsDecompressString
 * @param pBuf    paged buffer providing the comp flag, the scratch buffer and the page size
 * @return 0 on success, terrno when tsDecompressString reports a negative size
 */
static int32_t doDecompressData(void* data, int32_t srcSize, int32_t* dst, SDiskbasedBuf* pBuf) {
  if (pBuf->comp) {
    *dst = tsDecompressString(data, srcSize, 1, pBuf->assistBuf, pBuf->pageSize, ONE_STAGE_COMP, NULL, 0);
    if (*dst < 0) {
      return terrno;
    }

    if (*dst > 0) {
      memcpy(data, pBuf->assistBuf, *dst);
    }
    return 0;
  }

  // nothing to do, the data was stored uncompressed
  *dst = srcSize;
  return 0;
}
101

102
/**
 * Picks the file position for a page of the given size.
 * The first free area that is large enough is used and shrunk from its front; when none fits,
 * the current end position pBuf->nextPos is returned. pBuf->nextPos itself is not modified here.
 *
 * @param pBuf paged buffer
 * @param size number of bytes that will be written
 * @return file offset for the page
 */
static uint64_t allocateNewPositionInFile(SDiskbasedBuf* pBuf, size_t size) {
  if (pBuf->pFree != NULL) {
    size_t num = taosArrayGetSize(pBuf->pFree);
    for (size_t i = 0; i < num; ++i) {
      SFreeListItem* pi = taosArrayGet(pBuf->pFree, i);

      // compare as size_t only after ruling out a negative length
      if (pi->length < 0 || (size_t)pi->length < size) {
        continue;
      }

      // keep the full 64-bit offset; a 32-bit temporary would corrupt positions beyond 2 GB
      int64_t offset = pi->offset;
      pi->offset += (int64_t)size;
      pi->length -= (int32_t)size;
      return (uint64_t)offset;
    }
  }

  // no recyclable space, the page goes to the end of the used area
  return pBuf->nextPos;
}
124

125
/**
126
 *   +--------------------------+-------------------+--------------+
127
 *   | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes|
128
 *   +--------------------------+-------------------+--------------+
129
 * @param pBuf
130
 * @param pg
131
 * @return
132
 */
133

134
static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); }
2,147,483,647✔
135

136
/**
 * Writes size bytes of pData at the given file offset and updates the file size and flush counters.
 *
 * @param pBuf   paged buffer owning the file
 * @param offset file position to write at
 * @param pData  bytes to write
 * @param size   number of bytes to write
 * @return TSDB_CODE_SUCCESS, or terrno when the seek fails or fewer than size bytes are written
 */
static int32_t doFlushBufPageImpl(SDiskbasedBuf* pBuf, int64_t offset, const char* pData, int32_t size) {
  if (taosLSeekFile(pBuf->pFile, offset, SEEK_SET) < 0) {
    return terrno;
  }

  int64_t nWritten = (int32_t)taosWriteFile(pBuf->pFile, pData, size);
  if (nWritten != size) {
    return terrno;
  }

  // the write may have extended the file
  if (pBuf->fileSize < offset + size) {
    pBuf->fileSize = offset + size;
  }

  pBuf->statis.flushPages += 1;
  pBuf->statis.flushBytes += size;
  return TSDB_CODE_SUCCESS;
}
157

158
/**
 * Writes a page to the disk file when needed and hands back its zeroed memory block for reuse.
 *
 * A dirty page is written: at a newly chosen position when it has none yet or when its on-disk area
 * is too small (the old area is then added to pBuf->pFree), otherwise in place. A page that is not
 * dirty is not written; its recorded length is kept.
 *
 * @param pBuf paged buffer
 * @param pg   page to flush; must have a data buffer and must not be in use
 * @return the page's memory block cleared to zero, or NULL on failure (terrno is set for the
 *         parameter and compression failures detected here)
 */
static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
  if (pg->pData == NULL || pg->used) {
    uError("invalid params in paged buffer process when flushing buf to disk, %s", pBuf->id);
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  int32_t size = pBuf->pageSize;
  int64_t offset = pg->offset;

  char* t = NULL;
  if ((!HAS_DATA_IN_DISK(pg)) || pg->dirty) {
    void* payload = GET_PAYLOAD_DATA(pg);
    t = doCompressData(payload, pBuf->pageSize + sizeof(SFilePage), &size, pBuf);
    if (size < 0) {
      uError("failed to compress data when flushing data to disk, %s", pBuf->id);
      terrno = TSDB_CODE_INVALID_PARA;
      return NULL;
    }
  }

  if (pg->dirty) {
    bool needNewPos = !HAS_DATA_IN_DISK(pg);  // flushed to disk for the first time

    if (!needNewPos && pg->length < size) {
      // the page grew beyond its current area: release that area and move the page
      SPageDiskInfo dinfo = {.length = pg->length, .offset = offset};
      if (NULL == taosArrayPush(pBuf->pFree, &dinfo)) {
        return NULL;
      }
      needNewPos = true;
    }

    if (needNewPos) {
      offset = allocateNewPositionInFile(pBuf, size);

      // Advance the end position only when the page really goes to the end of the used area.
      // A recycled area always starts below nextPos, so reusing it must not grow the file.
      if ((uint64_t)offset == pBuf->nextPos) {
        pBuf->nextPos += size;
      }
    }

    int32_t code = doFlushBufPageImpl(pBuf, offset, t, size);
    if (code != TSDB_CODE_SUCCESS) {
      return NULL;
    }
  } else {  // NOTE: the size may be -1, since this recycled page has not been flushed to disk yet.
    size = pg->length;
  }

  char* pDataBuf = pg->pData;
  memset(pDataBuf, 0, getAllocPageSize(pBuf->pageSize));

#ifdef BUF_PAGE_DEBUG
  uDebug("page_flush %p, pageId:%d, offset:%" PRId64, pDataBuf, pg->pageId, offset);
#endif

  pg->offset = offset;
  pg->length = size;  // on disk size
  return pDataBuf;
}
223

224
/**
 * Flushes a page, creating the backing file first when it does not exist yet.
 * After the flush attempt the page no longer references its data buffer and is marked clean.
 *
 * NOTE(review): the buffer reference is dropped even when doFlushBufPage returns NULL - confirm that
 * the buffer is not lost in that case.
 *
 * @param pBuf paged buffer
 * @param pg   page to flush
 * @return the reusable memory block returned by doFlushBufPage, or NULL on failure
 */
static char* flushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
  if (NULL == pBuf->pFile) {
    int32_t code = createDiskFile(pBuf);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return NULL;
    }
  }

  char* pRecycled = doFlushBufPage(pBuf, pg);

  pg->dirty = false;
  CLEAR_BUF_PAGE_IN_MEM_FLAG(pg);
  return pRecycled;
}
240

241
// load file block data in disk
242
static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
566,348,644✔
243
  if (pg->offset < 0 || pg->length <= 0) {
566,348,644✔
244
    uError("failed to load buf page from disk, offset:%" PRId64 ", length:%d, %s", pg->offset, pg->length, pBuf->id);
44,736✔
245
    return TSDB_CODE_INVALID_PARA;
×
246
  }
247

248
  int64_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
566,300,646✔
249
  if (ret < 0) {
566,344,450✔
250
    ret = terrno;
×
251
    return ret;
×
252
  }
253

254
  void* pPage = (void*)GET_PAYLOAD_DATA(pg);
566,344,450✔
255
  ret = taosReadFile(pBuf->pFile, pPage, pg->length);
566,332,800✔
256
  if (ret != pg->length) {
566,344,916✔
257
    ret = terrno;
×
258
    return ret;
×
259
  }
260

261
  pBuf->statis.loadBytes += pg->length;
566,347,712✔
262
  pBuf->statis.loadPages += 1;
566,344,450✔
263

264
  int32_t fullSize = 0;
566,338,392✔
265
  return doDecompressData(pPage, pg->length, &fullSize, pBuf);
566,318,820✔
266
}
267

268
/**
 * Allocates the bookkeeping record for a new page and appends it to pBuf->pIdList.
 * The record starts out in use, clean, without a data buffer and without a position on disk.
 *
 * @param pBuf   paged buffer
 * @param pageId id of the new page
 * @return the registered page info, or NULL when an allocation fails
 */
static SPageInfo* registerNewPageInfo(SDiskbasedBuf* pBuf, int32_t pageId) {
  SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo));
  if (ppi == NULL) {
    return NULL;
  }

  ppi->pageId = pageId;
  ppi->pData = NULL;
  ppi->offset = -1;
  ppi->length = -1;
  ppi->used = true;
  ppi->pn = NULL;
  ppi->dirty = false;

  SPageInfo** pRet = taosArrayPush(pBuf->pIdList, &ppi);
  if (NULL == pRet) {
    taosMemoryFree(ppi);
    return NULL;
  }

  // count the page only once it is really registered, so a failed allocation leaves the counter intact
  pBuf->numOfPages += 1;
  return *pRet;
}
291

292
/**
 * Walks the LRU list backwards and returns the first node whose page is not in use.
 *
 * @param pBuf paged buffer
 * @return the list node of the eviction candidate, or NULL when every page in the list is in use
 */
static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
  SListIter iter = {0};
  tdListInitIter(pBuf->lruList, &iter, TD_LIST_BACKWARD);

  SListNode* pn = NULL;
  while ((pn = tdListNext(&iter)) != NULL) {
    SPageInfo* pageInfo = *(SPageInfo**)pn->data;
    if (!pageInfo->used) {
      break;
    }
  }

  return pn;
}
309

310
/**
 * Evicts the eviction candidate returned by getEldestUnrefedPage: the page is removed from the
 * LRU list and flushed.
 *
 * @param pBuf paged buffer
 * @return the memory block freed up by the eviction, or NULL when no page can be evicted or the
 *         flush fails
 */
static char* evictBufPage(SDiskbasedBuf* pBuf) {
  SListNode* pVictim = getEldestUnrefedPage(pBuf);
  if (NULL == pVictim) {  // every in-memory page is currently in use
    return NULL;
  }

  terrno = 0;
  pVictim = tdListPopNode(pBuf->lruList, pVictim);

  SPageInfo* pInfo = *(SPageInfo**)pVictim->data;
  pInfo->pn = NULL;
  taosMemoryFreeClear(pVictim);

  return flushBufPage(pBuf, pInfo);
}
326

327
/**
 * Inserts a page at the head of the LRU list and records the created node in pi->pn.
 *
 * @param pList LRU list
 * @param pi    page to insert
 * @return TSDB_CODE_SUCCESS, or the error returned by tdListPrepend (pi->pn is left untouched then)
 */
static int32_t lruListPushFront(SList* pList, SPageInfo* pi) {
  int32_t code = tdListPrepend(pList, &pi);
  if (TSDB_CODE_SUCCESS == code) {
    pi->pn = tdListGetHead(pList);
  }
  return code;
}
336

337
// Unlinks the page's node from the LRU list and links it again at the head.
static void lruListMoveToFront(SList* pList, SPageInfo* pi) {
  SListNode* pNode = tdListPopNode(pList, pi->pn);
  pi->pn = pNode;
  tdListPrependNode(pList, pNode);
}
2,147,483,647✔
341

342
static SPageInfo* getPageInfoFromPayload(void* page) {
2,147,483,647✔
343
  char* p = (char*)page - POINTER_BYTES;
2,147,483,647✔
344

345
  SPageInfo* ppi = ((SPageInfo**)p)[0];
2,147,483,647✔
346
  return ppi;
2,147,483,647✔
347
}
348

349
/**
 * Creates a paged buffer.
 *
 * @param pBuf         receives the new buffer; set to NULL on failure
 * @param pagesize     payload size of a page, must be positive
 * @param inMemBufSize memory budget in bytes; raised to two pages when smaller
 * @param id           identifier used in log messages, duplicated into the buffer
 * @param dir          prefix for the temporary file name; the pointer is stored as is, not copied
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a non-positive page size, or the error of
 *         the allocation that failed
 */
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int64_t inMemBufSize, const char* id,
                           const char* dir) {
  int32_t        code = 0;
  SDiskbasedBuf* pPBuf = NULL;

  *pBuf = NULL;

  // the page size is used as a divisor below
  if (pagesize <= 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  pPBuf = taosMemoryCalloc(1, sizeof(SDiskbasedBuf));
  if (pPBuf == NULL) {
    code = terrno;
    goto _error;
  }

  pPBuf->pageSize = pagesize;
  pPBuf->numOfPages = 0;  // all pages are in buffer in the first place
  pPBuf->totalBufSize = 0;
  pPBuf->allocateId = -1;
  pPBuf->pFile = NULL;
  pPBuf->id = taosStrdup(id);
  if (id != NULL && pPBuf->id == NULL) {
    code = terrno;
    goto _error;
  }
  pPBuf->fileSize = 0;
  pPBuf->pFree = taosArrayInit(4, sizeof(SFreeListItem));
  pPBuf->freePgList = tdListNew(POINTER_BYTES);
  if (pPBuf->pFree == NULL || pPBuf->freePgList == NULL) {
    code = terrno;
    goto _error;
  }

  // at least 2 pages must be in memory; computed in 64 bits so that large page sizes cannot overflow
  int64_t minBufSize = (int64_t)pagesize * 2;
  if (inMemBufSize < minBufSize) {
    inMemBufSize = minBufSize;
  }

  pPBuf->inMemPages = inMemBufSize / pagesize;  // maximum allowed pages, it is a soft limit.
  pPBuf->lruList = tdListNew(POINTER_BYTES);
  if (pPBuf->lruList == NULL) {
    code = terrno;
    goto _error;
  }

  // init id hash table
  _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
  pPBuf->pIdList = taosArrayInit(4, POINTER_BYTES);
  if (pPBuf->pIdList == NULL) {
    code = terrno;
    goto _error;
  }

  pPBuf->all = tSimpleHashInit(64, fn);
  if (pPBuf->all == NULL) {
    code = terrno;
    goto _error;
  }

  pPBuf->prefix = (char*)dir;
  pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
  if (pPBuf->emptyDummyIdList == NULL) {
    code = terrno;
    goto _error;
  }

  *pBuf = pPBuf;
  return TSDB_CODE_SUCCESS;

_error:
  destroyDiskbasedBuf(pPBuf);
  *pBuf = NULL;
  return code;
}
421

422
/**
 * Provides the memory block for a page: a zero-initialized allocation while the in-memory limit is
 * not reached, otherwise the block obtained by evicting a page.
 *
 * @param pBuf paged buffer
 * @return the memory block, or NULL when the allocation or the eviction fails
 */
static char* doExtractPage(SDiskbasedBuf* pBuf) {
  if (!NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
    // add extract bytes in case of zipped buffer increased.
    return taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize));
  }

  char* pEvicted = evictBufPage(pBuf);
  if (NULL == pEvicted) {
    uWarn("no available buf pages, current:%d, max:%d, reason: %s, %s", listNEles(pBuf->lruList), pBuf->inMemPages,
          terrstr(), pBuf->id);
  }
  return pEvicted;
}
437

438
/**
 * Hands out a new page. A recycled page info from the free list is reused when one is available;
 * otherwise a new page id is allocated and registered in the id list, the hash map and the LRU list.
 * The returned page is marked as in use.
 *
 * @param pBuf   paged buffer
 * @param pageId receives the id of the page
 * @return pointer to the page payload, or NULL on failure (terrno carries the list/hash error
 *         when one of those insertions fails)
 */
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
  pBuf->statis.getPages += 1;

  char* availablePage = doExtractPage(pBuf);
  if (availablePage == NULL) {
    return NULL;
  }

  SPageInfo* pi = NULL;
  int32_t    code = TSDB_CODE_SUCCESS;

  if (listNEles(pBuf->freePgList) != 0) {
    SListNode* pItem = tdListPopHead(pBuf->freePgList);
    pi = *(SPageInfo**)pItem->data;
    pi->used = true;
    *pageId = pi->pageId;

    code = lruListPushFront(pBuf->lruList, pi);
    if (TSDB_CODE_SUCCESS != code) {
      // The page info is still referenced by pIdList and the hash map, so it must not be freed here.
      // Put it back into the free list instead and report the failure.
      pi->used = false;
      tdListPrependNode(pBuf->freePgList, pItem);
      taosMemoryFree(availablePage);
      terrno = code;
      return NULL;
    }

    taosMemoryFreeClear(pItem);
  } else {  // create a new pageinfo
    // register new id in this group
    *pageId = (++pBuf->allocateId);

    // register page id info
    pi = registerNewPageInfo(pBuf, *pageId);
    if (pi == NULL) {
      taosMemoryFree(availablePage);
      return NULL;
    }

    // add to hash map, then to the LRU list
    code = tSimpleHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);
    if (TSDB_CODE_SUCCESS == code) {
      code = lruListPushFront(pBuf->lruList, pi);
    }

    if (TSDB_CODE_SUCCESS != code) {
      // roll back the registration done above
      taosMemoryFree(availablePage);
      (void)taosArrayPop(pBuf->pIdList);
      int32_t ret = tSimpleHashRemove(pBuf->all, pageId, sizeof(int32_t));
      if (ret != TSDB_CODE_SUCCESS) {
        uError("%s failed to clear pageId %d from buf hash-set since %s", __func__, *pageId, tstrerror(ret));
      }
      taosMemoryFree(pi);
      pBuf->numOfPages -= 1;
      terrno = code;
      return NULL;
    }

    pBuf->totalBufSize += pBuf->pageSize;
  }

  pi->pData = availablePage;

  // the first POINTER_BYTES of the block point back to the page info
  ((void**)pi->pData)[0] = pi;
#ifdef BUF_PAGE_DEBUG
  uDebug("page_getNewBufPage , pi->pData:%p, pageId:%d, offset:%" PRId64, pi->pData, pi->pageId, pi->offset);
#endif

  return (void*)(GET_PAYLOAD_DATA(pi));
}
503

504
/**
 * Returns the payload of an existing page and marks the page as in use.
 * A page that is in memory is moved to the head of the LRU list. A page that is not in memory gets
 * a memory block (possibly by evicting another page) and is loaded from disk when it has data there.
 *
 * @param pBuf paged buffer
 * @param id   page id, must not be negative
 * @return pointer to the page payload, or NULL on failure (terrno is set for the failures detected
 *         in this function)
 */
void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
  if (id < 0) {
    terrno = TSDB_CODE_INVALID_PARA;
    uError("invalid page id:%d, %s", id, pBuf->id);
    return NULL;
  }

  pBuf->statis.getPages += 1;

  SPageInfo** pi = tSimpleHashGet(pBuf->all, &id, sizeof(int32_t));
  if (pi == NULL || *pi == NULL) {
    uError("failed to locate the buffer page:%d, %s", id, pBuf->id);
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  SPageInfo* pg = *pi;

  if (BUF_PAGE_IN_MEM(pg)) {  // it is in memory
    // no need to update the LRU list if only one page exists
    if (pBuf->numOfPages == 1) {
      pg->used = true;
      return (void*)(GET_PAYLOAD_DATA(pg));
    }

    SPageInfo** pInfo = (SPageInfo**)(pg->pn->data);
    if (*pInfo != pg) {
      terrno = TSDB_CODE_APP_ERROR;
      uError("inconsistently data in paged buffer, pInfo:%p, pi:%p, %s", *pInfo, pg, pBuf->id);
      return NULL;
    }

    lruListMoveToFront(pBuf->lruList, pg);
    pg->used = true;

#ifdef BUF_PAGE_DEBUG
    uDebug("page_getBufPage1 pageId:%d, offset:%" PRId64, pg->pageId, pg->offset);
#endif
    return (void*)(GET_PAYLOAD_DATA(pg));
  }

  // not in memory
  pg->pData = doExtractPage(pBuf);

  // failed to evict buffer page, return with error code.
  if (pg->pData == NULL) {
    return NULL;
  }

  // set the ptr to the new SPageInfo
  ((void**)(pg->pData))[0] = pg;

  int32_t code = lruListPushFront(pBuf->lruList, pg);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(pg->pData);
    pg->pData = NULL;
    terrno = code;
    return NULL;
  }
  pg->used = true;

  // some data has been flushed to disk, and needs to be loaded into buffer again.
  if (HAS_DATA_IN_DISK(pg)) {
    code = loadPageFromDisk(pBuf, pg);
    if (code != 0) {
      // Undo the LRU insertion as well: a node whose page has no data buffer would later be picked
      // up by the eviction scan and by in-memory lookups.
      SListNode* pNode = tdListPopNode(pBuf->lruList, pg->pn);
      taosMemoryFreeClear(pNode);
      pg->pn = NULL;
      pg->used = false;

      taosMemoryFree(pg->pData);
      pg->pData = NULL;
      terrno = code;
      return NULL;
    }
  }
#ifdef BUF_PAGE_DEBUG
  uDebug("page_getBufPage2 pageId:%d, offset:%" PRId64, pg->pageId, pg->offset);
#endif
  return (void*)(GET_PAYLOAD_DATA(pg));
}
578

579
// Releases a page given the payload pointer handed out earlier; a NULL page is ignored.
void releaseBufPage(SDiskbasedBuf* pBuf, void* page) {
  if (page != NULL) {
    releaseBufPageInfo(pBuf, getPageInfoFromPayload(page));
  }
}
587

588
/**
 * Marks a page as no longer in use and counts the release.
 * A NULL page info is ignored; a page info without a data buffer is reported and ignored.
 *
 * @param pBuf paged buffer whose release counter is updated
 * @param pi   page info to release
 */
void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
  if (pi == NULL) {
    return;
  }

  // the trace reads fields of pi, so it has to come after the NULL check
#ifdef BUF_PAGE_DEBUG
  uDebug("page_releaseBufPageInfo pageId:%d, used:%d, offset:%" PRId64, pi->pageId, pi->used, pi->offset);
#endif

  if (pi->pData == NULL) {
    uError("pi->pData (page data) is null");
    return;
  }

  pi->used = false;
  pBuf->statis.releasePages += 1;
}
605

606
// Returns the totalBufSize counter of the buffer, converted to size_t.
size_t getTotalBufSize(const SDiskbasedBuf* pBuf) {
  return (size_t)pBuf->totalBufSize;
}
×
607

608
// Returns the buffer's own list of registered page infos (not a copy).
SArray* getDataBufPagesIdList(SDiskbasedBuf* pBuf) {
  return pBuf->pIdList;
}
×
609

610
/**
 * Destroys a paged buffer: prints/logs its statistics, closes and removes the backing file when one
 * was created, and releases every page, list, array and the buffer object itself.
 * A NULL buffer is ignored.
 *
 * @param pBuf paged buffer to destroy
 */
void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
  if (NULL == pBuf) {
    return;
  }

  dBufPrintStatis(pBuf);

  const bool hadFile = (pBuf->pFile != NULL);
  if (!hadFile) {
    uDebug("Paged buffer closed, total:%.2f Kb, no file created, %s", pBuf->totalBufSize / 1024.0, pBuf->id);
  } else {
    uDebug(
        "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page "
        "size:%.2f Kb, %s",
        pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
        listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);

    if (taosCloseFile(&pBuf->pFile) != TSDB_CODE_SUCCESS) {
      uDebug("WARNING tPage failed to close file when destroy disk basebuf: %s", pBuf->path);
    }
  }

  // log the statistics; the average page size is only meaningful when pages were loaded
  SDiskbasedBufStatis* ps = &pBuf->statis;
  if (ps->loadPages != 0) {
    uDebug(
        "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPgSize:%.2f Kb",
        ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,
        ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages));
  } else {
    uDebug("Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages)", ps->getPages,
           ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->loadPages);
  }

  if (hadFile) {
    // a failure to remove the file is only logged
    if (taosRemoveFile(pBuf->path) != 0) {
      uDebug("WARNING tPage remove file failed. path=%s, code:%s", pBuf->path, strerror(ERRNO));
    }
  }

  taosMemoryFreeClear(pBuf->path);

  // release every page info together with the data buffer it may still hold
  size_t numOfInfos = taosArrayGetSize(pBuf->pIdList);
  for (size_t idx = 0; idx < numOfInfos; ++idx) {
    SPageInfo* pInfo = taosArrayGetP(pBuf->pIdList, idx);
    taosMemoryFreeClear(pInfo->pData);
    taosMemoryFreeClear(pInfo);
  }

  taosArrayDestroy(pBuf->pIdList);

  pBuf->lruList = tdListFree(pBuf->lruList);
  pBuf->freePgList = tdListFree(pBuf->freePgList);

  taosArrayDestroy(pBuf->emptyDummyIdList);
  taosArrayDestroy(pBuf->pFree);

  tSimpleHashCleanup(pBuf->all);

  taosMemoryFreeClear(pBuf->id);
  taosMemoryFreeClear(pBuf->assistBuf);
  taosMemoryFreeClear(pBuf);
}
678

679
/**
 * Returns the last page info stored in a page list.
 *
 * @param pList array of SPageInfo pointers
 * @return the last element, or NULL when the list is empty
 */
SPageInfo* getLastPageInfo(SArray* pList) {
  size_t size = taosArrayGetSize(pList);
  if (size == 0) {  // size - 1 would wrap around to a huge index
    return NULL;
  }

  return taosArrayGetP(pList, size - 1);
}
684

685
// Returns the id stored in the page info.
int32_t getPageId(const SPageInfo* pPgInfo) {
  return pPgInfo->pageId;
}
×
686

687
// Returns the page size the buffer was created with.
int32_t getBufPageSize(const SDiskbasedBuf* pBuf) {
  return pBuf->pageSize;
}
2,147,483,647✔
688

689
// Returns the configured in-memory page limit (inMemPages), not the number of pages currently resident.
int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf) {
  return pBuf->inMemPages;
}
11,853,086✔
690

691
// True while the recorded file size is zero, i.e. nothing has been written to the disk file.
bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) {
  return pBuf->fileSize == 0;
}
×
692

693
void setBufPageDirty(void* pPage, bool dirty) {
2,147,483,647✔
694
  SPageInfo* ppi = getPageInfoFromPayload(pPage);
2,147,483,647✔
695
  ppi->dirty = dirty;
2,147,483,647✔
696
}
2,147,483,647✔
697

698
/**
 * Enables or disables compression of pages that are flushed to disk.
 * The scratch buffer used for compression is allocated the first time compression is enabled.
 *
 * @param pBuf paged buffer
 * @param comp true to compress pages before they are written
 * @return TSDB_CODE_SUCCESS, or terrno when the scratch buffer cannot be allocated (the compression
 *         setting is left unchanged in that case)
 */
int32_t setBufPageCompressOnDisk(SDiskbasedBuf* pBuf, bool comp) {
  if (comp && (pBuf->assistBuf == NULL)) {
    // Sized like a whole page block: the flush path passes pageSize + sizeof(SFilePage) as the output
    // capacity, which is more than pageSize + 2.
    // NOTE(review): assumes the compressor never writes more than a few bytes beyond that capacity.
    void* pAssist = taosMemoryMalloc(getAllocPageSize(pBuf->pageSize));
    if (pAssist == NULL) {
      return terrno;
    }
    pBuf->assistBuf = pAssist;
  }

  // switch the flag only once the scratch buffer is guaranteed to exist
  pBuf->comp = comp;
  return TSDB_CODE_SUCCESS;
}
708

709
/**
 * Recycles a page: its info is appended to the free page list, its node is removed from the LRU
 * list and its data buffer is released. The page info itself stays registered.
 *
 * @param pBuf  paged buffer
 * @param pPage payload pointer of the page to recycle
 * @return TSDB_CODE_SUCCESS, or the error returned by tdListAppend (nothing else is changed then)
 */
int32_t dBufSetBufPageRecycled(SDiskbasedBuf* pBuf, void* pPage) {
  SPageInfo* pInfo = getPageInfoFromPayload(pPage);

  // add this pageinfo into the free page info list
  int32_t code = tdListAppend(pBuf->freePgList, &pInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  pInfo->dirty = false;
  pInfo->used = false;

  // the page leaves the in-memory set
  SListNode* pLruNode = tdListPopNode(pBuf->lruList, pInfo->pn);
  pInfo->pn = NULL;
  taosMemoryFreeClear(pInfo->pData);
  taosMemoryFreeClear(pLruNode);

  return TSDB_CODE_SUCCESS;
}
727

728
// Turns on printing of the statistics performed by dBufPrintStatis.
void dBufSetPrintInfo(SDiskbasedBuf* pBuf) {
  pBuf->printStatis = true;
}
29,055,651✔
729

730
// Returns a copy of the buffer's statistics counters.
SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf) {
  return pBuf->statis;
}
12,254,520✔
731

732
void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
229,817,854✔
733
  if (!pBuf->printStatis) {
229,817,854✔
734
    return;
200,771,859✔
735
  }
736

737
  const SDiskbasedBufStatis* ps = &pBuf->statis;
29,058,393✔
738

739
#if 0
740
  printf(
741
      "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page size:%.2f "
742
      "Kb, %s\n",
743
      pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
744
      listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);
745
#endif
746

747
  if (ps->loadPages > 0) {
29,058,842✔
748
    (void)printf(
15,262✔
749
        "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f "
750
        "Kb\n",
751
        ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,
15,262✔
752
        ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages));
15,262✔
753
  } else {
754
    // printf("no page loaded\n");
755
  }
756
}
757

758
/**
 * Drops every page of the buffer while keeping the buffer object (and its file handle) usable.
 * All page infos and data buffers are released, the lists, arrays and the hash map are emptied and
 * the page counters and file positions start over.
 *
 * @param pBuf paged buffer to clear
 */
void clearDiskbasedBuf(SDiskbasedBuf* pBuf) {
  size_t n = taosArrayGetSize(pBuf->pIdList);
  for (size_t i = 0; i < n; ++i) {
    SPageInfo* pi = taosArrayGetP(pBuf->pIdList, i);
    taosMemoryFreeClear(pi->pData);
    taosMemoryFreeClear(pi);
  }

  taosArrayClear(pBuf->pIdList);

  tdListEmpty(pBuf->lruList);
  tdListEmpty(pBuf->freePgList);

  taosArrayClear(pBuf->emptyDummyIdList);
  taosArrayClear(pBuf->pFree);

  tSimpleHashClear(pBuf->all);

  pBuf->numOfPages = 0;  // all pages are in buffer in the first place
  pBuf->totalBufSize = 0;
  pBuf->allocateId = -1;
  pBuf->fileSize = 0;

  // No page references a file position any more, so new pages can be written from the start of the
  // file again instead of being appended after the dead data.
  pBuf->nextPos = 0;
}
2,931,148✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc