• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3523

06 Nov 2024 02:29AM UTC coverage: 55.861% (-2.4%) from 58.216%
#3523

push

travis-ci

web-flow
Merge pull request #28551 from taosdata/feat/TS-5215-2

test(blob): testing & fixes for blob

106075 of 245834 branches covered (43.15%)

Branch coverage included in aggregate %.

0 of 15 new or added lines in 2 files covered. (0.0%)

17003 existing lines in 254 files now uncovered.

181910 of 269703 relevant lines covered (67.45%)

1527639.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.56
/source/util/src/tpagedbuf.c
1
#define _DEFAULT_SOURCE
2
#include "tpagedbuf.h"
3
#include "taoserror.h"
4
#include "tcompression.h"
5
#include "tlog.h"
6
#include "tsimplehash.h"
7

8
#define GET_PAYLOAD_DATA(_p)           ((char*)(_p)->pData + POINTER_BYTES)
9
#define BUF_PAGE_IN_MEM(_p)            ((_p)->pData != NULL)
10
#define CLEAR_BUF_PAGE_IN_MEM_FLAG(_p) ((_p)->pData = NULL)
11
#define HAS_DATA_IN_DISK(_p)           ((_p)->offset >= 0)
12
#define NO_IN_MEM_AVAILABLE_PAGES(_b)  (listNEles((_b)->lruList) >= (_b)->inMemPages)
13

14
typedef struct SPageDiskInfo {
15
  int64_t offset;
16
  int32_t length;
17
} SPageDiskInfo, SFreeListItem;
18

19
struct SPageInfo {
20
  SListNode* pn;  // point to list node struct. it is NULL when the page is evicted from the in-memory buffer
21
  void*      pData;
22
  int64_t    offset;
23
  int32_t    pageId;
24
  int32_t    length : 29;
25
  bool       used : 1;   // set current page is in used
26
  bool       dirty : 1;  // set current buffer page is dirty or not
27
};
28

29
struct SDiskbasedBuf {
30
  int32_t    numOfPages;
31
  int64_t    totalBufSize;
32
  uint64_t   fileSize;  // disk file size
33
  TdFilePtr  pFile;
34
  int32_t    allocateId;  // allocated page id
35
  char*      path;        // file path
36
  char*      prefix;      // file name prefix
37
  int32_t    pageSize;    // current used page size
38
  int32_t    inMemPages;  // numOfPages that are allocated in memory
39
  SList*     freePgList;  // free page list
40
  SArray*    pIdList;     // page id list
41
  SSHashObj* all;
42
  SList*     lruList;
43
  void*      emptyDummyIdList;  // dummy id list
44
  void*      assistBuf;         // assistant buffer for compress/decompress data
45
  SArray*    pFree;             // free area in file
46
  bool       comp;              // compressed before flushed to disk
47
  uint64_t   nextPos;           // next page flush position
48

49
  char*               id;           // for debug purpose
50
  bool                printStatis;  // Print statistics info when closing this buffer.
51
  SDiskbasedBufStatis statis;
52
};
53

UNCOV
54
static int32_t createDiskFile(SDiskbasedBuf* pBuf) {
×
UNCOV
55
  if (pBuf->path == NULL) {  // prepare the file name when needed it
×
UNCOV
56
    char path[PATH_MAX] = {0};
×
UNCOV
57
    taosGetTmpfilePath(pBuf->prefix, "paged-buf", path);
×
UNCOV
58
    pBuf->path = taosStrdup(path);
×
UNCOV
59
    if (pBuf->path == NULL) {
×
60
      return terrno;
×
61
    }
62
  }
63

UNCOV
64
  pBuf->pFile =
×
UNCOV
65
      taosOpenFile(pBuf->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
×
UNCOV
66
  if (pBuf->pFile == NULL) {
×
67
    return terrno;
×
68
  }
69

UNCOV
70
  return TSDB_CODE_SUCCESS;
×
71
}
72

UNCOV
73
static char* doCompressData(void* data, int32_t srcSize, int32_t* dst, SDiskbasedBuf* pBuf) {  // do nothing
×
UNCOV
74
  if (!pBuf->comp) {
×
UNCOV
75
    *dst = srcSize;
×
UNCOV
76
    return data;
×
77
  }
78

79
  *dst = tsCompressString(data, srcSize, 1, pBuf->assistBuf, srcSize, ONE_STAGE_COMP, NULL, 0);
×
80

81
  memcpy(data, pBuf->assistBuf, *dst);
×
82
  return data;
×
83
}
84

UNCOV
85
static int32_t doDecompressData(void* data, int32_t srcSize, int32_t* dst, SDiskbasedBuf* pBuf) {  // do nothing
×
UNCOV
86
  int32_t code = 0;
×
UNCOV
87
  if (!pBuf->comp) {
×
UNCOV
88
    *dst = srcSize;
×
UNCOV
89
    return code;
×
90
  }
91

92
  *dst = tsDecompressString(data, srcSize, 1, pBuf->assistBuf, pBuf->pageSize, ONE_STAGE_COMP, NULL, 0);
×
93
  if (*dst > 0) {
×
94
    memcpy(data, pBuf->assistBuf, *dst);
×
95
  } else if (*dst < 0) {
×
96
    return terrno;
×
97
  }
98
  return code;
×
99
  ;
100
}
101

UNCOV
102
static uint64_t allocateNewPositionInFile(SDiskbasedBuf* pBuf, size_t size) {
×
UNCOV
103
  if (pBuf->pFree == NULL) {
×
104
    return pBuf->nextPos;
×
105
  } else {
UNCOV
106
    int32_t offset = -1;
×
107

UNCOV
108
    size_t num = taosArrayGetSize(pBuf->pFree);
×
UNCOV
109
    for (int32_t i = 0; i < num; ++i) {
×
110
      SFreeListItem* pi = taosArrayGet(pBuf->pFree, i);
×
111
      if (pi->length >= size) {
×
112
        offset = pi->offset;
×
113
        pi->offset += (int32_t)size;
×
114
        pi->length -= (int32_t)size;
×
115

116
        return offset;
×
117
      }
118
    }
119

120
    // no available recycle space, allocate new area in file
UNCOV
121
    return pBuf->nextPos;
×
122
  }
123
}
124

125
/**
126
 *   +--------------------------+-------------------+--------------+
127
 *   | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes|
128
 *   +--------------------------+-------------------+--------------+
129
 * @param pBuf
130
 * @param pg
131
 * @return
132
 */
133

134
static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); }
270,905✔
135

UNCOV
136
static int32_t doFlushBufPageImpl(SDiskbasedBuf* pBuf, int64_t offset, const char* pData, int32_t size) {
×
UNCOV
137
  int64_t ret = taosLSeekFile(pBuf->pFile, offset, SEEK_SET);
×
UNCOV
138
  if (ret < 0) {
×
139
    return terrno;
×
140
  }
141

UNCOV
142
  ret = (int32_t)taosWriteFile(pBuf->pFile, pData, size);
×
UNCOV
143
  if (ret != size) {
×
144
    return terrno;
×
145
  }
146

147
  // extend the file
UNCOV
148
  if (pBuf->fileSize < offset + size) {
×
UNCOV
149
    pBuf->fileSize = offset + size;
×
150
  }
151

UNCOV
152
  pBuf->statis.flushBytes += size;
×
UNCOV
153
  pBuf->statis.flushPages += 1;
×
154

UNCOV
155
  return TSDB_CODE_SUCCESS;
×
156
}
157

UNCOV
158
static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
×
UNCOV
159
  if (pg->pData == NULL || pg->used) {
×
160
    uError("invalid params in paged buffer process when flushing buf to disk, %s", pBuf->id);
×
161
    terrno = TSDB_CODE_INVALID_PARA;
×
162
    return NULL;
×
163
  }
164

UNCOV
165
  int32_t size = pBuf->pageSize;
×
UNCOV
166
  int64_t offset = pg->offset;
×
167

UNCOV
168
  char* t = NULL;
×
UNCOV
169
  if ((!HAS_DATA_IN_DISK(pg)) || pg->dirty) {
×
UNCOV
170
    void* payload = GET_PAYLOAD_DATA(pg);
×
UNCOV
171
    t = doCompressData(payload, pBuf->pageSize + sizeof(SFilePage), &size, pBuf);
×
UNCOV
172
    if (size < 0) {
×
173
      uError("failed to compress data when flushing data to disk, %s", pBuf->id);
×
174
      terrno = TSDB_CODE_INVALID_PARA;
×
175
      return NULL;
×
176
    }
177
  }
178

179
  // this page is flushed to disk for the first time
UNCOV
180
  if (pg->dirty) {
×
UNCOV
181
    if (!HAS_DATA_IN_DISK(pg)) {
×
UNCOV
182
      offset = allocateNewPositionInFile(pBuf, size);
×
UNCOV
183
      pBuf->nextPos += size;
×
184

UNCOV
185
      int32_t code = doFlushBufPageImpl(pBuf, offset, t, size);
×
UNCOV
186
      if (code != TSDB_CODE_SUCCESS) {
×
187
        return NULL;
×
188
      }
189
    } else {
190
      // length becomes greater, current space is not enough, allocate new place, otherwise, do nothing
UNCOV
191
      if (pg->length < size) {
×
192
        // 1. add current space to free list
193
        SPageDiskInfo dinfo = {.length = pg->length, .offset = offset};
×
194
        if (NULL == taosArrayPush(pBuf->pFree, &dinfo)) {
×
195
          return NULL;
×
196
        }
197

198
        // 2. allocate new position, and update the info
199
        offset = allocateNewPositionInFile(pBuf, size);
×
200
        pBuf->nextPos += size;
×
201
      }
202

UNCOV
203
      int32_t code = doFlushBufPageImpl(pBuf, offset, t, size);
×
UNCOV
204
      if (code != TSDB_CODE_SUCCESS) {
×
205
        return NULL;
×
206
      }
207
    }
208
  } else {  // NOTE: the size may be -1, the this recycle page has not been flushed to disk yet.
UNCOV
209
    size = pg->length;
×
210
  }
211

UNCOV
212
  char* pDataBuf = pg->pData;
×
UNCOV
213
  memset(pDataBuf, 0, getAllocPageSize(pBuf->pageSize));
×
214

215
#ifdef BUF_PAGE_DEBUG
216
  uDebug("page_flush %p, pageId:%d, offset:%d", pDataBuf, pg->pageId, offset);
217
#endif
218

UNCOV
219
  pg->offset = offset;
×
UNCOV
220
  pg->length = size;  // on disk size
×
UNCOV
221
  return pDataBuf;
×
222
}
223

UNCOV
224
static char* flushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
×
UNCOV
225
  int32_t ret = TSDB_CODE_SUCCESS;
×
226

UNCOV
227
  if (pBuf->pFile == NULL) {
×
UNCOV
228
    if ((ret = createDiskFile(pBuf)) != TSDB_CODE_SUCCESS) {
×
229
      terrno = ret;
×
230
      return NULL;
×
231
    }
232
  }
233

UNCOV
234
  char* p = doFlushBufPage(pBuf, pg);
×
UNCOV
235
  CLEAR_BUF_PAGE_IN_MEM_FLAG(pg);
×
236

UNCOV
237
  pg->dirty = false;
×
UNCOV
238
  return p;
×
239
}
240

241
// load file block data in disk
UNCOV
242
static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
×
UNCOV
243
  if (pg->offset < 0 || pg->length <= 0) {
×
244
    uError("failed to load buf page from disk, offset:%" PRId64 ", length:%d, %s", pg->offset, pg->length, pBuf->id);
×
245
    return TSDB_CODE_INVALID_PARA;
×
246
  }
247

UNCOV
248
  int64_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
×
UNCOV
249
  if (ret < 0) {
×
250
    ret = terrno;
×
251
    return ret;
×
252
  }
253

UNCOV
254
  void* pPage = (void*)GET_PAYLOAD_DATA(pg);
×
UNCOV
255
  ret = taosReadFile(pBuf->pFile, pPage, pg->length);
×
UNCOV
256
  if (ret != pg->length) {
×
257
    ret = terrno;
×
258
    return ret;
×
259
  }
260

UNCOV
261
  pBuf->statis.loadBytes += pg->length;
×
UNCOV
262
  pBuf->statis.loadPages += 1;
×
263

UNCOV
264
  int32_t fullSize = 0;
×
UNCOV
265
  return doDecompressData(pPage, pg->length, &fullSize, pBuf);
×
266
}
267

268
static SPageInfo* registerNewPageInfo(SDiskbasedBuf* pBuf, int32_t pageId) {
270,924✔
269
  pBuf->numOfPages += 1;
270,924✔
270

271
  SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo));
270,924✔
272
  if (ppi == NULL) {
270,979✔
273
    terrno = TSDB_CODE_OUT_OF_MEMORY;
6✔
274
    return NULL;
6✔
275
  }
276

277
  ppi->pageId = pageId;
270,973✔
278
  ppi->pData = NULL;
270,973✔
279
  ppi->offset = -1;
270,973✔
280
  ppi->length = -1;
270,973✔
281
  ppi->used = true;
270,973✔
282
  ppi->pn = NULL;
270,973✔
283
  ppi->dirty = false;
270,973✔
284

285
  SPageInfo** pRet = taosArrayPush(pBuf->pIdList, &ppi);
270,973✔
286
  if (NULL == pRet) {
270,969✔
287
    taosMemoryFree(ppi);
2✔
288
    return NULL;
2✔
289
  }
290
  return *pRet;
270,967✔
291
}
292

UNCOV
293
static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
×
UNCOV
294
  SListIter iter = {0};
×
UNCOV
295
  tdListInitIter(pBuf->lruList, &iter, TD_LIST_BACKWARD);
×
296

UNCOV
297
  SListNode* pn = NULL;
×
UNCOV
298
  while ((pn = tdListNext(&iter)) != NULL) {
×
UNCOV
299
    SPageInfo* pageInfo = *(SPageInfo**)pn->data;
×
300

UNCOV
301
    SPageInfo* p = *(SPageInfo**)(pageInfo->pData);
×
302

UNCOV
303
    if (!pageInfo->used) {
×
UNCOV
304
      break;
×
305
    }
306
  }
307

UNCOV
308
  return pn;
×
309
}
310

UNCOV
311
static char* evictBufPage(SDiskbasedBuf* pBuf) {
×
UNCOV
312
  SListNode* pn = getEldestUnrefedPage(pBuf);
×
UNCOV
313
  if (pn == NULL) {  // no available buffer pages now, return.
×
314
    return NULL;
×
315
  }
316

UNCOV
317
  terrno = 0;
×
UNCOV
318
  pn = tdListPopNode(pBuf->lruList, pn);
×
319

UNCOV
320
  SPageInfo* d = *(SPageInfo**)pn->data;
×
321

UNCOV
322
  d->pn = NULL;
×
UNCOV
323
  taosMemoryFreeClear(pn);
×
324

UNCOV
325
  return flushBufPage(pBuf, d);
×
326
}
327

328
static int32_t lruListPushFront(SList* pList, SPageInfo* pi) {
270,939✔
329
  int32_t code = tdListPrepend(pList, &pi);
270,939✔
330
  if (TSDB_CODE_SUCCESS != code) {
270,956✔
331
    return code;
15✔
332
  }
333
  SListNode* front = tdListGetHead(pList);
270,941✔
334
  pi->pn = front;
270,928✔
335
  return TSDB_CODE_SUCCESS;
270,928✔
336
}
337

338
static void lruListMoveToFront(SList* pList, SPageInfo* pi) {
10,575,327✔
339
  pi->pn = tdListPopNode(pList, pi->pn);
10,575,327✔
340
  tdListPrependNode(pList, pi->pn);
10,573,709✔
341
}
10,573,094✔
342

343
static SPageInfo* getPageInfoFromPayload(void* page) {
14,509,862✔
344
  char* p = (char*)page - POINTER_BYTES;
14,509,862✔
345

346
  SPageInfo* ppi = ((SPageInfo**)p)[0];
14,509,862✔
347
  return ppi;
14,509,862✔
348
}
349

350
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, const char* id,
209,555✔
351
                           const char* dir) {
352
  *pBuf = NULL;
209,555✔
353
  SDiskbasedBuf* pPBuf = taosMemoryCalloc(1, sizeof(SDiskbasedBuf));
209,555✔
354
  if (pPBuf == NULL) {
209,577✔
355
    goto _error;
8✔
356
  }
357

358
  pPBuf->pageSize = pagesize;
209,569✔
359
  pPBuf->numOfPages = 0;  // all pages are in buffer in the first place
209,569✔
360
  pPBuf->totalBufSize = 0;
209,569✔
361
  pPBuf->allocateId = -1;
209,569✔
362
  pPBuf->pFile = NULL;
209,569✔
363
  pPBuf->id = taosStrdup(id);
209,569✔
364
  if (id != NULL && pPBuf->id == NULL) {
209,596!
365
    goto _error;
11✔
366
  }
367
  pPBuf->fileSize = 0;
209,585✔
368
  pPBuf->pFree = taosArrayInit(4, sizeof(SFreeListItem));
209,585✔
369
  pPBuf->freePgList = tdListNew(POINTER_BYTES);
209,590✔
370
  if (pPBuf->pFree == NULL || pPBuf->freePgList == NULL) {
209,577✔
371
    goto _error;
23✔
372
  }
373

374
  // at least more than 2 pages must be in memory
375
  if (inMemBufSize < pagesize * 2) {
209,554!
376
    inMemBufSize = pagesize * 2;
×
377
  }
378

379
  pPBuf->inMemPages = inMemBufSize / pagesize;  // maximum allowed pages, it is a soft limit.
209,554✔
380
  pPBuf->lruList = tdListNew(POINTER_BYTES);
209,554✔
381
  if (pPBuf->lruList == NULL) {
209,571✔
382
    goto _error;
4✔
383
  }
384

385
  // init id hash table
386
  _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
209,567✔
387
  pPBuf->pIdList = taosArrayInit(4, POINTER_BYTES);
209,565✔
388
  if (pPBuf->pIdList == NULL) {
209,571✔
389
    goto _error;
14✔
390
  }
391

392
  pPBuf->all = tSimpleHashInit(64, fn);
209,557✔
393
  if (pPBuf->all == NULL) {
209,512✔
394
    goto _error;
8✔
395
  }
396

397
  pPBuf->prefix = (char*)dir;
209,504✔
398
  pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
209,504✔
399
  if (pPBuf->emptyDummyIdList == NULL) {
209,548✔
400
    goto _error;
15✔
401
  }
402

403
  //  qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId,
404
  //  pPBuf->pageSize, pPBuf->inMemPages, pPBuf->path);
405

406
  *pBuf = pPBuf;
209,533✔
407
  return TSDB_CODE_SUCCESS;
209,533✔
408

409
_error:
83✔
410
  destroyDiskbasedBuf(pPBuf);
83✔
411
  *pBuf = NULL;
83✔
412
  return TSDB_CODE_OUT_OF_MEMORY;
83✔
413
}
414

415
static char* doExtractPage(SDiskbasedBuf* pBuf) {
270,905✔
416
  char* availablePage = NULL;
270,905✔
417
  if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
270,905!
UNCOV
418
    availablePage = evictBufPage(pBuf);
×
UNCOV
419
    if (availablePage == NULL) {
×
420
      uWarn("no available buf pages, current:%d, max:%d, reason: %s, %s", listNEles(pBuf->lruList), pBuf->inMemPages,
×
421
            terrstr(), pBuf->id)
422
    }
423
  } else {
424
    availablePage =
425
        taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize));  // add extract bytes in case of zipped buffer increased.
541,810✔
426
    if (availablePage == NULL) {
270,926✔
427
      terrno = TSDB_CODE_OUT_OF_MEMORY;
14✔
428
    }
429
  }
430

431
  return availablePage;
270,929✔
432
}
433

434
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
270,907✔
435
  pBuf->statis.getPages += 1;
270,907✔
436

437
  char* availablePage = doExtractPage(pBuf);
270,907✔
438
  if (availablePage == NULL) {
270,927✔
439
    return NULL;
14✔
440
  }
441

442
  SPageInfo* pi = NULL;
270,913✔
443
  int32_t    code = 0;
270,913✔
444
  if (listNEles(pBuf->freePgList) != 0) {
270,913!
445
    SListNode* pItem = tdListPopHead(pBuf->freePgList);
×
446
    pi = *(SPageInfo**)pItem->data;
×
447
    pi->used = true;
×
448
    *pageId = pi->pageId;
×
449
    taosMemoryFreeClear(pItem);
×
450
    code = lruListPushFront(pBuf->lruList, pi);
×
451
    if (TSDB_CODE_SUCCESS != code) {
×
452
      taosMemoryFree(pi);
6✔
453
      taosMemoryFree(availablePage);
×
454
      terrno = code;
×
455
      return NULL;
×
456
    }
457
  } else {  // create a new pageinfo
458
    // register new id in this group
459
    *pageId = (++pBuf->allocateId);
270,913✔
460

461
    // register page id info
462
    pi = registerNewPageInfo(pBuf, *pageId);
270,913✔
463
    if (pi == NULL) {
270,968✔
464
      taosMemoryFree(availablePage);
8✔
465
      return NULL;
8✔
466
    }
467

468
    // add to hash map
469
    int32_t code = tSimpleHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);
270,960✔
470

471
    if (TSDB_CODE_SUCCESS == code) {
270,968✔
472
      // add to LRU list
473
      code = lruListPushFront(pBuf->lruList, pi);
270,953✔
474
    }
475
    if (TSDB_CODE_SUCCESS == code) {
270,959✔
476
      pBuf->totalBufSize += pBuf->pageSize;
270,929✔
477
    } else {
478
      taosMemoryFree(availablePage);
30✔
479
      SPageInfo **pLast = taosArrayPop(pBuf->pIdList);
30✔
480
      int32_t ret = tSimpleHashRemove(pBuf->all, pageId, sizeof(int32_t));
30✔
481
      if (ret != TSDB_CODE_SUCCESS) {
30✔
482
        uError("%s failed to clear pageId %d from buf hash-set since %s", __func__, *pageId, tstrerror(ret));
15!
483
      }
484
      taosMemoryFree(pi);
30✔
485
      terrno = code;
30✔
486
      return NULL;
31✔
487
    }
488
  }
489

490
  pi->pData = availablePage;
270,923✔
491

492
  ((void**)pi->pData)[0] = pi;
270,923✔
493
#ifdef BUF_PAGE_DEBUG
494
  uDebug("page_getNewBufPage , pi->pData:%p, pageId:%d, offset:%" PRId64, pi->pData, pi->pageId, pi->offset);
495
#endif
496

497
  return (void*)(GET_PAYLOAD_DATA(pi));
270,923✔
498
}
499

500
void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
12,604,663✔
501
  if (id < 0) {
12,604,663!
502
    terrno = TSDB_CODE_INVALID_PARA;
×
503
    uError("invalid page id:%d, %s", id, pBuf->id);
×
504
    return NULL;
×
505
  }
506

507
  pBuf->statis.getPages += 1;
12,604,663✔
508

509
  SPageInfo** pi = tSimpleHashGet(pBuf->all, &id, sizeof(int32_t));
12,604,663✔
510
  if (pi == NULL || *pi == NULL) {
12,601,557!
UNCOV
511
    uError("failed to locate the buffer page:%d, %s", id, pBuf->id);
×
UNCOV
512
    terrno = TSDB_CODE_INVALID_PARA;
×
513
    return NULL;
×
514
  }
515

516
  if (BUF_PAGE_IN_MEM(*pi)) {  // it is in memory
12,601,804✔
517
    // no need to update the LRU list if only one page exists
518
    if (pBuf->numOfPages == 1) {
12,601,445✔
519
      (*pi)->used = true;
2,031,709✔
520
      return (void*)(GET_PAYLOAD_DATA(*pi));
2,031,709✔
521
    }
522

523
    SPageInfo** pInfo = (SPageInfo**)((*pi)->pn->data);
10,569,736✔
524
    if (*pInfo != *pi) {
10,569,736!
525
      terrno = TSDB_CODE_APP_ERROR;
×
526
      uError("inconsistently data in paged buffer, pInfo:%p, pi:%p, %s", *pInfo, *pi, pBuf->id);
×
527
      return NULL;
×
528
    }
529

530
    lruListMoveToFront(pBuf->lruList, (*pi));
10,569,736✔
531
    (*pi)->used = true;
10,573,024✔
532

533
#ifdef BUF_PAGE_DEBUG
534
    uDebug("page_getBufPage1 pageId:%d, offset:%" PRId64, (*pi)->pageId, (*pi)->offset);
535
#endif
536
    return (void*)(GET_PAYLOAD_DATA(*pi));
10,573,024✔
537
  } else {  // not in memory
538

539
    (*pi)->pData = doExtractPage(pBuf);
359✔
540

541
    // failed to evict buffer page, return with error code.
UNCOV
542
    if ((*pi)->pData == NULL) {
×
543
      return NULL;
×
544
    }
545

546
    // set the ptr to the new SPageInfo
UNCOV
547
    ((void**)((*pi)->pData))[0] = (*pi);
×
548

UNCOV
549
    int32_t code = lruListPushFront(pBuf->lruList, *pi);
×
UNCOV
550
    if (TSDB_CODE_SUCCESS != code) {
×
551
      taosMemoryFree((*pi)->pData);
×
552
      (*pi)->pData = NULL;
×
553
      terrno = code;
×
554
      return NULL;
×
555
    }
UNCOV
556
    (*pi)->used = true;
×
557

558
    // some data has been flushed to disk, and needs to be loaded into buffer again.
UNCOV
559
    if (HAS_DATA_IN_DISK(*pi)) {
×
UNCOV
560
      int32_t code = loadPageFromDisk(pBuf, *pi);
×
UNCOV
561
      if (code != 0) {
×
UNCOV
562
        taosMemoryFree((*pi)->pData);
×
563
        (*pi)->pData = NULL;
×
564
        terrno = code;
×
565
        return NULL;
×
566
      }
567
    }
568
#ifdef BUF_PAGE_DEBUG
569
    uDebug("page_getBufPage2 pageId:%d, offset:%" PRId64, (*pi)->pageId, (*pi)->offset);
570
#endif
UNCOV
571
    return (void*)(GET_PAYLOAD_DATA(*pi));
×
572
  }
573
}
574

575
void releaseBufPage(SDiskbasedBuf* pBuf, void* page) {
9,761,246✔
576
  if (page == NULL) {
9,761,246!
577
    return;
×
578
  }
579

580
  SPageInfo* ppi = getPageInfoFromPayload(page);
9,761,246✔
581
  releaseBufPageInfo(pBuf, ppi);
9,758,973✔
582
}
583

584
void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
9,758,666✔
585
#ifdef BUF_PAGE_DEBUG
586
  uDebug("page_releaseBufPageInfo pageId:%d, used:%d, offset:%" PRId64, pi->pageId, pi->used, pi->offset);
587
#endif
588

589
  if (pi == NULL) {
9,758,666!
590
    return;
×
591
  }
592

593
  if (pi->pData == NULL) {
9,758,666!
594
    uError("pi->pData (page data) is null");
×
595
    return;
×
596
  }
597

598
  pi->used = false;
9,758,666✔
599
  pBuf->statis.releasePages += 1;
9,758,666✔
600
}
601

602
size_t getTotalBufSize(const SDiskbasedBuf* pBuf) { return (size_t)pBuf->totalBufSize; }
×
603

604
SArray* getDataBufPagesIdList(SDiskbasedBuf* pBuf) { return pBuf->pIdList; }
×
605

606
void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
254,465✔
607
  if (pBuf == NULL) {
254,465✔
608
    return;
44,921✔
609
  }
610

611
  dBufPrintStatis(pBuf);
209,544✔
612

613
  bool needRemoveFile = false;
209,553✔
614
  if (pBuf->pFile != NULL) {
209,553!
UNCOV
615
    needRemoveFile = true;
×
UNCOV
616
    uDebug(
×
617
        "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page "
618
        "size:%.2f Kb, %s",
619
        pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
620
        listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);
621

UNCOV
622
    int32_t code = taosCloseFile(&pBuf->pFile);
×
UNCOV
623
    if (TSDB_CODE_SUCCESS != code) {
×
624
      uDebug("WARNING tPage failed to close file when destroy disk basebuf: %s", pBuf->path);
×
625
    }
626
  } else {
627
    uDebug("Paged buffer closed, total:%.2f Kb, no file created, %s", pBuf->totalBufSize / 1024.0, pBuf->id);
209,553✔
628
  }
629

630
  // print the statistics information
631
  {
632
    SDiskbasedBufStatis* ps = &pBuf->statis;
209,549✔
633
    if (ps->loadPages == 0) {
209,549!
634
      uDebug("Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages)", ps->getPages,
209,549✔
635
             ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->loadPages);
636
    } else {
UNCOV
637
      uDebug(
×
638
          "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPgSize:%.2f Kb",
639
          ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,
640
          ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages));
641
    }
642
  }
643

644
  if (needRemoveFile) {
209,549!
UNCOV
645
    int32_t ret = taosRemoveFile(pBuf->path);
×
UNCOV
646
    if (ret != 0) {  // print the error and discard this error info
×
UNCOV
647
      uDebug("WARNING tPage remove file failed. path=%s, code:%s", pBuf->path, strerror(errno));
×
648
    }
649
  }
650

651
  taosMemoryFreeClear(pBuf->path);
209,536!
652

653
  size_t n = taosArrayGetSize(pBuf->pIdList);
209,536✔
654
  for (int32_t i = 0; i < n; ++i) {
461,571✔
655
    SPageInfo* pi = taosArrayGetP(pBuf->pIdList, i);
252,017✔
656
    taosMemoryFreeClear(pi->pData);
251,999✔
657
    taosMemoryFreeClear(pi);
252,040✔
658
  }
659

660
  taosArrayDestroy(pBuf->pIdList);
209,554✔
661

662
  pBuf->lruList = tdListFree(pBuf->lruList);
209,557✔
663
  pBuf->freePgList = tdListFree(pBuf->freePgList);
209,564✔
664

665
  taosArrayDestroy(pBuf->emptyDummyIdList);
209,568✔
666
  taosArrayDestroy(pBuf->pFree);
209,570✔
667

668
  tSimpleHashCleanup(pBuf->all);
209,560✔
669

670
  taosMemoryFreeClear(pBuf->id);
209,569✔
671
  taosMemoryFreeClear(pBuf->assistBuf);
209,574!
672
  taosMemoryFreeClear(pBuf);
209,574✔
673
}
674

675
SPageInfo* getLastPageInfo(SArray* pList) {
×
676
  size_t     size = taosArrayGetSize(pList);
×
677
  SPageInfo* pPgInfo = taosArrayGetP(pList, size - 1);
×
678
  return pPgInfo;
×
679
}
680

681
int32_t getPageId(const SPageInfo* pPgInfo) { return pPgInfo->pageId; }
×
682

683
int32_t getBufPageSize(const SDiskbasedBuf* pBuf) { return pBuf->pageSize; }
4,261,192✔
684

685
int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf) { return pBuf->inMemPages; }
23,639✔
686

687
bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) { return pBuf->fileSize == 0; }
×
688

689
void setBufPageDirty(void* pPage, bool dirty) {
4,763,718✔
690
  SPageInfo* ppi = getPageInfoFromPayload(pPage);
4,763,718✔
691
  ppi->dirty = dirty;
4,763,120✔
692
}
4,763,120✔
693

694
int32_t setBufPageCompressOnDisk(SDiskbasedBuf* pBuf, bool comp) {
×
695
  pBuf->comp = comp;
×
696
  if (comp && (pBuf->assistBuf == NULL)) {
×
697
    pBuf->assistBuf = taosMemoryMalloc(pBuf->pageSize + 2);  // EXTRA BYTES
×
698
    if (pBuf->assistBuf) {
×
699
      return terrno;
×
700
    }
701
  }
702
  return TSDB_CODE_SUCCESS;
×
703
}
704

705
int32_t dBufSetBufPageRecycled(SDiskbasedBuf* pBuf, void* pPage) {
×
706
  SPageInfo* ppi = getPageInfoFromPayload(pPage);
×
707

708
  int32_t code = tdListAppend(pBuf->freePgList, &ppi);
×
709
  if (TSDB_CODE_SUCCESS != code) {
×
710
    return code;
×
711
  }
712

713
  ppi->used = false;
×
714
  ppi->dirty = false;
×
715

716
  // add this pageinfo into the free page info list
717
  SListNode* pNode = tdListPopNode(pBuf->lruList, ppi->pn);
×
718
  taosMemoryFreeClear(ppi->pData);
×
719
  taosMemoryFreeClear(pNode);
×
720
  ppi->pn = NULL;
×
721
  return TSDB_CODE_SUCCESS;
×
722
}
723

724
void dBufSetPrintInfo(SDiskbasedBuf* pBuf) { pBuf->printStatis = true; }
48,332✔
725

726
SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf) { return pBuf->statis; }
23,686✔
727

728
void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
209,541✔
729
  if (!pBuf->printStatis) {
209,541✔
730
    return;
161,219✔
731
  }
732

733
  const SDiskbasedBufStatis* ps = &pBuf->statis;
48,322✔
734

735
#if 0
736
  printf(
737
      "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page size:%.2f "
738
      "Kb, %s\n",
739
      pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
740
      listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);
741
#endif
742

743
  if (ps->loadPages > 0) {
48,322!
UNCOV
744
    (void)printf(
×
745
        "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f "
746
        "Kb\n",
UNCOV
747
        ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,
×
UNCOV
748
        ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages));
×
749
  } else {
750
    // printf("no page loaded\n");
751
  }
752
}
753

754
void clearDiskbasedBuf(SDiskbasedBuf* pBuf) {
5,681✔
755
  size_t n = taosArrayGetSize(pBuf->pIdList);
5,681✔
756
  for (int32_t i = 0; i < n; ++i) {
24,547✔
757
    SPageInfo* pi = taosArrayGetP(pBuf->pIdList, i);
18,867✔
758
    taosMemoryFreeClear(pi->pData);
18,867!
759
    taosMemoryFreeClear(pi);
18,867!
760
  }
761

762
  taosArrayClear(pBuf->pIdList);
5,680✔
763

764
  tdListEmpty(pBuf->lruList);
5,681✔
765
  tdListEmpty(pBuf->freePgList);
5,681✔
766

767
  taosArrayClear(pBuf->emptyDummyIdList);
5,681✔
768
  taosArrayClear(pBuf->pFree);
5,681✔
769

770
  tSimpleHashClear(pBuf->all);
5,681✔
771

772
  pBuf->numOfPages = 0;  // all pages are in buffer in the first place
5,681✔
773
  pBuf->totalBufSize = 0;
5,681✔
774
  pBuf->allocateId = -1;
5,681✔
775
  pBuf->fileSize = 0;
5,681✔
776
}
5,681✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc