• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5058

17 May 2026 01:15AM UTC coverage: 73.387% (-0.02%) from 73.406%
#5058

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281656 of 383795 relevant lines covered (73.39%)

135114337.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.02
/source/util/src/tpagedbuf.c
1
#define _DEFAULT_SOURCE
2
#include "tpagedbuf.h"
3
#include "taoserror.h"
4
#include "tcompression.h"
5
#include "tlog.h"
6
#include "tsimplehash.h"
7

8
// Address of the page payload: skips the POINTER_BYTES-sized header stored at the start of pData.
#define GET_PAYLOAD_DATA(_p)           ((char*)(_p)->pData + POINTER_BYTES)
9
// A page is considered in memory while it owns a data buffer.
#define BUF_PAGE_IN_MEM(_p)            ((_p)->pData != NULL)
10
// Marks the page as not in memory by dropping the buffer pointer; it does NOT free the buffer.
#define CLEAR_BUF_PAGE_IN_MEM_FLAG(_p) ((_p)->pData = NULL)
11
// A non-negative file offset means the page has been written to the backing file.
#define HAS_DATA_IN_DISK(_p)           ((_p)->offset >= 0)
12
// True when the LRU list already holds at least inMemPages entries.
#define NO_IN_MEM_AVAILABLE_PAGES(_b)  (listNEles((_b)->lruList) >= (_b)->inMemPages)
13

14
// Describes a byte range in the backing file; also used (as SFreeListItem) for entries of the
// free-area array SDiskbasedBuf::pFree.
typedef struct SPageDiskInfo {
15
  int64_t offset;  // start position of the range in the file
16
  int32_t length;  // size of the range in bytes
17
} SPageDiskInfo, SFreeListItem;
18

19
// Per-page bookkeeping record. The first POINTER_BYTES of pData store a pointer back to this record.
struct SPageInfo {
20
  SListNode* pn;  // node of this page in the LRU list; NULL once the page is evicted from the in-memory buffer
21
  void*      pData;  // in-memory buffer (header pointer + payload); NULL when the page is not in memory
22
  int64_t    offset;  // position in the backing file; negative (-1 initially) when never written to disk
23
  int32_t    pageId;  // id handed out to callers and used as key of the page hash table
24
  int32_t    length : 29;  // on-disk size in bytes; -1 until the page has been flushed
25
  bool       used : 1;   // page is currently held by a caller and must not be evicted
26
  bool       dirty : 1;  // in-memory content differs from the on-disk copy and must be written on eviction
27
};
28

29
// Paged buffer that keeps a bounded number of pages in memory and spills the rest to a temp file.
struct SDiskbasedBuf {
30
  int32_t    numOfPages;  // number of registered page infos
31
  int64_t    totalBufSize;  // grows by pageSize for every newly created page
32
  uint64_t   fileSize;  // disk file size
33
  TdFilePtr  pFile;  // backing file; stays NULL until the first page is flushed
34
  int32_t    allocateId;  // last allocated page id, -1 when no page has been allocated
35
  char*      path;        // file path
36
  char*      prefix;      // file name prefix; set from the caller's `dir` argument without copying
37
  int32_t    pageSize;    // current used page size
38
  int32_t    inMemPages;  // maximum number of pages kept in memory (soft limit)
39
  SList*     freePgList;  // recycled SPageInfo pointers that can be handed out again
40
  SArray*    pIdList;     // SPageInfo pointers of every registered page
41
  SSHashObj* all;  // hash table: page id -> SPageInfo pointer
42
  SList*     lruList;  // in-memory pages, most recently used at the head; eviction scans from the tail
43
  void*      emptyDummyIdList;  // dummy id list
44
  void*      assistBuf;         // assistant buffer for compress/decompress data
45
  SArray*    pFree;             // free areas in the file (SFreeListItem entries)
46
  bool       comp;              // compressed before flushed to disk
47
  uint64_t   nextPos;           // next page flush position
48

49
  char*               id;           // for debug purpose
50
  bool                printStatis;  // Print statistics info when closing this buffer.
51
  SDiskbasedBufStatis statis;  // get/release/flush/load counters
52
};
53

54
/**
 * Open the temp file that backs the paged buffer, building its path first if none is set yet.
 *
 * @param pBuf paged buffer; pBuf->path and pBuf->pFile are filled in
 * @return TSDB_CODE_SUCCESS, or terrno when the path copy or the file open fails
 */
static int32_t createDiskFile(SDiskbasedBuf* pBuf) {
  // the path is generated lazily, the first time a file is actually needed
  if (NULL == pBuf->path) {
    char tmpPath[PATH_MAX] = {0};
    taosGetTmpfilePath(pBuf->prefix, "paged-buf", tmpPath);

    pBuf->path = taosStrdup(tmpPath);
    if (NULL == pBuf->path) {
      return terrno;
    }
  }

  pBuf->pFile =
      taosOpenFile(pBuf->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
  if (NULL == pBuf->pFile) {
    return terrno;
  }

  // the real file size is only reported in the debug log; -1 stands for "unknown"
  int64_t sizeOnDisk = -1;
  if (taosFStatFile(pBuf->pFile, &sizeOnDisk, NULL) != TSDB_CODE_SUCCESS) {
    sizeOnDisk = -1;
  }

  uDebug("paged buffer file opened, path:%s, realSize:%" PRId64 ", nextPos:%" PRIu64 ", fileSize:%" PRIu64 ", %s",
         pBuf->path, sizeOnDisk, pBuf->nextPos, pBuf->fileSize, pBuf->id);
  return TSDB_CODE_SUCCESS;
}
79

80
/**
 * Compress a page payload in place when compression is enabled for this buffer.
 *
 * @param data    payload to compress; overwritten with the compressed bytes on success
 * @param srcSize number of payload bytes
 * @param dst     receives the resulting size; equals srcSize when compression is disabled, and is
 *                whatever tsCompressString returned otherwise (callers treat a negative value as failure)
 * @param pBuf    paged buffer providing the `comp` flag and the scratch buffer `assistBuf`
 * @return always `data`
 */
static char* doCompressData(void* data, int32_t srcSize, int32_t* dst, SDiskbasedBuf* pBuf) {
  if (!pBuf->comp) {  // compression disabled: nothing to do
    *dst = srcSize;
    return data;
  }

  *dst = tsCompressString(data, srcSize, 1, pBuf->assistBuf, srcSize, ONE_STAGE_COMP, NULL, 0);

  // Only copy back when the compressor reported a valid size. A negative result converted to size_t
  // would otherwise turn into a huge memcpy length before the caller gets to check it.
  if (*dst > 0) {
    memcpy(data, pBuf->assistBuf, *dst);
  }
  return data;
}
91

92
/**
 * Decompress a page payload in place when compression is enabled for this buffer.
 *
 * @param data    bytes read from disk; overwritten with the decompressed bytes
 * @param srcSize number of bytes in `data`
 * @param dst     receives the decompressed size (srcSize when compression is disabled)
 * @param pBuf    paged buffer providing the `comp` flag, page size and scratch buffer
 * @return 0 on success, terrno when tsDecompressString reports a negative size
 */
static int32_t doDecompressData(void* data, int32_t srcSize, int32_t* dst, SDiskbasedBuf* pBuf) {
  if (!pBuf->comp) {  // stored uncompressed
    *dst = srcSize;
    return 0;
  }

  int32_t rawSize = tsDecompressString(data, srcSize, 1, pBuf->assistBuf, pBuf->pageSize, ONE_STAGE_COMP, NULL, 0);
  *dst = rawSize;

  if (rawSize < 0) {
    return terrno;
  }
  if (rawSize > 0) {
    memcpy(data, pBuf->assistBuf, rawSize);
  }
  return 0;
}
108

109
/**
 * Pick the file position where `size` bytes should be written.
 *
 * The first free area (pBuf->pFree) that is large enough is carved from its front; when none fits,
 * the current end-of-data position pBuf->nextPos is returned. This function does not advance nextPos.
 *
 * @param pBuf paged buffer
 * @param size number of bytes that will be written
 * @return file offset to write at
 */
static uint64_t allocateNewPositionInFile(SDiskbasedBuf* pBuf, size_t size) {
  if (pBuf->pFree == NULL) {
    return pBuf->nextPos;
  }

  size_t num = taosArrayGetSize(pBuf->pFree);
  for (size_t i = 0; i < num; ++i) {
    SFreeListItem* pi = taosArrayGet(pBuf->pFree, i);

    // A negative length must not be promoted to a huge unsigned value by the comparison below.
    if (pi == NULL || pi->length < 0 || (size_t)pi->length < size) {
      continue;
    }

    // Keep the offset in 64 bits: file positions may exceed INT32_MAX.
    int64_t offset = pi->offset;
    pi->offset += (int64_t)size;
    pi->length -= (int32_t)size;
    return (uint64_t)offset;
  }

  // no available recycle space, allocate new area in file
  return pBuf->nextPos;
}
131

132
/**
133
 *   +--------------------------+-------------------+--------------+
134
 *   | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes|
135
 *   +--------------------------+-------------------+--------------+
136
 * @param pBuf
137
 * @param pg
138
 * @return
139
 */
140

141
static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); }
2,136,961,761✔
142

143
/**
 * Write `size` bytes of page data at `offset` of the backing file and update the statistics.
 *
 * @param pBuf   paged buffer owning the file
 * @param offset file position to write at
 * @param pData  bytes to write
 * @param size   number of bytes
 * @return TSDB_CODE_SUCCESS, or terrno when the seek fails or fewer than `size` bytes are written
 */
static int32_t doFlushBufPageImpl(SDiskbasedBuf* pBuf, int64_t offset, const char* pData, int32_t size) {
  if (taosLSeekFile(pBuf->pFile, offset, SEEK_SET) < 0) {
    return terrno;
  }

  int64_t nWritten = (int32_t)taosWriteFile(pBuf->pFile, pData, size);
  if (nWritten != size) {
    return terrno;
  }

  // the write may have extended the file
  if (pBuf->fileSize < offset + size) {
    pBuf->fileSize = offset + size;
  }

  pBuf->statis.flushPages += 1;
  pBuf->statis.flushBytes += size;
  return TSDB_CODE_SUCCESS;
}
164

165
/**
 * Write an unused in-memory page to disk if required and hand its (zeroed) buffer back for reuse.
 *
 * A dirty page is written: at a newly allocated position when it has no disk copy yet or when its
 * current on-disk slot is too small (the old slot is then added to pBuf->pFree), otherwise over its
 * existing slot. A clean page is not written. On success pg->offset / pg->length are updated.
 *
 * @param pBuf paged buffer
 * @param pg   page to flush; must be in memory and not in use
 * @return the page's memory block, cleared to zero, or NULL on failure
 */
static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
  const bool flushable = (pg->pData != NULL) && (!pg->used);
  if (!flushable) {
    uError("invalid params in paged buffer process when flushing buf to disk, %s", pBuf->id);
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  int64_t    diskOffset = pg->offset;
  int32_t    diskSize = pBuf->pageSize;
  char*      pOut = NULL;
  const bool onDisk = HAS_DATA_IN_DISK(pg);

  // prepare (optionally compress) the payload whenever it may have to be written
  if (pg->dirty || !onDisk) {
    pOut = doCompressData(GET_PAYLOAD_DATA(pg), pBuf->pageSize + sizeof(SFilePage), &diskSize, pBuf);
    if (diskSize < 0) {
      uError("failed to compress data when flushing data to disk, %s", pBuf->id);
      terrno = TSDB_CODE_INVALID_PARA;
      return NULL;
    }
  }

  if (!pg->dirty) {
    // NOTE: the size may be -1 here, i.e. this recycled page has not been flushed to disk yet
    diskSize = pg->length;
  } else {
    bool needNewSlot = !onDisk;

    if (onDisk && pg->length < diskSize) {
      // the existing slot is too small: give it back to the free list and move the page
      SPageDiskInfo released = {.length = pg->length, .offset = diskOffset};
      if (NULL == taosArrayPush(pBuf->pFree, &released)) {
        return NULL;
      }
      needNewSlot = true;
    }

    if (needNewSlot) {
      diskOffset = allocateNewPositionInFile(pBuf, diskSize);
      pBuf->nextPos += diskSize;
    }

    if (doFlushBufPageImpl(pBuf, diskOffset, pOut, diskSize) != TSDB_CODE_SUCCESS) {
      return NULL;
    }
  }

  char* pBlock = pg->pData;
  memset(pBlock, 0, getAllocPageSize(pBuf->pageSize));

#ifdef BUF_PAGE_DEBUG
  uDebug("page_flush %p, pageId:%d, offset:%d", pBlock, pg->pageId, diskOffset);
#endif

  pg->length = diskSize;  // on disk size
  pg->offset = diskOffset;
  return pBlock;
}
230

231
/**
 * Flush a page to the backing file, creating the file on first use, and detach its buffer.
 *
 * @param pBuf paged buffer
 * @param pg   page to flush
 * @return the detached memory block for reuse, or NULL on failure (terrno is set when the file
 *         cannot be created)
 *
 * NOTE(review): the page's pData is cleared and its dirty flag reset even when doFlushBufPage
 * returns NULL — confirm that callers expect this on the failure path.
 */
static char* flushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
  if (NULL == pBuf->pFile) {
    int32_t code = createDiskFile(pBuf);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return NULL;
    }
  }

  char* pBlock = doFlushBufPage(pBuf, pg);

  pg->dirty = false;
  CLEAR_BUF_PAGE_IN_MEM_FLAG(pg);
  return pBlock;
}
247

248
// load file block data in disk
249
static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
351,653,286✔
250
  if (pg->offset < 0 || pg->length <= 0) {
351,653,286✔
251
    uError("failed to load buf page from disk, offset:%" PRId64 ", length:%d, %s", pg->offset, pg->length, pBuf->id);
×
252
    return TSDB_CODE_INVALID_PARA;
×
253
  }
254

255
  int64_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
351,653,295✔
256
  if (ret < 0) {
351,653,298✔
257
    ret = terrno;
×
258
    return ret;
×
259
  }
260

261
  void* pPage = (void*)GET_PAYLOAD_DATA(pg);
351,653,298✔
262
  ret = taosReadFile(pBuf->pFile, pPage, pg->length);
351,653,300✔
263
  if (ret != pg->length) {
351,653,305✔
264
    ret = terrno;
×
265
    return ret;
×
266
  }
267

268
  pBuf->statis.loadBytes += pg->length;
351,653,304✔
269
  pBuf->statis.loadPages += 1;
351,653,301✔
270

271
  int32_t fullSize = 0;
351,653,294✔
272
  return doDecompressData(pPage, pg->length, &fullSize, pBuf);
351,653,300✔
273
}
274

275
/**
 * Allocate a page info record for `pageId` and append it to pBuf->pIdList.
 *
 * The new record starts without buffer, without disk location (offset/length = -1), marked as
 * used and clean. pBuf->numOfPages is only incremented once the record is actually registered,
 * so a failed allocation or push leaves the page count untouched.
 *
 * @param pBuf   paged buffer
 * @param pageId id of the new page
 * @return the registered record, or NULL when the allocation or the array push fails
 */
static SPageInfo* registerNewPageInfo(SDiskbasedBuf* pBuf, int32_t pageId) {
  SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo));
  if (ppi == NULL) {
    return NULL;
  }

  ppi->pageId = pageId;
  ppi->pData = NULL;
  ppi->offset = -1;
  ppi->length = -1;
  ppi->used = true;
  ppi->pn = NULL;
  ppi->dirty = false;

  if (NULL == taosArrayPush(pBuf->pIdList, &ppi)) {
    taosMemoryFree(ppi);
    return NULL;
  }

  pBuf->numOfPages += 1;
  return ppi;
}
298

299
/**
 * Find the least recently used page that is not held by any caller.
 *
 * The LRU list is walked backwards (from the tail) and the first node whose page has `used`
 * cleared is returned. Only the `used` flag is inspected; the page buffer itself is not touched.
 *
 * @param pBuf paged buffer
 * @return the list node of the eviction candidate, or NULL when every in-memory page is in use
 */
static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
  SListIter iter = {0};
  tdListInitIter(pBuf->lruList, &iter, TD_LIST_BACKWARD);

  SListNode* pn = NULL;
  while ((pn = tdListNext(&iter)) != NULL) {
    SPageInfo* pageInfo = *(SPageInfo**)pn->data;
    if (!pageInfo->used) {
      break;
    }
  }

  return pn;
}
316

317
/**
 * Evict the eldest unused page from memory and return its buffer for reuse.
 *
 * @param pBuf paged buffer
 * @return the flushed page's memory block, or NULL when no page can be evicted or the flush fails
 */
static char* evictBufPage(SDiskbasedBuf* pBuf) {
  SListNode* pVictim = getEldestUnrefedPage(pBuf);
  if (NULL == pVictim) {  // every in-memory page is currently in use
    return NULL;
  }

  terrno = 0;

  // unlink the node from the LRU list and release it; the page info itself stays registered
  pVictim = tdListPopNode(pBuf->lruList, pVictim);
  SPageInfo* pInfo = *(SPageInfo**)pVictim->data;
  pInfo->pn = NULL;
  taosMemoryFreeClear(pVictim);

  return flushBufPage(pBuf, pInfo);
}
333

334
/**
 * Insert a page at the head of the LRU list and remember its list node in pi->pn.
 *
 * @param pList LRU list
 * @param pi    page to insert
 * @return TSDB_CODE_SUCCESS, or the error code of tdListPrepend (pi->pn is then left unchanged)
 */
static int32_t lruListPushFront(SList* pList, SPageInfo* pi) {
  int32_t rc = tdListPrepend(pList, &pi);
  if (rc == TSDB_CODE_SUCCESS) {
    pi->pn = tdListGetHead(pList);
    return TSDB_CODE_SUCCESS;
  }
  return rc;
}
343

344
/**
 * Move a page that is already on the LRU list to the head of the list.
 *
 * @param pList LRU list
 * @param pi    page whose node (pi->pn) is unlinked and re-inserted at the front
 */
static void lruListMoveToFront(SList* pList, SPageInfo* pi) {
  SListNode* pNode = tdListPopNode(pList, pi->pn);
  pi->pn = pNode;
  tdListPrependNode(pList, pNode);
}
2,147,483,647✔
348

349
static SPageInfo* getPageInfoFromPayload(void* page) {
2,147,483,647✔
350
  char* p = (char*)page - POINTER_BYTES;
2,147,483,647✔
351

352
  SPageInfo* ppi = ((SPageInfo**)p)[0];
2,147,483,647✔
353
  return ppi;
2,147,483,647✔
354
}
355

356
/**
 * Create a paged buffer that keeps at most `inMemBufSize / pagesize` pages in memory.
 *
 * @param pBuf         receives the new buffer; set to NULL on failure
 * @param pagesize     payload size of one page in bytes, must be positive
 * @param inMemBufSize in-memory budget in bytes; raised to two pages when smaller
 * @param id           identifier used in log messages; copied, may be NULL
 * @param dir          prefix passed to the temp file path generator; the pointer is stored, NOT
 *                     copied, so it must outlive the buffer
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_INVALID_PARA when pBuf is NULL or pagesize is not positive;
 *         terrno when an allocation fails
 */
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int64_t inMemBufSize, const char* id,
                           const char* dir) {
  int32_t        code = 0;
  int64_t        minBufSize = 0;
  int64_t        maxPages = 0;
  SDiskbasedBuf* pPBuf = NULL;

  if (pBuf == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  *pBuf = NULL;

  // a non-positive page size would divide by zero (or yield a negative page count) below
  if (pagesize <= 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  pPBuf = taosMemoryCalloc(1, sizeof(SDiskbasedBuf));
  if (pPBuf == NULL) {
    code = terrno;
    goto _error;
  }

  pPBuf->pageSize = pagesize;
  pPBuf->numOfPages = 0;  // all pages are in buffer in the first place
  pPBuf->totalBufSize = 0;
  pPBuf->allocateId = -1;
  pPBuf->pFile = NULL;
  pPBuf->id = taosStrdup(id);
  if (id != NULL && pPBuf->id == NULL) {
    code = terrno;
    goto _error;
  }
  pPBuf->fileSize = 0;
  pPBuf->pFree = taosArrayInit(4, sizeof(SFreeListItem));
  pPBuf->freePgList = tdListNew(POINTER_BYTES);
  if (pPBuf->pFree == NULL || pPBuf->freePgList == NULL) {
    code = terrno;
    goto _error;
  }

  // at least 2 pages must be in memory; computed in 64 bits so that large page sizes cannot overflow
  minBufSize = (int64_t)pagesize * 2;
  if (inMemBufSize < minBufSize) {
    inMemBufSize = minBufSize;
  }

  // maximum allowed pages, it is a soft limit; clamp instead of silently truncating to 32 bits
  maxPages = inMemBufSize / pagesize;
  if (maxPages > INT32_MAX) {
    maxPages = INT32_MAX;
  }
  pPBuf->inMemPages = (int32_t)maxPages;

  pPBuf->lruList = tdListNew(POINTER_BYTES);
  if (pPBuf->lruList == NULL) {
    code = terrno;
    goto _error;
  }

  pPBuf->pIdList = taosArrayInit(4, POINTER_BYTES);
  if (pPBuf->pIdList == NULL) {
    code = terrno;
    goto _error;
  }

  // init id hash table
  pPBuf->all = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (pPBuf->all == NULL) {
    code = terrno;
    goto _error;
  }

  pPBuf->prefix = (char*)dir;
  pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
  if (pPBuf->emptyDummyIdList == NULL) {
    code = terrno;
    goto _error;
  }

  *pBuf = pPBuf;
  return TSDB_CODE_SUCCESS;

_error:
  destroyDiskbasedBuf(pPBuf);
  *pBuf = NULL;
  return code;
}
428

429
/**
 * Obtain a memory block for one page.
 *
 * While the in-memory page limit is not reached a fresh zeroed block is allocated; otherwise the
 * eldest unused page is evicted and its block is reused.
 *
 * @param pBuf paged buffer
 * @return a block of getAllocPageSize(pageSize) bytes, or NULL when neither path yields one
 */
static char* doExtractPage(SDiskbasedBuf* pBuf) {
  if (!NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
    // add extract bytes in case of zipped buffer increased.
    return taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize));
  }

  char* pReused = evictBufPage(pBuf);
  if (NULL == pReused) {
    uWarn("no available buf pages, current:%d, max:%d, reason: %s, %s", listNEles(pBuf->lruList), pBuf->inMemPages,
          terrstr(), pBuf->id);
  }
  return pReused;
}
444

445
/**
 * Create a new page (or reuse a recycled page id) and return its in-memory payload.
 *
 * The returned page is marked as used and sits at the head of the LRU list. On failure every
 * bookkeeping change made by this call is rolled back, so the page count, the id counter, the id
 * list, the hash table and the recycled-page list are left as they were.
 *
 * @param pBuf   paged buffer
 * @param pageId receives the id of the returned page (it is also written on some failure paths)
 * @return payload pointer of the page, or NULL on failure (terrno is set where an error code is known)
 */
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
  pBuf->statis.getPages += 1;

  char* availablePage = doExtractPage(pBuf);
  if (availablePage == NULL) {
    return NULL;
  }

  SPageInfo* pi = NULL;
  int32_t    code = TSDB_CODE_SUCCESS;

  if (listNEles(pBuf->freePgList) != 0) {
    // Reuse a recycled page info. It is only peeked first: the record is still referenced by
    // pIdList and the hash table, so it must never be freed here, and it may only leave the
    // recycled list once it has been linked into the LRU list.
    SListNode* pItem = tdListGetHead(pBuf->freePgList);
    pi = *(SPageInfo**)pItem->data;

    code = lruListPushFront(pBuf->lruList, pi);
    if (TSDB_CODE_SUCCESS != code) {
      taosMemoryFree(availablePage);
      terrno = code;
      return NULL;
    }

    pItem = tdListPopHead(pBuf->freePgList);
    taosMemoryFreeClear(pItem);

    pi->used = true;
    *pageId = pi->pageId;
  } else {  // create a new pageinfo
    // register new id in this group
    *pageId = (++pBuf->allocateId);

    // register page id info
    pi = registerNewPageInfo(pBuf, *pageId);
    if (pi == NULL) {
      pBuf->allocateId -= 1;
      taosMemoryFree(availablePage);
      return NULL;
    }

    // add to hash map, then to the LRU list
    bool inHash = false;
    code = tSimpleHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);
    if (TSDB_CODE_SUCCESS == code) {
      inHash = true;
      code = lruListPushFront(pBuf->lruList, pi);
    }

    if (TSDB_CODE_SUCCESS != code) {
      // undo the registration so that no container keeps a pointer to the freed record
      if (inHash) {
        int32_t ret = tSimpleHashRemove(pBuf->all, pageId, sizeof(int32_t));
        if (ret != TSDB_CODE_SUCCESS) {
          uError("%s failed to clear pageId %d from buf hash-set since %s", __func__, *pageId, tstrerror(ret));
        }
      }
      (void)taosArrayPop(pBuf->pIdList);
      pBuf->numOfPages -= 1;
      pBuf->allocateId -= 1;

      taosMemoryFree(pi);
      taosMemoryFree(availablePage);
      terrno = code;
      return NULL;
    }

    pBuf->totalBufSize += pBuf->pageSize;
  }

  pi->pData = availablePage;

  // the first pointer-sized slot of the block points back to the page info
  ((void**)pi->pData)[0] = pi;
#ifdef BUF_PAGE_DEBUG
  uDebug("page_getNewBufPage , pi->pData:%p, pageId:%d, offset:%" PRId64, pi->pData, pi->pageId, pi->offset);
#endif

  return (void*)(GET_PAYLOAD_DATA(pi));
}
510

511
/**
 * Return the payload of an existing page, loading it from disk when it is not in memory.
 *
 * The page is marked as used and moved to the head of the LRU list. When the page cannot be
 * brought into memory it is left exactly as it was before the call (not in memory, not on the LRU
 * list, not marked as used), so a later retry starts from a consistent state.
 *
 * @param pBuf paged buffer
 * @param id   page id, must be non-negative and known to the buffer
 * @return payload pointer, or NULL on failure (terrno is set where an error code is known)
 */
void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
  if (id < 0) {
    terrno = TSDB_CODE_INVALID_PARA;
    uError("invalid page id:%d, %s", id, pBuf->id);
    return NULL;
  }

  pBuf->statis.getPages += 1;

  SPageInfo** pi = tSimpleHashGet(pBuf->all, &id, sizeof(int32_t));
  if (pi == NULL || *pi == NULL) {
    uError("failed to locate the buffer page:%d, %s", id, pBuf->id);
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  SPageInfo* pg = *pi;

  if (BUF_PAGE_IN_MEM(pg)) {  // it is in memory
    // no need to update the LRU list if only one page exists
    if (pBuf->numOfPages == 1) {
      pg->used = true;
      return (void*)(GET_PAYLOAD_DATA(pg));
    }

    SPageInfo** pInfo = (SPageInfo**)(pg->pn->data);
    if (*pInfo != pg) {
      terrno = TSDB_CODE_APP_ERROR;
      uError("inconsistently data in paged buffer, pInfo:%p, pi:%p, %s", *pInfo, pg, pBuf->id);
      return NULL;
    }

    lruListMoveToFront(pBuf->lruList, pg);
    pg->used = true;

#ifdef BUF_PAGE_DEBUG
    uDebug("page_getBufPage1 pageId:%d, offset:%" PRId64, pg->pageId, pg->offset);
#endif
    return (void*)(GET_PAYLOAD_DATA(pg));
  }

  // not in memory: get a block, either freshly allocated or taken from an evicted page
  pg->pData = doExtractPage(pBuf);

  // failed to evict buffer page, return with error code.
  if (pg->pData == NULL) {
    return NULL;
  }

  // set the ptr to the new SPageInfo
  ((void**)(pg->pData))[0] = pg;

  int32_t code = lruListPushFront(pBuf->lruList, pg);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(pg->pData);
    pg->pData = NULL;
    terrno = code;
    return NULL;
  }

  // some data has been flushed to disk, and needs to be loaded into buffer again.
  if (HAS_DATA_IN_DISK(pg)) {
    code = loadPageFromDisk(pBuf, pg);
    if (code != TSDB_CODE_SUCCESS) {
      // Take the page off the LRU list again: a listed page without a buffer would be linked a
      // second time by the next lookup and breaks the invariant that listed pages are in memory.
      SListNode* pNode = tdListPopNode(pBuf->lruList, pg->pn);
      taosMemoryFreeClear(pNode);
      pg->pn = NULL;

      taosMemoryFree(pg->pData);
      pg->pData = NULL;
      terrno = code;
      return NULL;
    }
  }

  // only mark the page as held once it is fully available
  pg->used = true;

#ifdef BUF_PAGE_DEBUG
  uDebug("page_getBufPage2 pageId:%d, offset:%" PRId64, pg->pageId, pg->offset);
#endif
  return (void*)(GET_PAYLOAD_DATA(pg));
}
585

586
/**
 * Release a page identified by its payload pointer so that it may be evicted again.
 *
 * @param pBuf paged buffer
 * @param page payload pointer obtained from getNewBufPage/getBufPage; NULL is ignored
 */
void releaseBufPage(SDiskbasedBuf* pBuf, void* page) {
  if (page != NULL) {
    releaseBufPageInfo(pBuf, getPageInfoFromPayload(page));
  }
}
594

595
/**
 * Mark a page as no longer in use so that it becomes an eviction candidate.
 *
 * @param pBuf paged buffer whose release counter is updated
 * @param pi   page to release; NULL is ignored, a page without in-memory data is reported and skipped
 */
void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
  if (pi == NULL) {
    return;
  }

  // the debug trace reads pi, so it has to come after the NULL check
#ifdef BUF_PAGE_DEBUG
  uDebug("page_releaseBufPageInfo pageId:%d, used:%d, offset:%" PRId64, pi->pageId, pi->used, pi->offset);
#endif

  if (pi->pData == NULL) {
    uError("pi->pData (page data) is null");
    return;
  }

  pi->used = false;
  pBuf->statis.releasePages += 1;
}
612

613
// Total size in bytes of all pages created so far (pBuf->totalBufSize).
size_t getTotalBufSize(const SDiskbasedBuf* pBuf) {
  const int64_t totalBytes = pBuf->totalBufSize;
  return (size_t)totalBytes;
}
204✔
614

615
// Array of SPageInfo pointers for every registered page; still owned by the buffer.
SArray* getDataBufPagesIdList(SDiskbasedBuf* pBuf) {
  SArray* pPages = pBuf->pIdList;
  return pPages;
}
612✔
616

617
/**
 * Destroy a paged buffer: close and remove its backing file, free every page and all containers.
 *
 * @param pBuf buffer to destroy; NULL is ignored. The pointer is invalid afterwards.
 */
void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
  if (NULL == pBuf) {
    return;
  }

  dBufPrintStatis(pBuf);

  // remember whether a file existed before it gets closed
  const bool hadFile = (pBuf->pFile != NULL);
  if (!hadFile) {
    uDebug("Paged buffer closed, total:%.2f Kb, no file created, %s", pBuf->totalBufSize / 1024.0, pBuf->id);
  } else {
    uDebug(
        "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page "
        "size:%.2f Kb, %s",
        pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
        listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);

    int32_t closeCode = taosCloseFile(&pBuf->pFile);
    if (closeCode != TSDB_CODE_SUCCESS) {
      uError("failed to close paged buffer file when destroying, path:%s, closeCode:%d, err:%s, %s", pBuf->path,
             closeCode, tstrerror(closeCode), pBuf->id);
    }
  }

  // print the statistics information
  SDiskbasedBufStatis* pStat = &pBuf->statis;
  if (pStat->loadPages != 0) {
    uDebug(
        "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPgSize:%.2f Kb",
        pStat->getPages, pStat->releasePages, pStat->flushBytes / 1024.0f, pStat->flushPages,
        pStat->loadBytes / 1024.0f, pStat->loadPages, pStat->loadBytes / (1024.0 * pStat->loadPages));
  } else {
    uDebug("Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages)",
           pStat->getPages, pStat->releasePages, pStat->flushBytes / 1024.0f, pStat->flushPages,
           pStat->loadBytes / 1024.0f, pStat->loadPages);
  }

  if (hadFile) {
    // a failed removal is only logged; the error is discarded
    if (taosRemoveFile(pBuf->path) != 0) {
      uDebug("WARNING tPage remove file failed. path=%s, code:%s", pBuf->path, strerror(ERRNO));
    }
  }

  taosMemoryFreeClear(pBuf->path);

  // release every page buffer together with its page info
  size_t  numPages = taosArrayGetSize(pBuf->pIdList);
  int32_t idx = 0;
  while (idx < numPages) {
    SPageInfo* pInfo = taosArrayGetP(pBuf->pIdList, idx);
    taosMemoryFreeClear(pInfo->pData);
    taosMemoryFreeClear(pInfo);
    ++idx;
  }

  taosArrayDestroy(pBuf->pIdList);

  pBuf->lruList = tdListFree(pBuf->lruList);
  pBuf->freePgList = tdListFree(pBuf->freePgList);

  taosArrayDestroy(pBuf->emptyDummyIdList);
  taosArrayDestroy(pBuf->pFree);

  tSimpleHashCleanup(pBuf->all);

  taosMemoryFreeClear(pBuf->id);
  taosMemoryFreeClear(pBuf->assistBuf);
  taosMemoryFreeClear(pBuf);
}
686

687
/**
 * Return the last page info stored in a page list.
 *
 * @param pList array of SPageInfo pointers
 * @return the last element, or NULL when the list is empty (size - 1 would otherwise wrap around
 *         to a huge index)
 */
SPageInfo* getLastPageInfo(SArray* pList) {
  size_t size = taosArrayGetSize(pList);
  if (size == 0) {
    return NULL;
  }

  SPageInfo* pPgInfo = taosArrayGetP(pList, size - 1);
  return pPgInfo;
}
692

693
// Id of the page described by pPgInfo.
int32_t getPageId(const SPageInfo* pPgInfo) {
  const int32_t id = pPgInfo->pageId;
  return id;
}
×
694

695
// Page size in bytes this buffer was created with.
int32_t getBufPageSize(const SDiskbasedBuf* pBuf) {
  const int32_t bytesPerPage = pBuf->pageSize;
  return bytesPerPage;
}
2,147,483,647✔
696

697
// Maximum number of pages the buffer keeps in memory (pBuf->inMemPages).
int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf) {
  const int32_t pageLimit = pBuf->inMemPages;
  return pageLimit;
}
15,185,374✔
698

699
// True while nothing has been written to the backing file (recorded file size is zero).
bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) {
  const bool nothingOnDisk = (pBuf->fileSize == 0);
  return nothingOnDisk;
}
×
700

701
void setBufPageDirty(void* pPage, bool dirty) {
2,147,483,647✔
702
  SPageInfo* ppi = getPageInfoFromPayload(pPage);
2,147,483,647✔
703
  ppi->dirty = dirty;
2,147,483,647✔
704
}
2,147,483,647✔
705

706
/**
 * Enable or disable compression of pages written to disk.
 *
 * Enabling compression allocates the scratch buffer used by the (de)compression helpers on first
 * use. The `comp` flag is only switched once that buffer exists, so a failed allocation never
 * leaves compression enabled without a scratch buffer.
 *
 * @param pBuf paged buffer
 * @param comp true to compress pages before they are flushed
 * @return TSDB_CODE_SUCCESS, or terrno when the scratch buffer cannot be allocated
 */
int32_t setBufPageCompressOnDisk(SDiskbasedBuf* pBuf, bool comp) {
  if (comp && (pBuf->assistBuf == NULL)) {
    // The flush path passes pageSize + sizeof(SFilePage) as the output capacity of the compressor,
    // so the scratch buffer has to cover at least that much; the 2 EXTRA BYTES are kept as slack.
    void* pScratch = taosMemoryMalloc((size_t)pBuf->pageSize + sizeof(SFilePage) + 2);
    if (pScratch == NULL) {
      return terrno;
    }
    pBuf->assistBuf = pScratch;
  }

  pBuf->comp = comp;
  return TSDB_CODE_SUCCESS;
}
716

717
/**
 * Recycle a page: its id becomes reusable by getNewBufPage and its in-memory buffer is freed.
 *
 * @param pBuf  paged buffer
 * @param pPage payload pointer of the in-memory page to recycle
 * @return TSDB_CODE_SUCCESS, or the error code of tdListAppend (the page is then left untouched)
 */
int32_t dBufSetBufPageRecycled(SDiskbasedBuf* pBuf, void* pPage) {
  SPageInfo* pInfo = getPageInfoFromPayload(pPage);

  // add this pageinfo into the free page info list
  int32_t rc = tdListAppend(pBuf->freePgList, &pInfo);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  pInfo->dirty = false;
  pInfo->used = false;

  // drop the page from the LRU list and release its memory
  SListNode* pLruNode = tdListPopNode(pBuf->lruList, pInfo->pn);
  taosMemoryFreeClear(pInfo->pData);
  taosMemoryFreeClear(pLruNode);
  pInfo->pn = NULL;

  return TSDB_CODE_SUCCESS;
}
735

736
// Request that statistics are printed by dBufPrintStatis (called when the buffer is destroyed).
void dBufSetPrintInfo(SDiskbasedBuf* pBuf) {
  pBuf->printStatis = true;
}
37,204,011✔
737

738
// Snapshot (by value) of the buffer's get/release/flush/load counters.
SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf) {
  SDiskbasedBufStatis snapshot = pBuf->statis;
  return snapshot;
}
15,709,054✔
739

740
void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
376,238,579✔
741
  if (!pBuf->printStatis) {
376,238,579✔
742
    return;
339,042,092✔
743
  }
744

745
  const SDiskbasedBufStatis* ps = &pBuf->statis;
37,205,635✔
746

747
#if 0
748
  printf(
749
      "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page size:%.2f "
750
      "Kb, %s\n",
751
      pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
752
      listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);
753
#endif
754

755
  if (ps->loadPages > 0) {
37,208,697✔
756
    (void)printf(
22,578✔
757
        "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f "
758
        "Kb\n",
759
        ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,
22,578✔
760
        ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages));
22,578✔
761
  } else {
762
    // printf("no page loaded\n");
763
  }
764
}
765

766
/**
 * Reset a paged buffer to its empty state without destroying it.
 *
 * All pages and page infos are freed, every container is emptied, the counters for pages, sizes,
 * ids and file positions are reset and an existing backing file is truncated to zero length.
 * The statistics in pBuf->statis are not reset.
 *
 * @param pBuf buffer to clear; NULL is ignored
 */
void clearDiskbasedBuf(SDiskbasedBuf* pBuf) {
  if (NULL == pBuf) {
    return;
  }

  // real file size before clearing, for the debug log only (-1: unknown / no file)
  int64_t sizeBefore = -1;
  if (pBuf->pFile != NULL && taosFStatFile(pBuf->pFile, &sizeBefore, NULL) != TSDB_CODE_SUCCESS) {
    sizeBefore = -1;
  }

  const SDiskbasedBufStatis* pStat = &pBuf->statis;
  uDebug(
      "clear paged buffer begin, pages:%d, inMemPages:%d, fileSize:%" PRIu64 ", nextPos:%" PRIu64
      ", realSize:%" PRId64 ", get/release:%d/%d, flush/load:%d/%d, %s",
      pBuf->numOfPages, listNEles(pBuf->lruList), pBuf->fileSize, pBuf->nextPos, sizeBefore, pStat->getPages,
      pStat->releasePages, pStat->flushPages, pStat->loadPages, pBuf->id);

  // free every page buffer together with its page info
  size_t  numPages = taosArrayGetSize(pBuf->pIdList);
  int32_t idx = 0;
  while (idx < numPages) {
    SPageInfo* pInfo = taosArrayGetP(pBuf->pIdList, idx);
    taosMemoryFreeClear(pInfo->pData);
    taosMemoryFreeClear(pInfo);
    ++idx;
  }

  taosArrayClear(pBuf->pIdList);

  tdListEmpty(pBuf->lruList);
  tdListEmpty(pBuf->freePgList);

  taosArrayClear(pBuf->emptyDummyIdList);
  taosArrayClear(pBuf->pFree);

  tSimpleHashClear(pBuf->all);

  // back to the state right after creation
  pBuf->allocateId = -1;
  pBuf->numOfPages = 0;
  pBuf->totalBufSize = 0;
  pBuf->nextPos = 0;
  pBuf->fileSize = 0;

  if (pBuf->pFile != NULL) {
    int32_t truncCode = taosFtruncateFile(pBuf->pFile, 0);
    if (truncCode != TSDB_CODE_SUCCESS) {
      uWarn("failed to truncate paged buffer file, path:%s, code:%s, %s", pBuf->path, tstrerror(truncCode), pBuf->id);
    }
  }

  // real file size after clearing, for the debug log only
  int64_t sizeAfter = -1;
  if (pBuf->pFile != NULL && taosFStatFile(pBuf->pFile, &sizeAfter, NULL) != TSDB_CODE_SUCCESS) {
    sizeAfter = -1;
  }

  uDebug("clear paged buffer end, pages:%d, inMemPages:%d, fileSize:%" PRIu64 ", nextPos:%" PRIu64
         ", realSize:%" PRId64 ", %s",
         pBuf->numOfPages, listNEles(pBuf->lruList), pBuf->fileSize, pBuf->nextPos, sizeAfter, pBuf->id);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc