• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4940

27 Jan 2026 10:23AM UTC coverage: 66.832% (-0.1%) from 66.931%
#4940

push

travis-ci

web-flow
fix: asan invalid write issue (#34400)

7 of 8 new or added lines in 2 files covered. (87.5%)

822 existing lines in 141 files now uncovered.

204293 of 305680 relevant lines covered (66.83%)

124534723.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.83
/source/dnode/vnode/src/bse/bseMgt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "bse.h"
17
#include "bseInc.h"
18
#include "bseSnapshot.h"
19
#include "bseTable.h"
20
#include "bseTableMgt.h"
21
#include "bseUtil.h"
22
#include "cJSON.h"
23
#include "tencrypt.h"
24

25
#define BSE_FMT_VER 0x1
26

27
static void bseCfgSetDefault(SBseCfg *pCfg);
28

29
static int32_t bseClear(SBse *pBse);
30
static int32_t bseInitEnv(SBse *p);
31
static int32_t bseInitStartSeq(SBse *pBse);
32
static int32_t bseRecover(SBse *pBse, int8_t rm);
33
static int32_t bseGenCommitInfo(SBse *pBse, SArray *pInfo);
34
static int32_t bseCreateTableManager(SBse *p);
35
static int32_t bseCommitDo(SBse *pBse, SArray *pFileSet);
36

37
static int32_t bseDeserialCommitInfo(SBse *pBse, char *pCurrent, SBseCommitInfo *pInfo);
38
static int32_t bseSerailCommitInfo(SBse *pBse, SArray *fileSet, char **pBuf, int32_t *len);
39
static int32_t bseReadCurrentFile(SBse *pBse, char **p, int64_t *len);
40
static int32_t bseListAllFiles(const char *path, SArray *pFiles);
41
static int32_t bseRemoveUnCommitFile(SBse *p);
42

43
static int32_t bseCreateBatchList(SBse *pBse);
44

45
static int32_t bseBatchMgtInit(SBatchMgt *pBatchMgt, SBse *pBse);
46
static int32_t bseBatchMgtGet(SBatchMgt *pBatchMgt, SBseBatch **pBatch);
47
static int32_t bseBatchMgtRecycle(SBatchMgt *pBatchMgt, SBseBatch *pBatch);
48
static void    bseBatchMgtCleanup(SBatchMgt *pBatchMgt);
49

50
static int32_t bseBatchCreate(SBseBatch **pBatch, int32_t nKeys);
51
static void    bseBatchClear(SBseBatch *pBatch);
52
static int32_t bseRecycleBatchImpl(SBatchMgt *pMgt, SBseBatch *pBatch);
53
static int32_t bseBatchMayResize(SBseBatch *pBatch, int32_t alen);
54

55
int32_t bseSerailCommitInfo(SBse *pBse, SArray *fileSet, char **pBuf, int32_t *len) {
3,476✔
56
  int32_t code = 0;
3,476✔
57
  // int32_t code = 0;
58
  int32_t line = 0;
3,476✔
59

60
  cJSON *pRoot = cJSON_CreateObject();
3,476✔
61
  cJSON *pFileSet = cJSON_CreateArray();
3,476✔
62
  if (pRoot == NULL || pFileSet == NULL) {
3,476✔
63
    code = terrno;
×
64
    TSDB_CHECK_CODE(code, line, _err);
×
65
  }
66

67
  if (cJSON_AddNumberToObject(pRoot, "fmtVer", pBse->commitInfo.fmtVer) == NULL) {
3,476✔
68
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
69
  }
70
  if (cJSON_AddNumberToObject(pRoot, "vgId", BSE_VGID(pBse)) == NULL) {
3,476✔
71
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
72
  }
73
  if (cJSON_AddNumberToObject(pRoot, "commitVer", pBse->commitInfo.commitVer) == NULL) {
3,476✔
74
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
75
  }
76
  if (cJSON_AddNumberToObject(pRoot, "commitSeq", pBse->commitInfo.lastSeq) == NULL) {
3,476✔
77
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
78
  }
79

80
  if (!cJSON_AddItemToObject(pRoot, "fileSet", pFileSet)) {
3,476✔
81
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
82
  }
83

84
  for (int32_t i = 0; i < taosArrayGetSize(fileSet); i++) {
6,952✔
85
    SBseLiveFileInfo *pInfo = taosArrayGet(fileSet, i);
3,476✔
86
    cJSON            *pField = cJSON_CreateObject();
3,476✔
87
    if (cJSON_AddNumberToObject(pField, "startSeq", pInfo->range.sseq) == NULL) {
3,476✔
88
      TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
89
    }
90
    if (cJSON_AddNumberToObject(pField, "endSeq", pInfo->range.eseq) == NULL) {
3,476✔
91
      TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
92
    }
93
    if (cJSON_AddNumberToObject(pField, "size", pInfo->size) == NULL) {
3,476✔
94
      TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
95
    }
96

97
    if (cJSON_AddNumberToObject(pField, "level", pInfo->level) == NULL) {
3,476✔
98
      TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
99
    }
100
    if (cJSON_AddNumberToObject(pField, "retention", pInfo->timestamp) == NULL) {
3,476✔
101
      TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
102
    }
103
    if (!cJSON_AddItemToArray(pFileSet, pField)) {
3,476✔
104
      TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
105
    }
106
  }
107

108
  char   *pSerialized = cJSON_PrintUnformatted(pRoot);
3,476✔
109
  if (pSerialized == NULL) {
3,476✔
110
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
111
  }
112
  int32_t sz = strlen(pSerialized);
3,476✔
113

114
  *pBuf = pSerialized;
3,476✔
115
  *len = sz;
3,476✔
116

117
_err:
3,476✔
118
  if (code != 0) {
3,476✔
119
    bseError("vgId:%d failed at line %d since %s", BSE_VGID(pBse), line, tstrerror(code));
×
120
    cJSON_Delete(pFileSet);
×
121
  }
122
  cJSON_Delete(pRoot);
3,476✔
123
  pRoot = NULL;
3,476✔
124
  return code;
3,476✔
125
}
126
int32_t bseDeserialCommitInfo(SBse *pBse, char *pCurrent, SBseCommitInfo *pInfo) {
×
127
  int32_t code = 0;
×
128
  int32_t lino = 0;
×
129
  cJSON  *pRoot = cJSON_Parse(pCurrent);
×
130
  if (pRoot == NULL) {
×
131
    bseError("vgId:%d, failed to parse current meta", BSE_VGID(pBse));
×
132
    code = TSDB_CODE_FILE_CORRUPTED;
×
133
    TSDB_CHECK_CODE(code, lino, _error);
×
134
  }
135

136
  cJSON *item = cJSON_GetObjectItem(pRoot, "fmtVer");
×
137
  if (item == NULL) {
×
138
    bseError("vgId:%d, failed to get fmtVer from current meta", BSE_VGID(pBse));
×
139
    code = TSDB_CODE_FILE_CORRUPTED;
×
140
    goto _error;
×
141
  }
142
  pInfo->fmtVer = item->valuedouble;
×
143

144
  item = cJSON_GetObjectItem(pRoot, "vgId");
×
145
  if (item == NULL) {
×
146
    bseError("vgId:%d, failed to get vgId from current meta", BSE_VGID(pBse));
×
147
    code = TSDB_CODE_FILE_CORRUPTED;
×
148
    goto _error;
×
149
  }
150
  pInfo->vgId = item->valuedouble;
×
151

152
  item = cJSON_GetObjectItem(pRoot, "commitVer");
×
153
  if (item == NULL) {
×
154
    bseError("vgId:%d, failed to get commitVer from current meta", BSE_VGID(pBse));
×
155
    code = TSDB_CODE_FILE_CORRUPTED;
×
156
    goto _error;
×
157
  }
158
  pInfo->commitVer = item->valuedouble;
×
159

160
  item = cJSON_GetObjectItem(pRoot, "commitSeq");
×
161
  if (item == NULL) {
×
162
    bseError("vgId:%d, failed to get lastSeq from current meta", BSE_VGID(pBse));
×
163
    code = TSDB_CODE_FILE_CORRUPTED;
×
164
    goto _error;
×
165
  }
166
  pInfo->lastSeq = item->valuedouble;
×
167

168
  cJSON *pFiles = cJSON_GetObjectItem(pRoot, "fileSet");
×
169
  cJSON *pField = NULL;
×
170
  cJSON_ArrayForEach(pField, pFiles) {
×
171
    cJSON *pStartSeq = cJSON_GetObjectItem(pField, "startSeq");
×
172
    cJSON *pEndSeq = cJSON_GetObjectItem(pField, "endSeq");
×
173
    cJSON *pFileSize = cJSON_GetObjectItem(pField, "size");
×
174
    cJSON *pLevel = cJSON_GetObjectItem(pField, "level");
×
175
    cJSON *pRetentionTs = cJSON_GetObjectItem(pField, "retention");
×
176
    if (pStartSeq == NULL || pEndSeq == NULL || pFileSize == NULL || pLevel == NULL || pRetentionTs == NULL) {
×
177
      bseError("vgId:%d, failed to get field from files", BSE_VGID(pBse));
×
178
      code = TSDB_CODE_FILE_CORRUPTED;
×
179
      goto _error;
×
180
    }
181

182
    SBseLiveFileInfo info = {0};
×
183
    info.range.sseq = pStartSeq->valuedouble;
×
184
    info.range.eseq = pEndSeq->valuedouble;
×
185
    info.size = pFileSize->valuedouble;
×
186
    info.level = pLevel->valuedouble;
×
187
    info.timestamp = pRetentionTs->valuedouble;
×
188

189
    if (taosArrayPush(pInfo->pFileList, &info) == NULL) {
×
190
      code = terrno;
×
191
      goto _error;
×
192
    }
193
  }
194
_error:
×
195
  if (code != 0) {
×
196
    bseError("vgId:%d failed to get commit info from current meta since %s", BSE_VGID(pBse), tstrerror(code));
×
197
  }
198
  cJSON_Delete(pRoot);
×
199
  return code;
×
200
}
201

202
int32_t bseReadCurrentFile(SBse *pBse, char **p, int64_t *len) {
3,726,448✔
203
  int32_t code = 0;
3,726,448✔
204
  int32_t lino = 0;
3,726,448✔
205
  char    name[TSDB_FILENAME_LEN] = {0};
3,726,448✔
206
  char   *pCurrent = NULL;
3,727,764✔
207
  int32_t sz = 0;
3,727,364✔
208

209
  bseBuildCurrentFullName(pBse, name);
3,727,764✔
210
  if (taosCheckExistFile(name) == 0) {
3,727,247✔
211
    bseInfo("vgId:%d, no current meta file found, skip recover", BSE_VGID(pBse));
3,727,647✔
212
    *p = NULL;
3,729,028✔
213
    *len = 0;
3,727,764✔
214
    return 0;
3,727,764✔
215
  }
216

217
  // Use taosReadCfgFile for encrypted read
218
  // Note: taosReadCfgFile uses global tsCfgKey for decryption
219
  code = taosReadCfgFile(name, &pCurrent, &sz);
×
220
  if (code != 0) {
×
221
    bseError("vgId:%d failed to read CURRENT file %s since %s", BSE_VGID(pBse), name, tstrerror(code));
×
222
    TSDB_CHECK_CODE(code, lino, _error);
×
223
  }
224

225
  *p = pCurrent;
×
226
  *len = (int64_t)sz;
×
227

228
_error:
×
229
  if (code != 0) {
×
230
    bseError("vgId:%d, failed to read current at line %d since %s", BSE_VGID(pBse), lino, tstrerror(code));
×
231
    taosMemoryFree(pCurrent);
×
232
  }
233
  return code;
×
234
}
235

236
int32_t bseListAllFiles(const char *path, SArray *pFiles) {
×
237
  SBseLiveFileInfo info = {0};
×
238

239
  int32_t code = 0;
×
240
  int32_t lino = 0;
×
241

242
  TdDirPtr pDir = taosOpenDir(path);
×
243
  if (pDir == NULL) {
×
244
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
245
  }
246

247
  TdDirEntryPtr de = NULL;
×
248
  while ((de = taosReadDir(pDir)) != NULL) {
×
249
    char *name = taosGetDirEntryName(de);
×
250
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
×
251

252
    if (strstr(name, BSE_DATA_SUFFIX) == NULL) {
×
253
      continue;
×
254
    }
255
    SBseLiveFileInfo info = {0};
×
256
    memcpy(info.name, name, strlen(name));
×
257

258
    if (taosArrayPush(pFiles, &info) == NULL) {
×
259
      code = terrno;
×
260
      goto _error;
×
261
    }
262
  }
263

264
_error:
×
265
  if (code != 0) {
×
266
    bseError("failed to list files at line %d since %s", lino, tstrerror(code));
×
267
  }
268
  if ((code = taosCloseDir(&pDir)) != 0) {
×
269
    bseError("failed to close dir %s since %s", path, tstrerror(code));
×
270
  }
271
  return code;
×
272
}
273

274
int32_t removeUnCommitFile(SBse *p, SArray *pCommitedFiles, SArray *pAllFiles) {
×
275
  int32_t code = 0;
×
276
  for (int32_t i = 0; i < taosArrayGetSize(pAllFiles); i++) {
×
277
    SBseLiveFileInfo *pInfo = taosArrayGet(pAllFiles, i);
×
278
    int32_t           found = 0;
×
279
    for (int32_t j = 0; j < taosArrayGetSize(pCommitedFiles); j++) {
×
280
      SBseLiveFileInfo *pCommited = taosArrayGet(pCommitedFiles, j);
×
281
      if (strcmp(pInfo->name, pCommited->name) == 0) {
×
282
        found = 1;
×
283
        break;
×
284
      }
285
    }
286
    if (found == 0) {
×
287
      char buf[TSDB_FILENAME_LEN] = {0};
×
288
      bseBuildFullName(p, pInfo->name, buf);
×
289

290
      code = taosRemoveFile(buf);
×
291
      if (code != 0) {
×
292
        bseError("vgId:%d failed to remove file %s since %s", BSE_VGID(p), pInfo->name, tstrerror(code));
×
293
      } else {
294
        bseInfo("vgId:%d remove file %s", BSE_VGID(p), pInfo->name);
×
295
      }
296
    }
297
  }
298

299
  return code;
×
300
}
301
int32_t bseRemoveUnCommitFile(SBse *p) {
×
302
  int32_t code = 0;
×
303

304
  SArray *pFiles = taosArrayInit(64, sizeof(SBseLiveFileInfo));
×
305
  if (pFiles == NULL) {
×
306
    return terrno;
×
307
  }
308

309
  code = bseListAllFiles(p->path, pFiles);
×
310
  if (code != 0) {
×
311
    taosArrayDestroy(pFiles);
×
312
    return code;
×
313
  }
314
  code = removeUnCommitFile(p, p->commitInfo.pFileList, pFiles);
×
315
  taosArrayDestroy(pFiles);
×
316
  return code;
×
317
}
318

319
int32_t bseInitStartSeq(SBse *pBse) {
3,727,764✔
320
  int32_t code = 0;
3,727,764✔
321
  int64_t lastSeq = 0;
3,727,764✔
322

323
  SBseLiveFileInfo *pLastFile = taosArrayGetLast(pBse->commitInfo.pFileList);
3,727,764✔
324
  if (pLastFile != NULL) {
3,727,764✔
325
    lastSeq = pLastFile->range.eseq;
×
326
  }
327

328
  pBse->seq = lastSeq + 1;
3,727,764✔
329
  return code;
3,727,764✔
330
}
331

332
int32_t bseRecover(SBse *pBse, int8_t rmUnCommited) {
3,727,364✔
333
  int32_t code = 0;
3,727,364✔
334
  int32_t lino = 0;
3,727,364✔
335
  char   *pCurrent = NULL;
3,727,364✔
336
  int64_t len = 0;
3,727,764✔
337

338
  code = bseReadCurrentFile(pBse, &pCurrent, &len);
3,727,364✔
339
  TSDB_CHECK_CODE(code, lino, _error);
3,727,764✔
340

341
  if (len == 0) {
3,727,764✔
342
    bseInfo("vgId:%d, no current meta file found, no need to recover", BSE_VGID(pBse));
3,727,764✔
343
  } else {
344
    code = bseDeserialCommitInfo(pBse, pCurrent, &pBse->commitInfo);
×
345
    TSDB_CHECK_CODE(code, lino, _error);
×
346

347
    if (pBse->commitInfo.fmtVer != BSE_FMT_VER) {
×
348
      bseError("vgId:%d, current meta file version %d not match with %d", BSE_VGID(pBse), pBse->commitInfo.fmtVer,
×
349
               BSE_FMT_VER);
350
      code = TSDB_CODE_FILE_CORRUPTED;
×
351
      goto _error;
×
352
    }
353

354
    if (taosArrayGetSize(pBse->commitInfo.pFileList) > 0) {
×
355
      SBseLiveFileInfo *pLast = taosArrayGetLast(pBse->commitInfo.pFileList);
×
356
      code = bseTableMgtRecoverTable(pBse->pTableMgt, pLast);
×
357
      TSDB_CHECK_CODE(code, lino, _error);
×
358

359
      code = bseTableMgtSetLastTableId(pBse->pTableMgt, pLast->timestamp);
×
360
    }
361
  }
362

363
  code = bseInitStartSeq(pBse);
3,727,764✔
364
  TSDB_CHECK_CODE(code, lino, _error);
3,727,764✔
365

366
_error:
3,727,764✔
367
  if (code != 0) {
3,727,764✔
368
    bseError("vgId:%d, failed to recover since %s", BSE_VGID(pBse), tstrerror(code));
×
369
  }
370
  taosMemoryFree(pCurrent);
3,727,764✔
371
  return code;
3,727,764✔
372
}
373
int32_t bseInitLock(SBse *pBse) {
3,722,500✔
374
  TdThreadRwlockAttr attr;
3,718,931✔
375
  (void)taosThreadRwlockAttrInit(&attr);
3,722,689✔
376
  (void)taosThreadRwlockAttrSetKindNP(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
3,723,531✔
377
  (void)taosThreadRwlockInit(&pBse->rwlock, &attr);
3,722,371✔
378
  (void)taosThreadRwlockAttrDestroy(&attr);
3,722,181✔
379

380
  (void)taosThreadMutexInit(&pBse->mutex, NULL);
3,725,178✔
381
  return 0;
3,724,082✔
382
}
383

384
int32_t bseInitEnv(SBse *p) {
3,724,744✔
385
  int32_t code = 0;
3,724,744✔
386
  int32_t lino = 0;
3,724,744✔
387

388
  code = bseInitLock(p);
3,724,744✔
389
  TSDB_CHECK_CODE(code, lino, _err);
3,723,824✔
390

391
  code = taosMkDir(p->path);
3,723,824✔
392
  TSDB_CHECK_CODE(code, lino, _err);
3,727,647✔
393
_err:
3,727,647✔
394
  if (code != 0) {
3,727,647✔
395
    bseError("failed to init bse env at line %d since %s", lino, tstrerror(code));
×
396
  }
397
  return code;
3,727,233✔
398
}
399

400
int32_t bseCreateTableManager(SBse *p) { return bseTableMgtCreate(p, (void **)&p->pTableMgt); }
3,726,899✔
401

402
int32_t bseCreateCommitInfo(SBse *pBse) {
3,727,571✔
403
  SBseCommitInfo *pCommit = &pBse->commitInfo;
3,727,571✔
404
  pCommit->pFileList = taosArrayInit(64, sizeof(SBseLiveFileInfo));
3,727,571✔
405
  if (pCommit->pFileList == NULL) {
3,727,364✔
406
    return terrno;
×
407
  }
408

409
  pCommit->fmtVer = BSE_FMT_VER;
3,727,764✔
410
  return 0;
3,727,764✔
411
}
412

413
void bseCfgSetDefault(SBseCfg *pCfg) {
3,725,998✔
414
  if (pCfg == NULL) {
3,725,998✔
415
    return;
×
416
  }
417
  if (pCfg->compressType == 0) {
3,725,998✔
418
    pCfg->compressType = kLZ4Compres;
3,726,420✔
419
  }
420
  if (pCfg->blockSize == 0) {
3,726,770✔
421
    pCfg->blockSize = BSE_DEFAULT_BLOCK_SIZE;
3,726,688✔
422
  }
423

424
  if (pCfg->keepDays == 0) {
3,727,265✔
425
    pCfg->keepDays = 10;
×
426
  }
427

428
  if (pCfg->keeps == 0) {
3,725,141✔
429
    pCfg->keeps = 365;
×
430
  }
431

432
  if (pCfg->precision == 0) {
3,725,010✔
433
    pCfg->precision = TSDB_TIME_PRECISION_MILLI;  // default precision is 1 second
3,611,745✔
434
  }
435
}
436

437
int32_t bseOpen(const char *path, SBseCfg *pCfg, SBse **pBse) {
3,716,228✔
438
  int32_t lino = 0;
3,716,228✔
439
  int32_t code = 0;
3,716,228✔
440

441
  SBse *p = taosMemoryCalloc(1, sizeof(SBse));
3,716,228✔
442
  if (p == NULL) {
3,727,064✔
443
    TSDB_CHECK_CODE(code = terrno, lino, _err);
×
444
  }
445

446
  p->cfg = *pCfg;
3,727,064✔
447
  bseCfgSetDefault(&p->cfg);
3,727,064✔
448

449
  tstrncpy(p->path, path, sizeof(p->path));
3,723,860✔
450

451
  code = bseInitEnv(p);
3,724,682✔
452
  TSDB_CHECK_CODE(code, lino, _err);
3,726,899✔
453

454
  code = bseCreateTableManager(p);
3,726,899✔
455
  TSDB_CHECK_CODE(code, lino, _err);
3,727,571✔
456

457
  code = bseCreateCommitInfo(p);
3,727,571✔
458
  TSDB_CHECK_CODE(code, lino, _err);
3,727,364✔
459

460
  code = bseBatchMgtInit(p->batchMgt, p);
3,727,364✔
461
  TSDB_CHECK_CODE(code, lino, _err);
3,727,364✔
462

463
  code = bseRecover(p, 1);
3,727,364✔
464
  TSDB_CHECK_CODE(code, lino, _err);
3,727,764✔
465

466
  *pBse = p;
3,727,764✔
467
_err:
3,727,764✔
468
  if (code != 0) {
3,727,764✔
469
    bseClose(p);
×
470
    bseError("vgId:%d failed to open bse at line %d since %s", BSE_VGID(p), lino, tstrerror(code));
×
471
  }
472
  return code;
3,727,764✔
473
}
474

475
static int32_t bseClear(SBse *pBse) {
×
476
  int32_t code = 0;
×
477
  int32_t lino = 0;
×
478

479
  code = bseTableMgtClear(pBse->pTableMgt);
×
480
  TSDB_CHECK_CODE(code, lino, _error);
×
481

482
_error:
×
483
  if (code != 0) {
×
484
    bseError("vgId:%d failed to clear bse at line %d since %s", BSE_VGID(pBse), lino, tstrerror(code));
×
485
  }
486
  return code;
×
487
}
488
void bseClose(SBse *pBse) {
3,727,764✔
489
  int32_t code;
490
  if (pBse == NULL) {
3,727,764✔
491
    return;
×
492
  }
493
  bseTableMgtCleanup(pBse->pTableMgt);
3,727,764✔
494
  bseBatchMgtCleanup(pBse->batchMgt);
3,727,764✔
495

496
  taosArrayDestroy(pBse->commitInfo.pFileList);
3,727,187✔
497
  (void)taosThreadMutexDestroy(&pBse->mutex);
3,727,105✔
498
  (void)taosThreadRwlockDestroy(&pBse->rwlock);
3,726,498✔
499

500
  taosMemoryFree(pBse);
3,726,954✔
501
  return;
3,726,973✔
502
}
503

504
int32_t bseGet(SBse *pBse, uint64_t seq, uint8_t **pValue, int32_t *len) {
30,686,108✔
505
  int32_t line = 0;
30,686,108✔
506
  int32_t code = 0;
30,686,108✔
507

508
  (void)taosThreadRwlockRdlock(&pBse->rwlock);
30,686,108✔
509
  code = bseTableMgtGet(pBse->pTableMgt, seq, pValue, len);
30,686,108✔
510
  (void)taosThreadRwlockUnlock(&pBse->rwlock);
30,686,108✔
511

512
  if (code != 0) {
30,686,108✔
513
    bseError("vgId:%d failed to get value from seq %" PRId64 " at line %d since %s", BSE_VGID(pBse), seq, line,
×
514
             tstrerror(code));
515
  } else {
516
    bseDebug("vgId:%d get value from seq %" PRId64 " at line %d", BSE_VGID(pBse), seq, line);
30,686,108✔
517
  }
518
  return code;
30,686,108✔
519
}
520

521
int32_t bseCommitBatch(SBse *pBse, SBseBatch *pBatch) {
21,269✔
522
  int32_t code = 0;
21,269✔
523
  int32_t lino = 0;
21,269✔
524
  (void)taosThreadMutexLock(&pBse->mutex);
21,269✔
525
  pBatch->commited = 1;
21,269✔
526

527
  while (!BSE_QUEUE_IS_EMPTY(&pBse->batchMgt->queue)) {
42,538✔
528
    bsequeue *h = BSE_QUEUE_HEAD(&pBse->batchMgt->queue);
21,269✔
529

530
    SBseBatch *p = BSE_QUEUE_DATA(h, SBseBatch, node);
21,269✔
531
    if (p->commited == 1) {
21,269✔
532
      BSE_QUEUE_REMOVE(&p->node);
21,269✔
533

534
      code = bseTableMgtAppend(pBse->pTableMgt, pBatch);
21,269✔
535
      TSDB_CHECK_CODE(code, lino, _error);
21,269✔
536

537
      code = bseRecycleBatchImpl(pBse->batchMgt, p);
21,269✔
538
      TSDB_CHECK_CODE(code, lino, _error);
21,269✔
539
    } else {
540
      break;
×
541
    }
542
  }
543
_error:
21,269✔
544
  if (code != 0) {
21,269✔
545
    bseError("vgId:%d failed to append batch at line %d since %s", BSE_VGID(pBse), lino, tstrerror(code));
×
546
  }
547
  (void)taosThreadMutexUnlock(&pBse->mutex);
21,269✔
548
  return code;
21,269✔
549
}
550

551
int32_t bseReload(SBse *pBse) {
×
552
  int32_t code = 0;
×
553
  int32_t lino = 0;
×
554

555
  (void)taosThreadMutexLock(&pBse->mutex);
×
556
  code = bseClear(pBse);
×
557
  TSDB_CHECK_CODE(code, lino, _error);
×
558

559
  code = bseRecover(pBse, 1);
×
560
  TSDB_CHECK_CODE(code, lino, _error);
×
561

562
_error:
×
563
  if (code != 0) {
×
564
    bseError("vgId:%d failed to reload bse at line %d since %s", BSE_VGID(pBse), lino, tstrerror(code));
×
565
  }
566
  (void)taosThreadMutexUnlock(&pBse->mutex);
×
567
  return code;
×
568
}
569
int32_t bseTrim(SBse *pBse) {
×
570
  int32_t code = 0;
×
571
  return code;
×
572
}
573

574
int32_t bseRecycleBatch(SBse *pBse, SBseBatch *pBatch) {
×
575
  int32_t code = 0;
×
576
  if (pBatch == NULL) return code;
×
577

578
  (void)taosThreadMutexLock(&pBse->mutex);
×
579
  code = bseRecycleBatchImpl(pBse->batchMgt, pBatch);
×
580
  (void)taosThreadMutexUnlock(&pBse->mutex);
×
581
  return code;
×
582
}
583

584
static int32_t bseBatchMgtInit(SBatchMgt *pBatchMgt, SBse *pBse) {
3,727,681✔
585
  int32_t code = 0;
3,727,681✔
586
  int32_t lino = 0;
3,727,681✔
587

588
  pBatchMgt->pBse = pBse;
3,727,681✔
589

590
  pBatchMgt->pBatchList = taosArrayInit(2, sizeof(SBseBatch *));
3,727,681✔
591
  if (pBatchMgt->pBatchList == NULL) {
3,727,364✔
592
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
593
  }
594

595
  SBseBatch *b = NULL;
3,727,364✔
596
  code = bseBatchCreate(&b, 1024);
3,727,247✔
597
  TSDB_CHECK_CODE(code, lino, _error);
3,726,964✔
598

599
  if (taosArrayPush(pBatchMgt->pBatchList, &b) == NULL) {
7,454,728✔
600
    bseBatchDestroy(b);
×
601
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
602
  }
603

604
  BSE_QUEUE_INIT(&pBatchMgt->queue);
3,727,764✔
605
_error:
3,727,364✔
606
  if (code != 0) {
3,727,364✔
607
    if (pBatchMgt->pBatchList != NULL) {
×
608
      for (int32_t i = 0; i < taosArrayGetSize(pBatchMgt->pBatchList); i++) {
×
609
        SBseBatch **p = taosArrayGet(pBatchMgt->pBatchList, i);
×
610
        bseBatchDestroy(*p);
×
611
      }
612
      taosArrayDestroy(pBatchMgt->pBatchList);
×
613
    }
614
    bseError("vgId:%d failed to init batch mgt at line %d since %s", BSE_VGID(pBse), lino, tstrerror(code));
×
615
  }
616
  return code;
3,727,364✔
617
}
618

619
static void bseBatchMgtCleanup(SBatchMgt *pBatchMgt) {
3,727,764✔
620
  if (pBatchMgt == NULL) return;
3,727,764✔
621

622
  for (int32_t i = 0; i < taosArrayGetSize(pBatchMgt->pBatchList); i++) {
7,455,446✔
623
    SBseBatch **p = taosArrayGet(pBatchMgt->pBatchList, i);
3,727,187✔
624
    bseBatchDestroy(*p);
3,727,187✔
625
  }
626

627
  taosArrayDestroy(pBatchMgt->pBatchList);
3,727,682✔
628
}
629

630
static int32_t bseBatchMgtRecycle(SBatchMgt *pBatchMgt, SBseBatch *pBatch) {
21,269✔
631
  int32_t code = 0;
21,269✔
632
  if (pBatch == NULL) return code;
21,269✔
633

634
  bseBatchClear(pBatch);
21,269✔
635

636
  if (taosArrayPush(pBatchMgt->pBatchList, &pBatch) == NULL) {
42,538✔
637
    bseBatchDestroy(pBatch);
×
638
    code = terrno;
×
639
  }
640
  if (code != 0) {
21,269✔
641
    bseError("vgId:%d failed to recycle batch since %s", BSE_VGID((SBse *)pBatchMgt->pBse), tstrerror(code));
×
642
  }
643
  return code;
21,269✔
644
}
645
static int32_t bseBatchMgtGet(SBatchMgt *pBatchMgt, SBseBatch **pBatch) {
21,269✔
646
  int32_t code = 0;
21,269✔
647
  int32_t lino = 0;
21,269✔
648

649
  SBseBatch **p;
650

651
  if (taosArrayGetSize(pBatchMgt->pBatchList) > 0) {
21,269✔
652
    p = (SBseBatch **)taosArrayPop(pBatchMgt->pBatchList);
21,269✔
653
  } else {
654
    SBseBatch *b = NULL;
×
655
    code = bseBatchCreate(&b, 1024);
×
656
    TSDB_CHECK_CODE(code, lino, _error);
×
657

658
    if (taosArrayPush(pBatchMgt->pBatchList, &b) == NULL) {
×
659
      bseBatchDestroy(b);
×
660
      TSDB_CHECK_CODE(code = terrno, lino, _error);
×
661
    }
662
    p = (SBseBatch **)taosArrayPop(pBatchMgt->pBatchList);
×
663
  }
664

665
  BSE_QUEUE_PUSH(&pBatchMgt->queue, &((*p)->node));
21,269✔
666
  *pBatch = *p;
21,269✔
667

668
_error:
21,269✔
669
  if (code != 0) {
21,269✔
670
    bseInfo("vgId:%d failed to get bse batch at line %d since %s", BSE_VGID((SBse *)pBatchMgt->pBse), lino,
×
671
            tstrerror(code));
672
  }
673
  return code;
21,269✔
674
}
675

676
int32_t bseRecycleBatchImpl(SBatchMgt *pMgt, SBseBatch *pBatch) {
21,269✔
677
  // code
678
  return bseBatchMgtRecycle(pMgt, pBatch);
21,269✔
679
}
680

681
int32_t bseBatchCreate(SBseBatch **pBatch, int32_t nKeys) {
3,702,527✔
682
  int32_t    code = 0;
3,702,527✔
683
  int32_t    lino = 0;
3,702,527✔
684
  SBseBatch *p = taosMemoryCalloc(1, sizeof(SBseBatch));
3,702,527✔
685
  if (p == NULL) {
3,727,335✔
686
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
687
  }
688

689
  p->len = 0;
3,727,335✔
690
  p->seq = 0;
3,726,884✔
691
  p->cap = 1024;
3,726,913✔
692
  p->buf = taosMemCalloc(1, p->cap);
3,727,542✔
693
  if (p->buf == NULL) {
3,726,771✔
694
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
695
  }
696

697
  p->pSeq = taosArrayInit(nKeys, sizeof(SBlockItemInfo));
3,726,630✔
698
  if (p->pSeq == NULL) {
3,727,764✔
699
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
700
  }
701

702
  BSE_QUEUE_INIT(&p->node);
3,727,764✔
703

704
  *pBatch = p;
3,727,364✔
705

706
_error:
3,727,364✔
707
  if (code != 0) {
3,727,364✔
708
    bseError("failed to create bse batch since %s at line %d", tstrerror(code), lino);
×
709
    bseBatchDestroy(p);
×
710
  }
711
  return code;
3,727,764✔
712
}
713
int32_t bseBatchSetParam(SBseBatch *pBatch, int64_t seq, int32_t cap) {
21,269✔
714
  pBatch->seq = seq;
21,269✔
715
  return taosArrayEnsureCap(pBatch->pSeq, cap);
21,269✔
716
}
717
int32_t bseBatchInit(SBse *pBse, SBseBatch **pBatch, int32_t nKeys) {
21,269✔
718
  int32_t    code = 0;
21,269✔
719
  int32_t    lino = 0;
21,269✔
720
  SBseBatch *p = NULL;
21,269✔
721
  uint64_t   sseq = 0;
21,269✔
722

723
  // atomic later
724
  (void)taosThreadMutexLock(&pBse->mutex);
21,269✔
725
  sseq = pBse->seq;
21,269✔
726
  pBse->seq += nKeys;
21,269✔
727

728
  code = bseBatchMgtGet(pBse->batchMgt, &p);
21,269✔
729
  (void)taosThreadMutexUnlock(&pBse->mutex);
21,269✔
730

731
  bseDebug("vgId:%d bse seq start from: %" PRId64 " to %" PRId64 "", BSE_VGID(pBse), sseq, sseq + nKeys - 1);
21,269✔
732
  TSDB_CHECK_CODE(code, lino, _error);
21,269✔
733

734
  code = bseBatchSetParam(p, sseq, nKeys);
21,269✔
735
  TSDB_CHECK_CODE(code, lino, _error);
21,269✔
736

737
  p->startSeq = sseq;
21,269✔
738
  p->pBse = pBse;
21,269✔
739
  *pBatch = p;
21,269✔
740
_error:
21,269✔
741
  if (code != 0) {
21,269✔
742
    bseError("vgId:%d failed to build batch since %s", BSE_VGID((SBse *)p->pBse), tstrerror(code));
×
743
    bseBatchDestroy(p);
×
744
  }
745
  return code;
21,269✔
746
}
747

748
int32_t bseBatchPut(SBseBatch *pBatch, int64_t *seq, uint8_t *value, int32_t len) {
20,461,208✔
749
  int32_t code = 0;
20,461,208✔
750
  int32_t lino = 0;
20,461,208✔
751
  int32_t offset = 0;
20,461,208✔
752

753
  int64_t lseq = pBatch->seq;
20,461,208✔
754

755
  code = bseBatchMayResize(pBatch, pBatch->len + len);
20,461,208✔
756
  TSDB_CHECK_CODE(code, lino, _error);
20,461,208✔
757

758
  uint8_t *p = pBatch->buf + pBatch->len;
20,461,208✔
759
  pBatch->len += taosEncodeBinary((void **)&p, value, len);
20,461,208✔
760

761
  SBlockItemInfo info = {.size = len, .seq = lseq};
20,461,208✔
762
  if (taosArrayPush(pBatch->pSeq, &info) == NULL) {
40,922,416✔
763
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
764
  }
765

766
  pBatch->seq++;
20,461,208✔
767
  pBatch->num++;
20,461,208✔
768

769
  *seq = lseq;
20,461,208✔
770
  bseDebug("succ to put seq %" PRId64 " to batch", lseq);
20,461,208✔
771

772
_error:
20,461,208✔
773
  if (code != 0) {
20,461,208✔
774
    bseError("vgId:%d failed to put value by seq %" PRId64 " at line %d since %s", BSE_VGID((SBse *)pBatch->pBse), lseq,
×
775
             lino, tstrerror(code));
776
  }
777
  return code;
20,461,208✔
778
}
779

780
int32_t bseBatchGetSize(SBseBatch *pBatch, int32_t *sz) {
×
781
  int32_t code = 0;
×
782

783
  if (pBatch == NULL) return TSDB_CODE_INVALID_MSG;
×
784
  *sz = pBatch->len;
×
785

786
  return code;
×
787
}
788

789
int32_t bseBatchExccedLimit(SBseBatch *pBatch) {
×
790
  if (pBatch == NULL) return 0;
×
791
  SBse *pBse = pBatch->pBse;
×
792
  if ((pBatch->len + 128) >= (BSE_BLOCK_SIZE(pBse) >> 2)) {
×
793
    return 1;
×
794
  }
795
  return 0;
×
796
}
797

798
int32_t bseBatchGet(SBseBatch *pBatch, uint64_t seq, uint8_t **pValue, int32_t *len) {
×
799
  int32_t code = 0;
×
800
  return 0;
×
801
}
802
void bseBatchClear(SBseBatch *pBatch) {
21,269✔
803
  pBatch->len = 0;
21,269✔
804
  pBatch->num = 0;
21,269✔
805
  pBatch->seq = 0;
21,269✔
806
  pBatch->commited = 0;
21,269✔
807
  BSE_QUEUE_REMOVE(&pBatch->node);
21,269✔
808
  taosArrayClear(pBatch->pSeq);
21,269✔
809
}
21,269✔
810
void bseBatchDestroy(SBseBatch *pBatch) {
3,727,764✔
811
  if (pBatch == NULL) return;
3,727,764✔
812

813
  int32_t code = 0;
3,727,764✔
814
  taosMemoryFree(pBatch->buf);
3,727,764✔
815
  taosArrayDestroy(pBatch->pSeq);
3,727,187✔
816
  BSE_QUEUE_REMOVE(&pBatch->node);
3,727,187✔
817

818
  taosMemoryFree(pBatch);
3,727,764✔
819
}
820

821
int32_t bseBatchMayResize(SBseBatch *pBatch, int32_t alen) {
20,461,208✔
822
  int32_t lino = 0;
20,461,208✔
823
  int32_t code = 0;
20,461,208✔
824
  if (alen > pBatch->cap) {
20,461,208✔
825
    int32_t cap = pBatch->cap;
3,787✔
826
    while (cap < alen) {
7,574✔
827
      cap <<= 1;
3,787✔
828
    }
829

830
    uint8_t *buf = taosMemRealloc(pBatch->buf, cap);
3,787✔
831
    if (buf == NULL) {
3,787✔
832
      TSDB_CHECK_CODE(code = terrno, lino, _error);
×
833
    }
834

835
    pBatch->cap = cap;
3,787✔
836
    pBatch->buf = buf;
3,787✔
837
  } else {
838
    return code;
20,457,421✔
839
  }
840
_error:
3,787✔
841
  if (code != 0) {
3,787✔
842
    bseError("failed to resize batch buffer since %s at line %d", tstrerror(code), lino);
×
843
  }
844
  return code;
3,787✔
845
}
846

847
static int32_t seqComparFunc(const void *p1, const void *p2) {
×
848
  uint64_t pu1 = *(const uint64_t *)p1;
×
849
  uint64_t pu2 = *(const uint64_t *)p2;
×
850
  if (pu1 == pu2) {
×
851
    return 0;
×
852
  } else {
853
    return (pu1 < pu2) ? -1 : 1;
×
854
  }
855
}
856
int32_t bseMultiGet(SBse *pBse, SArray *pKey, SArray *ppValue) {
×
857
  int32_t code = 0;
×
858
  taosSort(pKey->pData, taosArrayGetSize(pKey), sizeof(int64_t), seqComparFunc);
×
859
  (void)taosThreadMutexLock(&pBse->mutex);
×
860
  (void)taosThreadMutexUnlock(&pBse->mutex);
×
861
  return code;
×
862
}
863
// int32_t bseIterate(SBse *pBse, uint64_t start, uint64_t end, SArray *pValue) {
864
//   int32_t code = 0;
865
//   taosThreadMutexLock(&pBse->mutex);
866
//   taosThreadMutexUnlock(&pBse->mutex);
867
//   return code;
868
// }
869

870
int32_t bseGenCommitInfo(SBse *pBse, SArray *pFileSet) {
3,476✔
871
  int32_t code = 0;
3,476✔
872
  int32_t lino = 0;
3,476✔
873
  char    buf[TSDB_FILENAME_LEN] = {0};
3,476✔
874
  char   *pBuf = NULL;
3,476✔
875
  int32_t len = 0;
3,476✔
876

877
  code = bseSerailCommitInfo(pBse, pFileSet, &pBuf, &len);
3,476✔
878
  TSDB_CHECK_CODE(code, lino, _error);
3,476✔
879

880
  bseBuildCurrentFullName(pBse, buf);
3,476✔
881

882
  // Use taosWriteCfgFile for encrypted write (atomic with temp file)
883
  // Note: taosWriteCfgFile uses global tsCfgKey for encryption
884
  code = taosWriteCfgFile(buf, pBuf, len);
3,476✔
885
  if (code != 0) {
3,476✔
886
    bseError("vgId:%d failed to write CURRENT file since %s", BSE_VGID(pBse), tstrerror(code));
×
887
    TSDB_CHECK_CODE(code, lino, _error);
×
888
  }
889

890
_error:
3,476✔
891
  if (code != 0) {
3,476✔
892
    bseError("vgId:%d failed to gen commit info since %s", BSE_VGID(pBse), tstrerror(code));
×
893
  }
894
  taosMemoryFree(pBuf);
3,476✔
895
  return code;
3,476✔
896
}
897

898
int32_t bseCommitFinish(SBse *pBse) {
3,476✔
899
  // taosWriteCfgFile already handles atomic write with temp file and rename
900
  // No additional action needed here
901
  return 0;
3,476✔
902
}
903
int32_t bseCommitDo(SBse *pBse, SArray *pFileSet) {
3,476✔
904
  int32_t code = 0;
3,476✔
905
  int32_t lino = 0;
3,476✔
906

907
  code = bseGenCommitInfo(pBse, pFileSet);
3,476✔
908
  TSDB_CHECK_CODE(code, lino, _error);
3,476✔
909

910
  code = bseCommitFinish(pBse);
3,476✔
911
  TSDB_CHECK_CODE(code, lino, _error);
3,476✔
912
_error:
3,476✔
913
  if (code != 0) {
3,476✔
914
    bseError("vgId:%d failed to commit at line %d since %s", BSE_VGID(pBse), lino, tstrerror(code));
×
915
  }
916
  return code;
3,476✔
917
}
918

919
int32_t bseUpdateCommitInfo(SBse *pBse, SBseLiveFileInfo *pInfo, SArray **ppResult) {
3,476✔
920
  int32_t code = 0;
3,476✔
921
  int32_t lino = 0;
3,476✔
922

923
  (void)taosThreadMutexLock(&pBse->mutex);
3,476✔
924

925
  SBseCommitInfo *pCommit = &pBse->commitInfo;
3,476✔
926

927
  int32_t fileListSize = taosArrayGetSize(pCommit->pFileList);
3,476✔
928
  if (fileListSize == 0) {
3,476✔
929
    if (taosArrayPush(pCommit->pFileList, pInfo) == NULL) {
4,788✔
930
      TSDB_CHECK_CODE(code = terrno, lino, _error);
×
931
    }
932
  } else {
933
    SBseLiveFileInfo *pLast = taosArrayGetLast(pCommit->pFileList);
1,082✔
934
    if (pLast->timestamp == pInfo->timestamp) {
1,082✔
935
      *pLast = *pInfo;
1,082✔
936
    } else {
937
      if (taosArrayPush(pCommit->pFileList, pInfo) == NULL) {
×
938
        TSDB_CHECK_CODE(code = terrno, lino, _error);
×
939
      }
940
    }
941
  }
942

943
  code = bseGetAliveFileList(pBse, ppResult, 0);
3,476✔
944
  TSDB_CHECK_CODE(code, lino, _error);
3,476✔
945

946
_error:
3,476✔
947
  if (code != 0) {
3,476✔
948
    bseError("vgId:%d failed to update commit info since %s", BSE_VGID(pBse), tstrerror(code));
×
949
  }
950

951
  (void)taosThreadMutexUnlock(&pBse->mutex);
3,476✔
952
  return code;
3,476✔
953
}
954

955
int32_t bseGetAliveFileList(SBse *pBse, SArray **pFileList, int8_t lock) {
3,476✔
956
  int32_t code = 0;
3,476✔
957
  int32_t lino = 0;
3,476✔
958

959
  SArray *p = taosArrayInit(4, sizeof(SBseLiveFileInfo));
3,476✔
960
  if (p == NULL) {
3,476✔
961
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
962
  }
963

964
  if (lock) {
3,476✔
965
    (void)taosThreadMutexLock(&pBse->mutex);
×
966
  }
967

968
  if (taosArrayAddAll(p, pBse->commitInfo.pFileList) == NULL) {
3,476✔
969
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
970
  }
971

972
  *pFileList = p;
3,476✔
973
_error:
3,476✔
974
  if (code != 0) {
3,476✔
975
    bseError("vgId:%d failed to get alive file list since %s", BSE_VGID(pBse), tstrerror(code));
×
976
  }
977
  if (lock) {
3,476✔
978
    (void)taosThreadMutexUnlock(&pBse->mutex);
×
979
  }
980
  return code;
3,476✔
981
}
982

983
int8_t bseSkipCommit(SBse *pBse, SBseLiveFileInfo *info) {
4,653,370✔
984
  if (info->size == 0) {
4,653,370✔
985
    bseInfo("vgId:%d no data to commit", BSE_VGID(pBse));
4,649,894✔
986
    return 1;
4,649,894✔
987
  }
988
  return 0;
3,476✔
989
}
990

991
int32_t bseCommit(SBse *pBse) {
4,649,829✔
992
  // Generate static info and footer info;
993
  int64_t cost = 0;
4,649,829✔
994
  int32_t code = 0;
4,649,829✔
995
  int32_t line = 0;
4,649,829✔
996
  int64_t st = taosGetTimestampMs();
4,653,370✔
997
  SArray *pLiveFile = NULL;
4,653,370✔
998

999
  SBseLiveFileInfo info = {0};
4,653,129✔
1000
  code = bseTableMgtCommit(pBse->pTableMgt, &info);
4,653,129✔
1001
  TSDB_CHECK_CODE(code, line, _error);
4,653,370✔
1002

1003
  if (bseSkipCommit(pBse, &info)) {
4,653,370✔
1004
    goto _error;
4,649,894✔
1005
  }
1006

1007
  code = bseUpdateCommitInfo(pBse, &info, &pLiveFile);
3,476✔
1008
  TSDB_CHECK_CODE(code, line, _error);
3,476✔
1009

1010
  code = bseCommitDo(pBse, pLiveFile);
3,476✔
1011
  TSDB_CHECK_CODE(code, line, _error);
3,476✔
1012

1013
_error:
4,653,370✔
1014
  cost = taosGetTimestampMs() - st;
4,653,370✔
1015
  if (cost >= 1000) {
4,653,370✔
UNCOV
1016
    bseWarn("vgId:%d bse commit cost %" PRId64 " ms", BSE_VGID(pBse), cost);
×
1017
  }
1018
  if (code != 0) {
4,653,370✔
1019
    bseError("vgId:%d failed to commit at line %d since %s", BSE_VGID(pBse), line, tstrerror(code));
×
1020
  }
1021
  taosArrayDestroy(pLiveFile);
4,653,370✔
1022

1023
  return code;
4,653,370✔
1024
}
1025

1026
int32_t bseRollback(SBse *pBse, int64_t ver) {
×
1027
  // TODO
1028
  int32_t code = 0;
×
1029
  return code;
×
1030
}
1031

1032
int32_t bseRollbackImpl(SBse *pBse) {
×
1033
  int32_t code = 0;
×
1034
  return code;
×
1035
}
1036

1037
int32_t bseCompact(SBse *pBse) {
×
1038
  // impl later
1039
  int32_t code = 0;
×
1040
  return code;
×
1041
}
1042

1043
int32_t bseDelete(SBse *pBse, SSeqRange range) {
×
1044
  int32_t code = 0;
×
1045
  return code;
×
1046
}
1047

1048
int32_t bseUpdateCfg(SBse *pBse, SBseCfg *pCfg) {
×
1049
  int32_t code = 0;
×
1050
  if (pCfg == NULL) {
×
1051
    return TSDB_CODE_INVALID_MSG;
×
1052
  }
1053

1054
  (void)taosThreadMutexLock(&pBse->mutex);
×
1055
  if (pCfg->blockSize > 0) {
×
1056
    pBse->cfg.blockSize = pCfg->blockSize;
×
1057
  }
1058

1059
  if (pCfg->keepDays > 0) {
×
1060
    pBse->cfg.keepDays = pCfg->keepDays;
×
1061
  }
1062

1063
  if (pCfg->compressType >= kNoCompres && pCfg->compressType <= kZxCompress) {
×
1064
    pBse->cfg.compressType = pCfg->compressType;
×
1065
  }
1066

1067
  if (pCfg->tableCacheSize >= 0) {
×
1068
    pBse->cfg.tableCacheSize = pCfg->tableCacheSize;
×
1069
  }
1070

1071
  if (pCfg->blockCacheSize >= 0) {
×
1072
    pBse->cfg.blockCacheSize = pCfg->blockCacheSize;
×
1073
  }
1074
  (void)taosThreadMutexUnlock(&pBse->mutex);
×
1075
  return code;
×
1076
}
1077

1078
int32_t bseUpdatCfgNoLock(SBse *pBse, SBseCfg *pCfg) {
×
1079
  int32_t code = 0;
×
1080
  if (pCfg == NULL) {
×
1081
    return TSDB_CODE_INVALID_MSG;
×
1082
  }
1083
  if (pCfg->blockSize > 0) {
×
1084
    pBse->cfg.blockSize = pCfg->blockSize;
×
1085
  }
1086

1087
  if (pCfg->keepDays > 0) {
×
1088
    pBse->cfg.keepDays = pCfg->keepDays;
×
1089
  }
1090

1091
  if (pCfg->compressType >= kNoCompres && pCfg->compressType <= kZxCompress) {
×
1092
    pBse->cfg.compressType = pCfg->compressType;
×
1093
  }
1094

1095
  if (pCfg->tableCacheSize >= 0) {
×
1096
    pBse->cfg.tableCacheSize = pCfg->tableCacheSize;
×
1097
  }
1098

1099
  if (pCfg->blockCacheSize >= 0) {
×
1100
    pBse->cfg.blockCacheSize = pCfg->blockCacheSize;
×
1101
  }
1102
  return code;
×
1103
}
1104
int32_t bseSetCompressType(SBse *pBse, int8_t compressType) {
×
1105
  int32_t code = 0;
×
1106
  if (compressType < kNoCompres || compressType > kZxCompress) {
×
1107
    return TSDB_CODE_INVALID_MSG;
×
1108
  }
1109
  (void)taosThreadMutexLock(&pBse->mutex);
×
1110
  pBse->cfg.compressType = compressType;
×
1111
  (void)taosThreadMutexUnlock(&pBse->mutex);
×
1112

1113
  return code;
×
1114
}
1115
int32_t bseSetBlockSize(SBse *pBse, int32_t blockSize) {
×
1116
  int32_t code = 0;
×
1117
  if (blockSize <= 0) {
×
1118
    return TSDB_CODE_INVALID_MSG;
×
1119
  }
1120
  (void)taosThreadMutexLock(&pBse->mutex);
×
1121
  pBse->cfg.blockSize = blockSize;
×
1122
  (void)taosThreadMutexUnlock(&pBse->mutex);
×
1123

1124
  return code;
×
1125
}
1126
int32_t bseSetBlockCacheSize(SBse *pBse, int32_t blockCacheSize) {
×
1127
  int32_t code = 0;
×
1128
  if (blockCacheSize <= 0) {
×
1129
    return TSDB_CODE_INVALID_MSG;
×
1130
  }
1131
  (void)taosThreadMutexLock(&pBse->mutex);
×
1132
  pBse->cfg.blockCacheSize = blockCacheSize;
×
1133

1134
  code = bseTableMgtSetBlockCacheSize(pBse->pTableMgt, blockCacheSize);
×
1135
  (void)taosThreadMutexUnlock(&pBse->mutex);
×
1136

1137
  return code;
×
1138
}
1139
int32_t bseSetTableCacheSize(SBse *pBse, int32_t tableCacheSize) {
×
1140
  int32_t code = 0;
×
1141
  if (tableCacheSize <= 0) {
×
1142
    return TSDB_CODE_INVALID_MSG;
×
1143
  }
1144
  (void)taosThreadMutexLock(&pBse->mutex);
×
1145

1146
  pBse->cfg.tableCacheSize = tableCacheSize;
×
1147
  code = bseTableMgtSetTableCacheSize(pBse->pTableMgt, tableCacheSize);
×
1148
  (void)taosThreadMutexUnlock(&pBse->mutex);
×
1149

1150
  return code;
×
1151
}
1152
int32_t bseSetKeepDays(SBse *pBse, int32_t keepDays) {
×
1153
  int32_t code = 0;
×
1154
  if (keepDays <= 0) {
×
1155
    return TSDB_CODE_INVALID_MSG;
×
1156
  }
1157
  (void)taosThreadMutexLock(&pBse->mutex);
×
1158
  pBse->cfg.keepDays = keepDays;
×
1159
  (void)taosThreadMutexUnlock(&pBse->mutex);
×
1160
  return code;
×
1161
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc