• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4905

29 Dec 2025 02:08PM UTC coverage: 65.423% (-0.3%) from 65.734%
#4905

push

travis-ci

web-flow
enh: sign connect request (#34067)

23 of 29 new or added lines in 4 files covered. (79.31%)

11614 existing lines in 186 files now uncovered.

193476 of 295730 relevant lines covered (65.42%)

115752566.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.6
/source/dnode/vnode/src/bse/bseMgt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "bse.h"
17
#include "bseInc.h"
18
#include "bseSnapshot.h"
19
#include "bseTable.h"
20
#include "bseTableMgt.h"
21
#include "bseUtil.h"
22
#include "cJSON.h"
23

24
#define BSE_FMT_VER 0x1
25

26
static void bseCfgSetDefault(SBseCfg *pCfg);
27

28
static int32_t bseClear(SBse *pBse);
29
static int32_t bseInitEnv(SBse *p);
30
static int32_t bseInitStartSeq(SBse *pBse);
31
static int32_t bseRecover(SBse *pBse, int8_t rm);
32
static int32_t bseGenCommitInfo(SBse *pBse, SArray *pInfo);
33
static int32_t bseCreateTableManager(SBse *p);
34
static int32_t bseCommitDo(SBse *pBse, SArray *pFileSet);
35

36
static int32_t bseDeserialCommitInfo(SBse *pBse, char *pCurrent, SBseCommitInfo *pInfo);
37
static int32_t bseSerailCommitInfo(SBse *pBse, SArray *fileSet, char **pBuf, int32_t *len);
38
static int32_t bseReadCurrentFile(SBse *pBse, char **p, int64_t *len);
39
static int32_t bseListAllFiles(const char *path, SArray *pFiles);
40
static int32_t bseRemoveUnCommitFile(SBse *p);
41

42
static int32_t bseCreateBatchList(SBse *pBse);
43

44
static int32_t bseBatchMgtInit(SBatchMgt *pBatchMgt, SBse *pBse);
45
static int32_t bseBatchMgtGet(SBatchMgt *pBatchMgt, SBseBatch **pBatch);
46
static int32_t bseBatchMgtRecycle(SBatchMgt *pBatchMgt, SBseBatch *pBatch);
47
static void    bseBatchMgtCleanup(SBatchMgt *pBatchMgt);
48

49
static int32_t bseBatchCreate(SBseBatch **pBatch, int32_t nKeys);
50
static void    bseBatchClear(SBseBatch *pBatch);
51
static int32_t bseRecycleBatchImpl(SBatchMgt *pMgt, SBseBatch *pBatch);
52
static int32_t bseBatchMayResize(SBseBatch *pBatch, int32_t alen);
53

54
int32_t bseSerailCommitInfo(SBse *pBse, SArray *fileSet, char **pBuf, int32_t *len) {
3,444✔
55
  int32_t code = 0;
3,444✔
56
  // int32_t code = 0;
57
  int32_t line = 0;
3,444✔
58

59
  cJSON *pRoot = cJSON_CreateObject();
3,444✔
60
  cJSON *pFileSet = cJSON_CreateArray();
3,444✔
61
  if (pRoot == NULL || pFileSet == NULL) {
3,444✔
62
    code = terrno;
×
63
    TSDB_CHECK_CODE(code, line, _err);
×
64
  }
65

66
  if (cJSON_AddNumberToObject(pRoot, "fmtVer", pBse->commitInfo.fmtVer) == NULL) {
3,444✔
67
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
68
  }
69
  if (cJSON_AddNumberToObject(pRoot, "vgId", BSE_VGID(pBse)) == NULL) {
3,444✔
70
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
71
  }
72
  if (cJSON_AddNumberToObject(pRoot, "commitVer", pBse->commitInfo.commitVer) == NULL) {
3,444✔
73
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
74
  }
75
  if (cJSON_AddNumberToObject(pRoot, "commitSeq", pBse->commitInfo.lastSeq) == NULL) {
3,444✔
76
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
77
  }
78

79
  if (!cJSON_AddItemToObject(pRoot, "fileSet", pFileSet)) {
3,444✔
80
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
81
  }
82

83
  for (int32_t i = 0; i < taosArrayGetSize(fileSet); i++) {
6,888✔
84
    SBseLiveFileInfo *pInfo = taosArrayGet(fileSet, i);
3,444✔
85
    cJSON            *pField = cJSON_CreateObject();
3,444✔
86
    if (cJSON_AddNumberToObject(pField, "startSeq", pInfo->range.sseq) == NULL) {
3,444✔
87
      TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
88
    }
89
    if (cJSON_AddNumberToObject(pField, "endSeq", pInfo->range.eseq) == NULL) {
3,444✔
90
      TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
91
    }
92
    if (cJSON_AddNumberToObject(pField, "size", pInfo->size) == NULL) {
3,444✔
93
      TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
94
    }
95

96
    if (cJSON_AddNumberToObject(pField, "level", pInfo->level) == NULL) {
3,444✔
97
      TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
98
    }
99
    if (cJSON_AddNumberToObject(pField, "retention", pInfo->timestamp) == NULL) {
3,444✔
100
      TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
101
    }
102
    if (!cJSON_AddItemToArray(pFileSet, pField)) {
3,444✔
103
      TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
104
    }
105
  }
106

107
  char   *pSerialized = cJSON_PrintUnformatted(pRoot);
3,444✔
108
  if (pSerialized == NULL) {
3,444✔
109
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_CFG, line, _err);
×
110
  }
111
  int32_t sz = strlen(pSerialized);
3,444✔
112

113
  *pBuf = pSerialized;
3,444✔
114
  *len = sz;
3,444✔
115

116
_err:
3,444✔
117
  if (code != 0) {
3,444✔
118
    bseError("vgId:%d failed at line %d since %s", BSE_VGID(pBse), line, tstrerror(code));
×
119
    cJSON_Delete(pFileSet);
×
120
  }
121
  cJSON_Delete(pRoot);
3,444✔
122
  pRoot = NULL;
3,444✔
123
  return code;
3,444✔
124
}
125
int32_t bseDeserialCommitInfo(SBse *pBse, char *pCurrent, SBseCommitInfo *pInfo) {
×
126
  int32_t code = 0;
×
127
  int32_t lino = 0;
×
128
  cJSON  *pRoot = cJSON_Parse(pCurrent);
×
129
  if (pRoot == NULL) {
×
130
    bseError("vgId:%d, failed to parse current meta", BSE_VGID(pBse));
×
131
    code = TSDB_CODE_FILE_CORRUPTED;
×
132
    TSDB_CHECK_CODE(code, lino, _error);
×
133
  }
134

135
  cJSON *item = cJSON_GetObjectItem(pRoot, "fmtVer");
×
136
  if (item == NULL) {
×
137
    bseError("vgId:%d, failed to get fmtVer from current meta", BSE_VGID(pBse));
×
138
    code = TSDB_CODE_FILE_CORRUPTED;
×
139
    goto _error;
×
140
  }
141
  pInfo->fmtVer = item->valuedouble;
×
142

143
  item = cJSON_GetObjectItem(pRoot, "vgId");
×
144
  if (item == NULL) {
×
145
    bseError("vgId:%d, failed to get vgId from current meta", BSE_VGID(pBse));
×
146
    code = TSDB_CODE_FILE_CORRUPTED;
×
147
    goto _error;
×
148
  }
149
  pInfo->vgId = item->valuedouble;
×
150

151
  item = cJSON_GetObjectItem(pRoot, "commitVer");
×
152
  if (item == NULL) {
×
153
    bseError("vgId:%d, failed to get commitVer from current meta", BSE_VGID(pBse));
×
154
    code = TSDB_CODE_FILE_CORRUPTED;
×
155
    goto _error;
×
156
  }
157
  pInfo->commitVer = item->valuedouble;
×
158

159
  item = cJSON_GetObjectItem(pRoot, "commitSeq");
×
160
  if (item == NULL) {
×
161
    bseError("vgId:%d, failed to get lastSeq from current meta", BSE_VGID(pBse));
×
162
    code = TSDB_CODE_FILE_CORRUPTED;
×
163
    goto _error;
×
164
  }
165
  pInfo->lastSeq = item->valuedouble;
×
166

167
  cJSON *pFiles = cJSON_GetObjectItem(pRoot, "fileSet");
×
168
  cJSON *pField = NULL;
×
169
  cJSON_ArrayForEach(pField, pFiles) {
×
170
    cJSON *pStartSeq = cJSON_GetObjectItem(pField, "startSeq");
×
171
    cJSON *pEndSeq = cJSON_GetObjectItem(pField, "endSeq");
×
172
    cJSON *pFileSize = cJSON_GetObjectItem(pField, "size");
×
173
    cJSON *pLevel = cJSON_GetObjectItem(pField, "level");
×
174
    cJSON *pRetentionTs = cJSON_GetObjectItem(pField, "retention");
×
175
    if (pStartSeq == NULL || pEndSeq == NULL || pFileSize == NULL || pLevel == NULL || pRetentionTs == NULL) {
×
176
      bseError("vgId:%d, failed to get field from files", BSE_VGID(pBse));
×
177
      code = TSDB_CODE_FILE_CORRUPTED;
×
178
      goto _error;
×
179
    }
180

181
    SBseLiveFileInfo info = {0};
×
182
    info.range.sseq = pStartSeq->valuedouble;
×
183
    info.range.eseq = pEndSeq->valuedouble;
×
184
    info.size = pFileSize->valuedouble;
×
185
    info.level = pLevel->valuedouble;
×
186
    info.timestamp = pRetentionTs->valuedouble;
×
187

188
    if (taosArrayPush(pInfo->pFileList, &info) == NULL) {
×
189
      code = terrno;
×
190
      goto _error;
×
191
    }
192
  }
193
_error:
×
194
  if (code != 0) {
×
195
    bseError("vgId:%d failed to get commit info from current meta since %s", BSE_VGID(pBse), tstrerror(code));
×
196
  }
197
  cJSON_Delete(pRoot);
×
198
  return code;
×
199
}
200

201
int32_t bseReadCurrentFile(SBse *pBse, char **p, int64_t *len) {
3,580,479✔
202
  int32_t   code = 0;
3,580,479✔
203
  int32_t   lino = 0;
3,580,479✔
204
  TdFilePtr fd = NULL;
3,580,479✔
205
  int64_t   sz = 0;
3,580,479✔
206
  char      name[TSDB_FILENAME_LEN] = {0};
3,580,479✔
207

208
  char *pCurrent = NULL;
3,580,479✔
209

210
  bseBuildCurrentFullName(pBse, name);
3,580,479✔
211
  if (taosCheckExistFile(name) == 0) {
3,580,401✔
212
    bseInfo("vgId:%d, no current meta file found, skip recover", BSE_VGID(pBse));
3,580,479✔
213
    return 0;
3,580,479✔
214
  }
215
  code = taosStatFile(name, &sz, NULL, NULL);
×
216
  TSDB_CHECK_CODE(code, lino, _error);
×
217

218
  fd = taosOpenFile(name, TD_FILE_READ);
×
219
  if (fd == NULL) {
×
220
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
221
  }
222

223
  pCurrent = (char *)taosMemoryCalloc(1, sz + 1);
×
224
  if (pCurrent == NULL) {
×
225
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
226
  }
227

228
  int64_t nread = taosReadFile(fd, pCurrent, sz);
×
229
  if (nread != sz) {
×
230
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
231
  }
232

233
  if (taosCloseFile(&fd) != 0) {
×
234
    bseError("vgId:%d failed to close file %s since %s", BSE_VGID(pBse), name, tstrerror(terrno));
×
235
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
236
  }
237

238
  *p = pCurrent;
×
239
  *len = sz;
×
240

241
_error:
×
242
  if (code != 0) {
×
243
    bseError("vgId:%d, failed to read current at line %d since %s", BSE_VGID(pBse), lino, tstrerror(code));
×
244
    if (taosCloseFile(&fd) != 0) {
×
245
      bseError("vgId:%d failed to close file %s since %s", BSE_VGID(pBse), name, tstrerror(terrno));
×
246
    }
247
    taosMemoryFree(pCurrent);
×
248
  }
249
  return code;
×
250
}
251

252
int32_t bseListAllFiles(const char *path, SArray *pFiles) {
×
253
  SBseLiveFileInfo info = {0};
×
254

255
  int32_t code = 0;
×
256
  int32_t lino = 0;
×
257

258
  TdDirPtr pDir = taosOpenDir(path);
×
259
  if (pDir == NULL) {
×
260
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
261
  }
262

263
  TdDirEntryPtr de = NULL;
×
264
  while ((de = taosReadDir(pDir)) != NULL) {
×
265
    char *name = taosGetDirEntryName(de);
×
266
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
×
267

268
    if (strstr(name, BSE_DATA_SUFFIX) == NULL) {
×
269
      continue;
×
270
    }
271
    SBseLiveFileInfo info = {0};
×
272
    memcpy(info.name, name, strlen(name));
×
273

274
    if (taosArrayPush(pFiles, &info) == NULL) {
×
275
      code = terrno;
×
276
      goto _error;
×
277
    }
278
  }
279

280
_error:
×
281
  if (code != 0) {
×
282
    bseError("failed to list files at line %d since %s", lino, tstrerror(code));
×
283
  }
284
  if ((code = taosCloseDir(&pDir)) != 0) {
×
285
    bseError("failed to close dir %s since %s", path, tstrerror(code));
×
286
  }
287
  return code;
×
288
}
289

290
int32_t removeUnCommitFile(SBse *p, SArray *pCommitedFiles, SArray *pAllFiles) {
×
291
  int32_t code = 0;
×
292
  for (int32_t i = 0; i < taosArrayGetSize(pAllFiles); i++) {
×
293
    SBseLiveFileInfo *pInfo = taosArrayGet(pAllFiles, i);
×
294
    int32_t           found = 0;
×
295
    for (int32_t j = 0; j < taosArrayGetSize(pCommitedFiles); j++) {
×
296
      SBseLiveFileInfo *pCommited = taosArrayGet(pCommitedFiles, j);
×
297
      if (strcmp(pInfo->name, pCommited->name) == 0) {
×
298
        found = 1;
×
299
        break;
×
300
      }
301
    }
302
    if (found == 0) {
×
303
      char buf[TSDB_FILENAME_LEN] = {0};
×
304
      bseBuildFullName(p, pInfo->name, buf);
×
305

306
      code = taosRemoveFile(buf);
×
307
      if (code != 0) {
×
308
        bseError("vgId:%d failed to remove file %s since %s", BSE_VGID(p), pInfo->name, tstrerror(code));
×
309
      } else {
310
        bseInfo("vgId:%d remove file %s", BSE_VGID(p), pInfo->name);
×
311
      }
312
    }
313
  }
314

315
  return code;
×
316
}
317
int32_t bseRemoveUnCommitFile(SBse *p) {
×
318
  int32_t code = 0;
×
319

320
  SArray *pFiles = taosArrayInit(64, sizeof(SBseLiveFileInfo));
×
321
  if (pFiles == NULL) {
×
322
    return terrno;
×
323
  }
324

325
  code = bseListAllFiles(p->path, pFiles);
×
326
  if (code != 0) {
×
327
    taosArrayDestroy(pFiles);
×
328
    return code;
×
329
  }
330
  code = removeUnCommitFile(p, p->commitInfo.pFileList, pFiles);
×
331
  taosArrayDestroy(pFiles);
×
332
  return code;
×
333
}
334

335
int32_t bseInitStartSeq(SBse *pBse) {
3,580,479✔
336
  int32_t code = 0;
3,580,479✔
337
  int64_t lastSeq = 0;
3,580,479✔
338

339
  SBseLiveFileInfo *pLastFile = taosArrayGetLast(pBse->commitInfo.pFileList);
3,580,479✔
340
  if (pLastFile != NULL) {
3,580,479✔
341
    lastSeq = pLastFile->range.eseq;
×
342
  }
343

344
  pBse->seq = lastSeq + 1;
3,580,479✔
345
  return code;
3,580,479✔
346
}
347

348
int32_t bseRecover(SBse *pBse, int8_t rmUnCommited) {
3,580,479✔
349
  int32_t code = 0;
3,580,479✔
350
  int32_t lino = 0;
3,580,479✔
351
  char   *pCurrent = NULL;
3,580,479✔
352
  int64_t len = 0;
3,580,479✔
353

354
  code = bseReadCurrentFile(pBse, &pCurrent, &len);
3,580,479✔
355
  TSDB_CHECK_CODE(code, lino, _error);
3,580,479✔
356

357
  if (len == 0) {
3,580,479✔
358
    bseInfo("vgId:%d, no current meta file found, no need to recover", BSE_VGID(pBse));
3,580,479✔
359
  } else {
360
    code = bseDeserialCommitInfo(pBse, pCurrent, &pBse->commitInfo);
×
361
    TSDB_CHECK_CODE(code, lino, _error);
×
362

363
    if (pBse->commitInfo.fmtVer != BSE_FMT_VER) {
×
364
      bseError("vgId:%d, current meta file version %d not match with %d", BSE_VGID(pBse), pBse->commitInfo.fmtVer,
×
365
               BSE_FMT_VER);
366
      code = TSDB_CODE_FILE_CORRUPTED;
×
367
      goto _error;
×
368
    }
369

370
    if (taosArrayGetSize(pBse->commitInfo.pFileList) > 0) {
×
371
      SBseLiveFileInfo *pLast = taosArrayGetLast(pBse->commitInfo.pFileList);
×
372
      code = bseTableMgtRecoverTable(pBse->pTableMgt, pLast);
×
373
      TSDB_CHECK_CODE(code, lino, _error);
×
374

375
      code = bseTableMgtSetLastTableId(pBse->pTableMgt, pLast->timestamp);
×
376
    }
377
  }
378

379
  code = bseInitStartSeq(pBse);
3,580,479✔
380
  TSDB_CHECK_CODE(code, lino, _error);
3,580,479✔
381

382
_error:
3,580,479✔
383
  if (code != 0) {
3,580,479✔
384
    bseError("vgId:%d, failed to recover since %s", BSE_VGID(pBse), tstrerror(code));
×
385
  }
386
  taosMemoryFree(pCurrent);
3,580,479✔
387
  return code;
3,580,479✔
388
}
389
int32_t bseInitLock(SBse *pBse) {
3,579,783✔
390
  TdThreadRwlockAttr attr;
3,579,390✔
391
  (void)taosThreadRwlockAttrInit(&attr);
3,579,783✔
392
  (void)taosThreadRwlockAttrSetKindNP(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
3,580,161✔
393
  (void)taosThreadRwlockInit(&pBse->rwlock, &attr);
3,580,424✔
394
  (void)taosThreadRwlockAttrDestroy(&attr);
3,579,774✔
395

396
  (void)taosThreadMutexInit(&pBse->mutex, NULL);
3,580,041✔
397
  return 0;
3,579,783✔
398
}
399

400
int32_t bseInitEnv(SBse *p) {
3,579,575✔
401
  int32_t code = 0;
3,579,575✔
402
  int32_t lino = 0;
3,579,575✔
403

404
  code = bseInitLock(p);
3,579,575✔
405
  TSDB_CHECK_CODE(code, lino, _err);
3,579,783✔
406

407
  code = taosMkDir(p->path);
3,579,783✔
408
  TSDB_CHECK_CODE(code, lino, _err);
3,580,479✔
409
_err:
3,580,479✔
410
  if (code != 0) {
3,580,479✔
411
    bseError("failed to init bse env at line %d since %s", lino, tstrerror(code));
×
412
  }
413
  return code;
3,580,479✔
414
}
415

416
int32_t bseCreateTableManager(SBse *p) { return bseTableMgtCreate(p, (void **)&p->pTableMgt); }
3,580,479✔
417

418
int32_t bseCreateCommitInfo(SBse *pBse) {
3,580,096✔
419
  SBseCommitInfo *pCommit = &pBse->commitInfo;
3,580,096✔
420
  pCommit->pFileList = taosArrayInit(64, sizeof(SBseLiveFileInfo));
3,580,096✔
421
  if (pCommit->pFileList == NULL) {
3,580,479✔
422
    return terrno;
×
423
  }
424

425
  pCommit->fmtVer = BSE_FMT_VER;
3,580,479✔
426
  return 0;
3,580,479✔
427
}
428

429
void bseCfgSetDefault(SBseCfg *pCfg) {
3,580,309✔
430
  if (pCfg == NULL) {
3,580,309✔
431
    return;
×
432
  }
433
  if (pCfg->compressType == 0) {
3,580,309✔
434
    pCfg->compressType = kLZ4Compres;
3,579,782✔
435
  }
436
  if (pCfg->blockSize == 0) {
3,581,176✔
437
    pCfg->blockSize = BSE_DEFAULT_BLOCK_SIZE;
3,580,096✔
438
  }
439

440
  if (pCfg->keepDays == 0) {
3,578,907✔
441
    pCfg->keepDays = 10;
×
442
  }
443

444
  if (pCfg->keeps == 0) {
3,579,774✔
445
    pCfg->keeps = 365;
×
446
  }
447

448
  if (pCfg->precision == 0) {
3,579,926✔
449
    pCfg->precision = TSDB_TIME_PRECISION_MILLI;  // default precision is 1 second
3,467,846✔
450
  }
451
}
452

453
int32_t bseOpen(const char *path, SBseCfg *pCfg, SBse **pBse) {
3,576,007✔
454
  int32_t lino = 0;
3,576,007✔
455
  int32_t code = 0;
3,576,007✔
456

457
  SBse *p = taosMemoryCalloc(1, sizeof(SBse));
3,576,007✔
458
  if (p == NULL) {
3,580,096✔
459
    TSDB_CHECK_CODE(code = terrno, lino, _err);
×
460
  }
461

462
  p->cfg = *pCfg;
3,580,096✔
463
  bseCfgSetDefault(&p->cfg);
3,580,479✔
464

465
  tstrncpy(p->path, path, sizeof(p->path));
3,578,750✔
466

467
  code = bseInitEnv(p);
3,579,838✔
468
  TSDB_CHECK_CODE(code, lino, _err);
3,580,479✔
469

470
  code = bseCreateTableManager(p);
3,580,479✔
471
  TSDB_CHECK_CODE(code, lino, _err);
3,580,479✔
472

473
  code = bseCreateCommitInfo(p);
3,580,479✔
474
  TSDB_CHECK_CODE(code, lino, _err);
3,580,479✔
475

476
  code = bseBatchMgtInit(p->batchMgt, p);
3,580,479✔
477
  TSDB_CHECK_CODE(code, lino, _err);
3,580,479✔
478

479
  code = bseRecover(p, 1);
3,580,479✔
480
  TSDB_CHECK_CODE(code, lino, _err);
3,580,479✔
481

482
  *pBse = p;
3,580,479✔
483
_err:
3,580,479✔
484
  if (code != 0) {
3,580,479✔
485
    bseClose(p);
×
486
    bseError("vgId:%d failed to open bse at line %d since %s", BSE_VGID(p), lino, tstrerror(code));
×
487
  }
488
  return code;
3,580,479✔
489
}
490

491
static int32_t bseClear(SBse *pBse) {
×
492
  int32_t code = 0;
×
493
  int32_t lino = 0;
×
494

495
  code = bseTableMgtClear(pBse->pTableMgt);
×
496
  TSDB_CHECK_CODE(code, lino, _error);
×
497

498
_error:
×
499
  if (code != 0) {
×
500
    bseError("vgId:%d failed to clear bse at line %d since %s", BSE_VGID(pBse), lino, tstrerror(code));
×
501
  }
502
  return code;
×
503
}
504
void bseClose(SBse *pBse) {
3,580,326✔
505
  int32_t code;
506
  if (pBse == NULL) {
3,580,326✔
507
    return;
×
508
  }
509
  bseTableMgtCleanup(pBse->pTableMgt);
3,580,326✔
510
  bseBatchMgtCleanup(pBse->batchMgt);
3,580,479✔
511

512
  taosArrayDestroy(pBse->commitInfo.pFileList);
3,580,479✔
513
  (void)taosThreadMutexDestroy(&pBse->mutex);
3,580,305✔
514
  (void)taosThreadRwlockDestroy(&pBse->rwlock);
3,580,479✔
515

516
  taosMemoryFree(pBse);
3,580,121✔
517
  return;
3,580,305✔
518
}
519

520
int32_t bseGet(SBse *pBse, uint64_t seq, uint8_t **pValue, int32_t *len) {
30,685,946✔
521
  int32_t line = 0;
30,685,946✔
522
  int32_t code = 0;
30,685,946✔
523

524
  (void)taosThreadRwlockRdlock(&pBse->rwlock);
30,685,946✔
525
  code = bseTableMgtGet(pBse->pTableMgt, seq, pValue, len);
30,685,946✔
526
  (void)taosThreadRwlockUnlock(&pBse->rwlock);
30,685,946✔
527

528
  if (code != 0) {
30,685,946✔
529
    bseError("vgId:%d failed to get value from seq %" PRId64 " at line %d since %s", BSE_VGID(pBse), seq, line,
×
530
             tstrerror(code));
531
  } else {
532
    bseDebug("vgId:%d get value from seq %" PRId64 " at line %d", BSE_VGID(pBse), seq, line);
30,685,946✔
533
  }
534
  return code;
30,685,946✔
535
}
536

537
int32_t bseCommitBatch(SBse *pBse, SBseBatch *pBatch) {
21,017✔
538
  int32_t code = 0;
21,017✔
539
  int32_t lino = 0;
21,017✔
540
  (void)taosThreadMutexLock(&pBse->mutex);
21,017✔
541
  pBatch->commited = 1;
21,017✔
542

543
  while (!BSE_QUEUE_IS_EMPTY(&pBse->batchMgt->queue)) {
42,034✔
544
    bsequeue *h = BSE_QUEUE_HEAD(&pBse->batchMgt->queue);
21,017✔
545

546
    SBseBatch *p = BSE_QUEUE_DATA(h, SBseBatch, node);
21,017✔
547
    if (p->commited == 1) {
21,017✔
548
      BSE_QUEUE_REMOVE(&p->node);
21,017✔
549

550
      code = bseTableMgtAppend(pBse->pTableMgt, pBatch);
21,017✔
551
      TSDB_CHECK_CODE(code, lino, _error);
21,017✔
552

553
      code = bseRecycleBatchImpl(pBse->batchMgt, p);
21,017✔
554
      TSDB_CHECK_CODE(code, lino, _error);
21,017✔
555
    } else {
556
      break;
×
557
    }
558
  }
559
_error:
21,017✔
560
  if (code != 0) {
21,017✔
561
    bseError("vgId:%d failed to append batch at line %d since %s", BSE_VGID(pBse), lino, tstrerror(code));
×
562
  }
563
  (void)taosThreadMutexUnlock(&pBse->mutex);
21,017✔
564
  return code;
21,017✔
565
}
566

567
int32_t bseReload(SBse *pBse) {
×
568
  int32_t code = 0;
×
569
  int32_t lino = 0;
×
570

571
  (void)taosThreadMutexLock(&pBse->mutex);
×
572
  code = bseClear(pBse);
×
573
  TSDB_CHECK_CODE(code, lino, _error);
×
574

575
  code = bseRecover(pBse, 1);
×
576
  TSDB_CHECK_CODE(code, lino, _error);
×
577

578
_error:
×
579
  if (code != 0) {
×
580
    bseError("vgId:%d failed to reload bse at line %d since %s", BSE_VGID(pBse), lino, tstrerror(code));
×
581
  }
582
  (void)taosThreadMutexUnlock(&pBse->mutex);
×
583
  return code;
×
584
}
585
int32_t bseTrim(SBse *pBse) {
×
586
  int32_t code = 0;
×
587
  return code;
×
588
}
589

590
int32_t bseRecycleBatch(SBse *pBse, SBseBatch *pBatch) {
×
591
  int32_t code = 0;
×
592
  if (pBatch == NULL) return code;
×
593

594
  (void)taosThreadMutexLock(&pBse->mutex);
×
595
  code = bseRecycleBatchImpl(pBse->batchMgt, pBatch);
×
596
  (void)taosThreadMutexUnlock(&pBse->mutex);
×
597
  return code;
×
598
}
599

600
static int32_t bseBatchMgtInit(SBatchMgt *pBatchMgt, SBse *pBse) {
3,578,012✔
601
  int32_t code = 0;
3,578,012✔
602
  int32_t lino = 0;
3,578,012✔
603

604
  pBatchMgt->pBse = pBse;
3,578,012✔
605

606
  pBatchMgt->pBatchList = taosArrayInit(2, sizeof(SBseBatch *));
3,580,479✔
607
  if (pBatchMgt->pBatchList == NULL) {
3,580,479✔
608
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
609
  }
610

611
  SBseBatch *b = NULL;
3,580,479✔
612
  code = bseBatchCreate(&b, 1024);
3,580,479✔
613
  TSDB_CHECK_CODE(code, lino, _error);
3,580,479✔
614

615
  if (taosArrayPush(pBatchMgt->pBatchList, &b) == NULL) {
7,160,958✔
616
    bseBatchDestroy(b);
×
617
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
618
  }
619

620
  BSE_QUEUE_INIT(&pBatchMgt->queue);
3,580,479✔
621
_error:
3,580,479✔
622
  if (code != 0) {
3,580,479✔
623
    if (pBatchMgt->pBatchList != NULL) {
×
624
      for (int32_t i = 0; i < taosArrayGetSize(pBatchMgt->pBatchList); i++) {
×
625
        SBseBatch **p = taosArrayGet(pBatchMgt->pBatchList, i);
×
626
        bseBatchDestroy(*p);
×
627
      }
628
      taosArrayDestroy(pBatchMgt->pBatchList);
×
629
    }
630
    bseError("vgId:%d failed to init batch mgt at line %d since %s", BSE_VGID(pBse), lino, tstrerror(code));
×
631
  }
632
  return code;
3,580,479✔
633
}
634

635
static void bseBatchMgtCleanup(SBatchMgt *pBatchMgt) {
3,580,305✔
636
  if (pBatchMgt == NULL) return;
3,580,305✔
637

638
  for (int32_t i = 0; i < taosArrayGetSize(pBatchMgt->pBatchList); i++) {
7,160,784✔
639
    SBseBatch **p = taosArrayGet(pBatchMgt->pBatchList, i);
3,580,479✔
640
    bseBatchDestroy(*p);
3,580,479✔
641
  }
642

643
  taosArrayDestroy(pBatchMgt->pBatchList);
3,580,305✔
644
}
645

646
static int32_t bseBatchMgtRecycle(SBatchMgt *pBatchMgt, SBseBatch *pBatch) {
21,017✔
647
  int32_t code = 0;
21,017✔
648
  if (pBatch == NULL) return code;
21,017✔
649

650
  bseBatchClear(pBatch);
21,017✔
651

652
  if (taosArrayPush(pBatchMgt->pBatchList, &pBatch) == NULL) {
42,034✔
653
    bseBatchDestroy(pBatch);
×
654
    code = terrno;
×
655
  }
656
  if (code != 0) {
21,017✔
657
    bseError("vgId:%d failed to recycle batch since %s", BSE_VGID((SBse *)pBatchMgt->pBse), tstrerror(code));
×
658
  }
659
  return code;
21,017✔
660
}
661
static int32_t bseBatchMgtGet(SBatchMgt *pBatchMgt, SBseBatch **pBatch) {
21,017✔
662
  int32_t code = 0;
21,017✔
663
  int32_t lino = 0;
21,017✔
664

665
  SBseBatch **p;
666

667
  if (taosArrayGetSize(pBatchMgt->pBatchList) > 0) {
21,017✔
668
    p = (SBseBatch **)taosArrayPop(pBatchMgt->pBatchList);
21,017✔
669
  } else {
670
    SBseBatch *b = NULL;
×
671
    code = bseBatchCreate(&b, 1024);
×
672
    TSDB_CHECK_CODE(code, lino, _error);
×
673

674
    if (taosArrayPush(pBatchMgt->pBatchList, &b) == NULL) {
×
675
      bseBatchDestroy(b);
×
676
      TSDB_CHECK_CODE(code = terrno, lino, _error);
×
677
    }
678
    p = (SBseBatch **)taosArrayPop(pBatchMgt->pBatchList);
×
679
  }
680

681
  BSE_QUEUE_PUSH(&pBatchMgt->queue, &((*p)->node));
21,017✔
682
  *pBatch = *p;
21,017✔
683

684
_error:
21,017✔
685
  if (code != 0) {
21,017✔
686
    bseInfo("vgId:%d failed to get bse batch at line %d since %s", BSE_VGID((SBse *)pBatchMgt->pBse), lino,
×
687
            tstrerror(code));
688
  }
689
  return code;
21,017✔
690
}
691

692
int32_t bseRecycleBatchImpl(SBatchMgt *pMgt, SBseBatch *pBatch) {
21,017✔
693
  // code
694
  return bseBatchMgtRecycle(pMgt, pBatch);
21,017✔
695
}
696

697
int32_t bseBatchCreate(SBseBatch **pBatch, int32_t nKeys) {
3,580,479✔
698
  int32_t    code = 0;
3,580,479✔
699
  int32_t    lino = 0;
3,580,479✔
700
  SBseBatch *p = taosMemoryCalloc(1, sizeof(SBseBatch));
3,580,479✔
701
  if (p == NULL) {
3,580,479✔
702
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
703
  }
704

705
  p->len = 0;
3,580,479✔
706
  p->seq = 0;
3,580,479✔
707
  p->cap = 1024;
3,580,479✔
708
  p->buf = taosMemCalloc(1, p->cap);
3,580,479✔
709
  if (p->buf == NULL) {
3,580,479✔
710
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
711
  }
712

713
  p->pSeq = taosArrayInit(nKeys, sizeof(SBlockItemInfo));
3,580,479✔
714
  if (p->pSeq == NULL) {
3,580,479✔
715
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
716
  }
717

718
  BSE_QUEUE_INIT(&p->node);
3,580,479✔
719

720
  *pBatch = p;
3,580,479✔
721

722
_error:
3,580,479✔
723
  if (code != 0) {
3,580,479✔
724
    bseError("failed to create bse batch since %s at line %d", tstrerror(code), lino);
×
725
    bseBatchDestroy(p);
×
726
  }
727
  return code;
3,580,479✔
728
}
729
int32_t bseBatchSetParam(SBseBatch *pBatch, int64_t seq, int32_t cap) {
21,017✔
730
  pBatch->seq = seq;
21,017✔
731
  return taosArrayEnsureCap(pBatch->pSeq, cap);
21,017✔
732
}
733
int32_t bseBatchInit(SBse *pBse, SBseBatch **pBatch, int32_t nKeys) {
21,017✔
734
  int32_t    code = 0;
21,017✔
735
  int32_t    lino = 0;
21,017✔
736
  SBseBatch *p = NULL;
21,017✔
737
  uint64_t   sseq = 0;
21,017✔
738

739
  // atomic later
740
  (void)taosThreadMutexLock(&pBse->mutex);
21,017✔
741
  sseq = pBse->seq;
21,017✔
742
  pBse->seq += nKeys;
21,017✔
743

744
  code = bseBatchMgtGet(pBse->batchMgt, &p);
21,017✔
745
  (void)taosThreadMutexUnlock(&pBse->mutex);
21,017✔
746

747
  bseDebug("vgId:%d bse seq start from: %" PRId64 " to %" PRId64 "", BSE_VGID(pBse), sseq, sseq + nKeys - 1);
21,017✔
748
  TSDB_CHECK_CODE(code, lino, _error);
21,017✔
749

750
  code = bseBatchSetParam(p, sseq, nKeys);
21,017✔
751
  TSDB_CHECK_CODE(code, lino, _error);
21,017✔
752

753
  p->startSeq = sseq;
21,017✔
754
  p->pBse = pBse;
21,017✔
755
  *pBatch = p;
21,017✔
756
_error:
21,017✔
757
  if (code != 0) {
21,017✔
758
    bseError("vgId:%d failed to build batch since %s", BSE_VGID((SBse *)p->pBse), tstrerror(code));
×
759
    bseBatchDestroy(p);
×
760
  }
761
  return code;
21,017✔
762
}
763

764
int32_t bseBatchPut(SBseBatch *pBatch, int64_t *seq, uint8_t *value, int32_t len) {
20,461,046✔
765
  int32_t code = 0;
20,461,046✔
766
  int32_t lino = 0;
20,461,046✔
767
  int32_t offset = 0;
20,461,046✔
768

769
  int64_t lseq = pBatch->seq;
20,461,046✔
770

771
  code = bseBatchMayResize(pBatch, pBatch->len + len);
20,461,046✔
772
  TSDB_CHECK_CODE(code, lino, _error);
20,461,046✔
773

774
  uint8_t *p = pBatch->buf + pBatch->len;
20,461,046✔
775
  pBatch->len += taosEncodeBinary((void **)&p, value, len);
20,461,046✔
776

777
  SBlockItemInfo info = {.size = len, .seq = lseq};
20,461,046✔
778
  if (taosArrayPush(pBatch->pSeq, &info) == NULL) {
40,922,092✔
779
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
780
  }
781

782
  pBatch->seq++;
20,461,046✔
783
  pBatch->num++;
20,461,046✔
784

785
  *seq = lseq;
20,461,046✔
786
  bseDebug("succ to put seq %" PRId64 " to batch", lseq);
20,461,046✔
787

788
_error:
20,461,046✔
789
  if (code != 0) {
20,461,046✔
790
    bseError("vgId:%d failed to put value by seq %" PRId64 " at line %d since %s", BSE_VGID((SBse *)pBatch->pBse), lseq,
×
791
             lino, tstrerror(code));
792
  }
793
  return code;
20,461,046✔
794
}
795

796
int32_t bseBatchGetSize(SBseBatch *pBatch, int32_t *sz) {
×
797
  int32_t code = 0;
×
798

799
  if (pBatch == NULL) return TSDB_CODE_INVALID_MSG;
×
800
  *sz = pBatch->len;
×
801

802
  return code;
×
803
}
804

805
int32_t bseBatchExccedLimit(SBseBatch *pBatch) {
×
806
  if (pBatch == NULL) return 0;
×
807
  SBse *pBse = pBatch->pBse;
×
808
  if ((pBatch->len + 128) >= (BSE_BLOCK_SIZE(pBse) >> 2)) {
×
809
    return 1;
×
810
  }
811
  return 0;
×
812
}
813

814
int32_t bseBatchGet(SBseBatch *pBatch, uint64_t seq, uint8_t **pValue, int32_t *len) {
×
815
  int32_t code = 0;
×
816
  return 0;
×
817
}
818
void bseBatchClear(SBseBatch *pBatch) {
21,017✔
819
  pBatch->len = 0;
21,017✔
820
  pBatch->num = 0;
21,017✔
821
  pBatch->seq = 0;
21,017✔
822
  pBatch->commited = 0;
21,017✔
823
  BSE_QUEUE_REMOVE(&pBatch->node);
21,017✔
824
  taosArrayClear(pBatch->pSeq);
21,017✔
825
}
21,017✔
826
void bseBatchDestroy(SBseBatch *pBatch) {
3,579,530✔
827
  if (pBatch == NULL) return;
3,579,530✔
828

829
  int32_t code = 0;
3,579,530✔
830
  taosMemoryFree(pBatch->buf);
3,579,530✔
831
  taosArrayDestroy(pBatch->pSeq);
3,580,305✔
832
  BSE_QUEUE_REMOVE(&pBatch->node);
3,580,479✔
833

834
  taosMemoryFree(pBatch);
3,579,811✔
835
}
836

837
int32_t bseBatchMayResize(SBseBatch *pBatch, int32_t alen) {
20,461,046✔
838
  int32_t lino = 0;
20,461,046✔
839
  int32_t code = 0;
20,461,046✔
840
  if (alen > pBatch->cap) {
20,461,046✔
841
    int32_t cap = pBatch->cap;
3,787✔
842
    while (cap < alen) {
7,574✔
843
      cap <<= 1;
3,787✔
844
    }
845

846
    uint8_t *buf = taosMemRealloc(pBatch->buf, cap);
3,787✔
847
    if (buf == NULL) {
3,787✔
848
      TSDB_CHECK_CODE(code = terrno, lino, _error);
×
849
    }
850

851
    pBatch->cap = cap;
3,787✔
852
    pBatch->buf = buf;
3,787✔
853
  } else {
854
    return code;
20,457,259✔
855
  }
856
_error:
3,787✔
857
  if (code != 0) {
3,787✔
858
    bseError("failed to resize batch buffer since %s at line %d", tstrerror(code), lino);
×
859
  }
860
  return code;
3,787✔
861
}
862

863
static int32_t seqComparFunc(const void *p1, const void *p2) {
×
864
  uint64_t pu1 = *(const uint64_t *)p1;
×
865
  uint64_t pu2 = *(const uint64_t *)p2;
×
866
  if (pu1 == pu2) {
×
867
    return 0;
×
868
  } else {
869
    return (pu1 < pu2) ? -1 : 1;
×
870
  }
871
}
872
int32_t bseMultiGet(SBse *pBse, SArray *pKey, SArray *ppValue) {
×
873
  int32_t code = 0;
×
874
  taosSort(pKey->pData, taosArrayGetSize(pKey), sizeof(int64_t), seqComparFunc);
×
875
  (void)taosThreadMutexLock(&pBse->mutex);
×
876
  (void)taosThreadMutexUnlock(&pBse->mutex);
×
877
  return code;
×
878
}
879
// int32_t bseIterate(SBse *pBse, uint64_t start, uint64_t end, SArray *pValue) {
880
//   int32_t code = 0;
881
//   taosThreadMutexLock(&pBse->mutex);
882
//   taosThreadMutexUnlock(&pBse->mutex);
883
//   return code;
884
// }
885

886
int32_t bseGenCommitInfo(SBse *pBse, SArray *pFileSet) {
3,444✔
887
  int32_t   code = 0;
3,444✔
888
  int32_t   lino = 0;
3,444✔
889
  char      buf[TSDB_FILENAME_LEN] = {0};
3,444✔
890
  char     *pBuf = NULL;
3,444✔
891
  int32_t   len = 0;
3,444✔
892
  TdFilePtr fd = NULL;
3,444✔
893

894
  code = bseSerailCommitInfo(pBse, pFileSet, &pBuf, &len);
3,444✔
895
  TSDB_CHECK_CODE(code, lino, _error);
3,444✔
896

897
  bseBuildTempCurrentFullName(pBse, buf);
3,444✔
898

899
  fd = taosOpenFile(buf, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
3,444✔
900
  if (fd == NULL) {
3,444✔
901
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
902
  }
903

904
  int64_t nwrite = taosWriteFile(fd, pBuf, len);
3,444✔
905
  if (nwrite != len) {
3,444✔
906
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
907
  }
908

909
  code = taosFsyncFile(fd);
3,444✔
910
  TSDB_CHECK_CODE(code, lino, _error);
3,444✔
911

912
_error:
3,444✔
913
  if (code != 0) {
3,444✔
914
    bseError("vgId:%d failed to gen commit info since %s", BSE_VGID(pBse), tstrerror(code));
×
915
  }
916
  taosMemoryFree(pBuf);
3,444✔
917

918
  if (taosCloseFile(&fd) != 0) {
3,444✔
919
    bseError("vgId:%d failed to close file %s since %s", BSE_VGID(pBse), buf, tstrerror(terrno));
×
920
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
921
  }
922
  return code;
3,444✔
923
}
924

925
int32_t bseCommitFinish(SBse *pBse) {
3,444✔
926
  int32_t code = 0;
3,444✔
927

928
  char buf[TSDB_FILENAME_LEN] = {0};
3,444✔
929
  char tbuf[TSDB_FILENAME_LEN] = {0};
3,444✔
930

931
  bseBuildCurrentFullName(pBse, buf);
3,444✔
932
  bseBuildTempCurrentFullName(pBse, tbuf);
3,444✔
933

934
  code = taosRenameFile(tbuf, buf);
3,444✔
935
  return code;
3,444✔
936
}
937
int32_t bseCommitDo(SBse *pBse, SArray *pFileSet) {
3,444✔
938
  int32_t code = 0;
3,444✔
939
  int32_t lino = 0;
3,444✔
940

941
  code = bseGenCommitInfo(pBse, pFileSet);
3,444✔
942
  TSDB_CHECK_CODE(code, lino, _error);
3,444✔
943

944
  code = bseCommitFinish(pBse);
3,444✔
945
  TSDB_CHECK_CODE(code, lino, _error);
3,444✔
946
_error:
3,444✔
947
  if (code != 0) {
3,444✔
948
    bseError("vgId:%d failed to commit at line %d since %s", BSE_VGID(pBse), lino, tstrerror(code));
×
949
  }
950
  return code;
3,444✔
951
}
952

953
int32_t bseUpdateCommitInfo(SBse *pBse, SBseLiveFileInfo *pInfo, SArray **ppResult) {
3,444✔
954
  int32_t code = 0;
3,444✔
955
  int32_t lino = 0;
3,444✔
956

957
  (void)taosThreadMutexLock(&pBse->mutex);
3,444✔
958

959
  SBseCommitInfo *pCommit = &pBse->commitInfo;
3,444✔
960

961
  int32_t fileListSize = taosArrayGetSize(pCommit->pFileList);
3,444✔
962
  if (fileListSize == 0) {
3,444✔
963
    if (taosArrayPush(pCommit->pFileList, pInfo) == NULL) {
4,724✔
964
      TSDB_CHECK_CODE(code = terrno, lino, _error);
×
965
    }
966
  } else {
967
    SBseLiveFileInfo *pLast = taosArrayGetLast(pCommit->pFileList);
1,082✔
968
    if (pLast->timestamp == pInfo->timestamp) {
1,082✔
969
      *pLast = *pInfo;
1,082✔
970
    } else {
971
      if (taosArrayPush(pCommit->pFileList, pInfo) == NULL) {
×
972
        TSDB_CHECK_CODE(code = terrno, lino, _error);
×
973
      }
974
    }
975
  }
976

977
  code = bseGetAliveFileList(pBse, ppResult, 0);
3,444✔
978
  TSDB_CHECK_CODE(code, lino, _error);
3,444✔
979

980
_error:
3,444✔
981
  if (code != 0) {
3,444✔
982
    bseError("vgId:%d failed to update commit info since %s", BSE_VGID(pBse), tstrerror(code));
×
983
  }
984

985
  (void)taosThreadMutexUnlock(&pBse->mutex);
3,444✔
986
  return code;
3,444✔
987
}
988

989
int32_t bseGetAliveFileList(SBse *pBse, SArray **pFileList, int8_t lock) {
3,444✔
990
  int32_t code = 0;
3,444✔
991
  int32_t lino = 0;
3,444✔
992

993
  SArray *p = taosArrayInit(4, sizeof(SBseLiveFileInfo));
3,444✔
994
  if (p == NULL) {
3,444✔
995
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
996
  }
997

998
  if (lock) {
3,444✔
999
    (void)taosThreadMutexLock(&pBse->mutex);
×
1000
  }
1001

1002
  if (taosArrayAddAll(p, pBse->commitInfo.pFileList) == NULL) {
3,444✔
1003
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1004
  }
1005

1006
  *pFileList = p;
3,444✔
1007
_error:
3,444✔
1008
  if (code != 0) {
3,444✔
1009
    bseError("vgId:%d failed to get alive file list since %s", BSE_VGID(pBse), tstrerror(code));
×
1010
  }
1011
  if (lock) {
3,444✔
1012
    (void)taosThreadMutexUnlock(&pBse->mutex);
×
1013
  }
1014
  return code;
3,444✔
1015
}
1016

1017
int8_t bseSkipCommit(SBse *pBse, SBseLiveFileInfo *info) {
4,728,763✔
1018
  if (info->size == 0) {
4,728,763✔
1019
    bseInfo("vgId:%d no data to commit", BSE_VGID(pBse));
4,725,319✔
1020
    return 1;
4,725,319✔
1021
  }
1022
  return 0;
3,444✔
1023
}
1024

1025
int32_t bseCommit(SBse *pBse) {
4,727,801✔
1026
  // Generate static info and footer info;
1027
  int64_t cost = 0;
4,727,801✔
1028
  int32_t code = 0;
4,727,801✔
1029
  int32_t line = 0;
4,727,801✔
1030
  int64_t st = taosGetTimestampMs();
4,728,763✔
1031
  SArray *pLiveFile = NULL;
4,728,763✔
1032

1033
  SBseLiveFileInfo info = {0};
4,728,222✔
1034
  code = bseTableMgtCommit(pBse->pTableMgt, &info);
4,728,763✔
1035
  TSDB_CHECK_CODE(code, line, _error);
4,728,763✔
1036

1037
  if (bseSkipCommit(pBse, &info)) {
4,728,763✔
1038
    goto _error;
4,725,319✔
1039
  }
1040

1041
  code = bseUpdateCommitInfo(pBse, &info, &pLiveFile);
3,444✔
1042
  TSDB_CHECK_CODE(code, line, _error);
3,444✔
1043

1044
  code = bseCommitDo(pBse, pLiveFile);
3,444✔
1045
  TSDB_CHECK_CODE(code, line, _error);
3,444✔
1046

1047
_error:
4,728,763✔
1048
  cost = taosGetTimestampMs() - st;
4,728,763✔
1049
  if (cost >= 1000) {
4,728,763✔
UNCOV
1050
    bseWarn("vgId:%d bse commit cost %" PRId64 " ms", BSE_VGID(pBse), cost);
×
1051
  }
1052
  if (code != 0) {
4,728,763✔
1053
    bseError("vgId:%d failed to commit at line %d since %s", BSE_VGID(pBse), line, tstrerror(code));
×
1054
  }
1055
  taosArrayDestroy(pLiveFile);
4,728,763✔
1056

1057
  return code;
4,728,763✔
1058
}
1059

1060
int32_t bseRollback(SBse *pBse, int64_t ver) {
×
1061
  // TODO
1062
  int32_t code = 0;
×
1063
  return code;
×
1064
}
1065

1066
int32_t bseRollbackImpl(SBse *pBse) {
×
1067
  int32_t code = 0;
×
1068
  return code;
×
1069
}
1070

1071
int32_t bseCompact(SBse *pBse) {
×
1072
  // impl later
1073
  int32_t code = 0;
×
1074
  return code;
×
1075
}
1076

1077
int32_t bseDelete(SBse *pBse, SSeqRange range) {
×
1078
  int32_t code = 0;
×
1079
  return code;
×
1080
}
1081

1082
int32_t bseUpdateCfg(SBse *pBse, SBseCfg *pCfg) {
×
1083
  int32_t code = 0;
×
1084
  if (pCfg == NULL) {
×
1085
    return TSDB_CODE_INVALID_MSG;
×
1086
  }
1087

1088
  (void)taosThreadMutexLock(&pBse->mutex);
×
1089
  if (pCfg->blockSize > 0) {
×
1090
    pBse->cfg.blockSize = pCfg->blockSize;
×
1091
  }
1092

1093
  if (pCfg->keepDays > 0) {
×
1094
    pBse->cfg.keepDays = pCfg->keepDays;
×
1095
  }
1096

1097
  if (pCfg->compressType >= kNoCompres && pCfg->compressType <= kZxCompress) {
×
1098
    pBse->cfg.compressType = pCfg->compressType;
×
1099
  }
1100

1101
  if (pCfg->tableCacheSize >= 0) {
×
1102
    pBse->cfg.tableCacheSize = pCfg->tableCacheSize;
×
1103
  }
1104

1105
  if (pCfg->blockCacheSize >= 0) {
×
1106
    pBse->cfg.blockCacheSize = pCfg->blockCacheSize;
×
1107
  }
1108
  (void)taosThreadMutexUnlock(&pBse->mutex);
×
1109
  return code;
×
1110
}
1111

1112
int32_t bseUpdatCfgNoLock(SBse *pBse, SBseCfg *pCfg) {
×
1113
  int32_t code = 0;
×
1114
  if (pCfg == NULL) {
×
1115
    return TSDB_CODE_INVALID_MSG;
×
1116
  }
1117
  if (pCfg->blockSize > 0) {
×
1118
    pBse->cfg.blockSize = pCfg->blockSize;
×
1119
  }
1120

1121
  if (pCfg->keepDays > 0) {
×
1122
    pBse->cfg.keepDays = pCfg->keepDays;
×
1123
  }
1124

1125
  if (pCfg->compressType >= kNoCompres && pCfg->compressType <= kZxCompress) {
×
1126
    pBse->cfg.compressType = pCfg->compressType;
×
1127
  }
1128

1129
  if (pCfg->tableCacheSize >= 0) {
×
1130
    pBse->cfg.tableCacheSize = pCfg->tableCacheSize;
×
1131
  }
1132

1133
  if (pCfg->blockCacheSize >= 0) {
×
1134
    pBse->cfg.blockCacheSize = pCfg->blockCacheSize;
×
1135
  }
1136
  return code;
×
1137
}
1138
int32_t bseSetCompressType(SBse *pBse, int8_t compressType) {
×
1139
  int32_t code = 0;
×
1140
  if (compressType < kNoCompres || compressType > kZxCompress) {
×
1141
    return TSDB_CODE_INVALID_MSG;
×
1142
  }
1143
  (void)taosThreadMutexLock(&pBse->mutex);
×
1144
  pBse->cfg.compressType = compressType;
×
1145
  (void)taosThreadMutexUnlock(&pBse->mutex);
×
1146

1147
  return code;
×
1148
}
1149
int32_t bseSetBlockSize(SBse *pBse, int32_t blockSize) {
×
1150
  int32_t code = 0;
×
1151
  if (blockSize <= 0) {
×
1152
    return TSDB_CODE_INVALID_MSG;
×
1153
  }
1154
  (void)taosThreadMutexLock(&pBse->mutex);
×
1155
  pBse->cfg.blockSize = blockSize;
×
1156
  (void)taosThreadMutexUnlock(&pBse->mutex);
×
1157

1158
  return code;
×
1159
}
1160
int32_t bseSetBlockCacheSize(SBse *pBse, int32_t blockCacheSize) {
×
1161
  int32_t code = 0;
×
1162
  if (blockCacheSize <= 0) {
×
1163
    return TSDB_CODE_INVALID_MSG;
×
1164
  }
1165
  (void)taosThreadMutexLock(&pBse->mutex);
×
1166
  pBse->cfg.blockCacheSize = blockCacheSize;
×
1167

1168
  code = bseTableMgtSetBlockCacheSize(pBse->pTableMgt, blockCacheSize);
×
1169
  (void)taosThreadMutexUnlock(&pBse->mutex);
×
1170

1171
  return code;
×
1172
}
1173
int32_t bseSetTableCacheSize(SBse *pBse, int32_t tableCacheSize) {
×
1174
  int32_t code = 0;
×
1175
  if (tableCacheSize <= 0) {
×
1176
    return TSDB_CODE_INVALID_MSG;
×
1177
  }
1178
  (void)taosThreadMutexLock(&pBse->mutex);
×
1179

1180
  pBse->cfg.tableCacheSize = tableCacheSize;
×
1181
  code = bseTableMgtSetTableCacheSize(pBse->pTableMgt, tableCacheSize);
×
1182
  (void)taosThreadMutexUnlock(&pBse->mutex);
×
1183

1184
  return code;
×
1185
}
1186
int32_t bseSetKeepDays(SBse *pBse, int32_t keepDays) {
×
1187
  int32_t code = 0;
×
1188
  if (keepDays <= 0) {
×
1189
    return TSDB_CODE_INVALID_MSG;
×
1190
  }
1191
  (void)taosThreadMutexLock(&pBse->mutex);
×
1192
  pBse->cfg.keepDays = keepDays;
×
1193
  (void)taosThreadMutexUnlock(&pBse->mutex);
×
1194
  return code;
×
1195
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc