• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5051

13 May 2026 12:00PM UTC coverage: 73.358% (-0.04%) from 73.398%
#5051

push

travis-ci

web-flow
feat: taosdump support stream backup/restore (#35326)

139 of 170 new or added lines in 3 files covered. (81.76%)

714 existing lines in 146 files now uncovered.

281543 of 383795 relevant lines covered (73.36%)

135448694.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.86
/source/dnode/vnode/src/bse/bseTableMgt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "bseTableMgt.h"
17
#include "bse.h"
18
#include "bseTable.h"
19
#include "bseUtil.h"
20
#include "tglobal.h"
21
#include "thash.h"
22

23
static int32_t tableReaderMgtInit(STableReaderMgt *pReader, SBse *pBse, int64_t timestamp);
24
static int32_t tableReaderMgtSeek(STableReaderMgt *pReaderMgt, int64_t seq, uint8_t **pValue, int32_t *len);
25
// static int32_t tableReaderMgtClear(STableReaderMgt *pReader);
26
static void    tableReaderMgtDestroy(STableReaderMgt *pReader);
27

28
static int32_t tableBuilderMgtInit(STableBuilderMgt *pMgt, SBse *pBse, int64_t timestamp);
29
static int32_t tableBuilderMgtOpenBuilder(STableBuilderMgt *pMgt, int64_t seq, STableBuilder **p);
30
static int32_t tableBuilderMgtCommit(STableBuilderMgt *pMgt, SBseLiveFileInfo *pInfo);
31
static int32_t tableBuilderMgtSeek(STableBuilderMgt *pMgt, int64_t seq, uint8_t **pValue, int32_t *len);
32
static int32_t tableBuilderMgtPutBatch(STableBuilderMgt *pMgt, SBseBatch *pBatch);
33
static int32_t tableBuilderMgtClear(STableBuilderMgt *pMgt);
34
static void    tableBuilderMgtDestroy(STableBuilderMgt *pMgt);
35

36
static int32_t tableBuilderMgtRecover(STableBuilderMgt *pMgt, int64_t seq, STableBuilder **pBuilder, int64_t size);
37

38
static int32_t tableMetaMgtInit(STableMetaMgt *pMgt, SBse *pBse, int64_t timestamp);
39
static void    tableMetaMgtDestroy(STableMetaMgt *pMgt);
40

41
static void tableReaderFree(void *pReader);  // lru table reader free func
42
static void blockFree(void *pBlock);         // block free func
43

44
/*
 * Allocate a zeroed STableMgt bound to pBse, create its sub-table hash
 * (keyed with the BIGINT hash function) and hand it back through *pMgt.
 *
 * Returns 0 on success, otherwise terrno as set by the failing allocation.
 * *pMgt is only written on success.
 */
int32_t bseTableMgtCreate(SBse *pBse, void **pMgt) {
  int32_t    lino = 0;
  int32_t    code = 0;
  STableMgt *pTableMgt = taosMemoryCalloc(1, sizeof(STableMgt));

  if (pTableMgt == NULL) {
    return terrno;
  }

  pTableMgt->pBse = pBse;
  pTableMgt->pHashObj = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (pTableMgt->pHashObj == NULL) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }

  *pMgt = pTableMgt;

_error:
  if (code != 0) {
    // pTableMgt is always non-NULL here; the allocation failure returned early.
    bseError("vgId:%d failed to open table pBuilderMgt at line %d since %s", BSE_VGID((SBse *)pTableMgt->pBse), lino,
             tstrerror(code));
    bseTableMgtCleanup(pTableMgt);
  }
  return code;
}
68

69
/*
 * Store `timestamp` in pMgt->timestamp. A NULL manager is accepted and
 * ignored. Always returns 0.
 */
int32_t bseTableMgtSetLastTableId(STableMgt *pMgt, int64_t timestamp) {
  if (pMgt != NULL) {
    pMgt->timestamp = timestamp;
  }
  return 0;
}
75

76
/*
 * Allocate a SSubTableMgt for `timestamp` and initialise its reader and
 * table-meta managers, plus the builder manager unless `readOnly` is set.
 * Each sub-manager receives a back pointer to the new SSubTableMgt.
 *
 * On success the new object is stored in *pSubMgt and 0 is returned.
 * On failure the partially built object is destroyed, *pSubMgt is reset to
 * NULL so the caller is never left with a stale pointer, and the error code
 * is returned.
 */
int32_t createSubTableMgt(int64_t timestamp, int32_t readOnly, STableMgt *pMgt, SSubTableMgt **pSubMgt) {
  int32_t code = 0;
  int32_t lino = 0;

  // Use the same allocator family as the rest of this file; the object is
  // released with taosMemoryFree() in destroySubTableMgt().
  SSubTableMgt *p = taosMemoryCalloc(1, sizeof(SSubTableMgt));
  if (p == NULL) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }

  if (!readOnly) {
    code = tableBuilderMgtInit(p->pBuilderMgt, pMgt->pBse, timestamp);
    TSDB_CHECK_CODE(code, lino, _error);

    p->pBuilderMgt->pMgt = p;
  }

  code = tableReaderMgtInit(p->pReaderMgt, pMgt->pBse, timestamp);
  TSDB_CHECK_CODE(code, lino, _error);

  p->pReaderMgt->pMgt = p;

  code = tableMetaMgtInit(p->pTableMetaMgt, pMgt->pBse, timestamp);
  TSDB_CHECK_CODE(code, lino, _error);

  p->pTableMetaMgt->pMgt = p;

  *pSubMgt = p;
_error:
  if (code != 0) {
    bseError("failed to create sub table mgt at line %d since %s", lino, tstrerror(code));
    destroySubTableMgt(p);
    *pSubMgt = NULL;
  }
  return code;
}
111
/*
 * Tear down the builder, reader and table-meta managers of `p` (in that
 * order) and free `p` itself. A NULL argument skips the teardown and only
 * reaches the taosMemoryFree() call.
 */
void destroySubTableMgt(SSubTableMgt *p) {
  if (p == NULL) {
    taosMemoryFree(p);
    return;
  }

  tableBuilderMgtDestroy(p->pBuilderMgt);
  tableReaderMgtDestroy(p->pReaderMgt);
  tableMetaMgtDestroy(p->pTableMetaMgt);
  taosMemoryFree(p);
}
5,011,452✔
119
/*
 * Look up the value stored under `seq`.
 *
 * bseGetTableIdBySeq() maps the sequence to a table timestamp:
 *   - timestamp > 0: the value is read through the sub-table manager cached
 *     in pMgt->pHashObj for that timestamp (created and cached on first use).
 *   - otherwise: the current sub-table (pMgt->pCurrTableMgt) is queried via
 *     its builder first, with a conditional retry through its reader.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when pMgt is NULL,
 * TSDB_CODE_BLOB_SEQ_NOT_FOUND when there is no current sub-table to search,
 * or the error reported by the failing helper.
 */
int32_t bseTableMgtGet(STableMgt *pMgt, int64_t seq, uint8_t **pValue, int32_t *len) {
  if (pMgt == NULL) return TSDB_CODE_INVALID_PARA;

  int32_t       code = 0;
  int32_t       lino = 0;
  int32_t       readOnly = 1;
  SSubTableMgt *pSubMgt = NULL;
  SBse         *pBse = pMgt->pBse;

  int64_t timestamp = 0;
  code = bseGetTableIdBySeq(pBse, seq, &timestamp);
  TSDB_CHECK_CODE(code, lino, _error);

  if (timestamp > 0) {
    SSubTableMgt **ppSubMgt = taosHashGet(pMgt->pHashObj, &timestamp, sizeof(timestamp));
    if (ppSubMgt == NULL || *ppSubMgt == NULL) {
      code = createSubTableMgt(timestamp, 0, pMgt, &pSubMgt);
      TSDB_CHECK_CODE(code, lino, _error);

      code = taosHashPut(pMgt->pHashObj, &timestamp, sizeof(timestamp), &pSubMgt, sizeof(SSubTableMgt *));
      if (code != 0) {
        // The hash did not take ownership, so release the new object here
        // instead of leaking it.
        destroySubTableMgt(pSubMgt);
        pSubMgt = NULL;
        TSDB_CHECK_CODE(code, lino, _error);
      }
    } else {
      pSubMgt = *ppSubMgt;
    }
  } else {
    pSubMgt = pMgt->pCurrTableMgt;
    if (pSubMgt == NULL) {
      // Assign before checking: TSDB_CHECK_CODE only tests its argument, so
      // passing the constant directly would jump out with code still 0.
      code = TSDB_CODE_BLOB_SEQ_NOT_FOUND;
      TSDB_CHECK_CODE(code, lino, _error);
    }
    readOnly = 0;
  }

  if (readOnly) {
    code = tableReaderMgtSeek(pSubMgt->pReaderMgt, seq, pValue, len);
    TSDB_CHECK_CODE(code, lino, _error);
  } else {
    code = tableBuilderMgtSeek(pSubMgt->pBuilderMgt, seq, pValue, len);
    if (code != TSDB_CODE_SUCCESS) {
      // NOTE(review): the reader is retried for every builder error EXCEPT
      // TSDB_CODE_OUT_OF_RANGE, while tableBuilderMgtSeek documents
      // OUT_OF_RANGE as "continue to read from reader" - confirm which of the
      // two is intended. Behavior is left unchanged here.
      if (code != TSDB_CODE_OUT_OF_RANGE) {
        code = tableReaderMgtSeek(pSubMgt->pReaderMgt, seq, pValue, len);
        TSDB_CHECK_CODE(code, lino, _error);
      }
    }
  }
_error:
  if (code != 0) {
    bseError("vgId:%d failed to get table at line %d since %s", BSE_VGID(pBse), lino, tstrerror(code));
  }
  return code;
}
170

171
/*
 * Recover the table described by pInfo: build a temporary sub-table manager
 * for pInfo->timestamp, run tableBuilderMgtRecover() against it with
 * pInfo->size, then destroy the temporary manager again.
 *
 * NOTE(review): failures are logged but the function always returns 0 -
 * confirm that callers are meant to continue after a failed recovery.
 */
int32_t bseTableMgtRecoverTable(STableMgt *pMgt, SBseLiveFileInfo *pInfo) {
  if (pMgt == NULL) return 0;

  int32_t       lino = 0;
  SSubTableMgt *pRecoverMgt = NULL;

  int32_t code = createSubTableMgt(pInfo->timestamp, 0, pMgt, &pRecoverMgt);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableBuilderMgtRecover(pRecoverMgt->pBuilderMgt, 0, NULL, pInfo->size);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  if (code != 0) {
    bseError("failed to recover table at line %d since %s", lino, tstrerror(code));
  }
  destroySubTableMgt(pRecoverMgt);
  return 0;
}
191

192
void bseTableMgtCleanup(void *pMgt) {
5,009,850✔
193
  if (pMgt == NULL) return;
5,009,850✔
194

195
  STableMgt *p = (STableMgt *)pMgt;
5,009,850✔
196

197
  void *pIter = taosHashIterate(p->pHashObj, NULL);
5,009,850✔
198
  while (pIter) {
5,011,555✔
199
    SSubTableMgt **ppSubMgt = pIter;
1,705✔
200
    destroySubTableMgt(*ppSubMgt);
1,705✔
201
    pIter = taosHashIterate(p->pHashObj, pIter);
1,705✔
202
  }
203

204
  destroySubTableMgt(p->pCurrTableMgt);
5,009,850✔
205

206
  taosHashCleanup(p->pHashObj);
5,009,747✔
207
  taosMemoryFree(p);
5,009,549✔
208
}
209

210
static int32_t bseCalcNowTimestamp(int8_t precision, int64_t *dst) {
51,338✔
211
  int64_t nowSec = taosGetTimestampSec();
51,338✔
212
  int32_t code = 0;
51,338✔
213
  if (precision == TSDB_TIME_PRECISION_MILLI) {
51,338✔
214
    nowSec = nowSec * 1000;
51,338✔
215
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
×
216
    nowSec = nowSec * 1000000l;
×
217
  } else if (precision == TSDB_TIME_PRECISION_NANO) {
×
218
    nowSec = nowSec * 1000000000l;
×
219
  } else {
220
    bseError("bse invalid time precision:%d", precision);
×
221
    return TSDB_CODE_INVALID_PARA;
×
222
  }
223
  *dst = nowSec;
51,338✔
224
  return code;
51,338✔
225
}
226

227
static int32_t bseShouldSwitchToTable(int64_t nowTimestamp, int64_t timestamp, int8_t precision, int32_t keepDays) {
44,008✔
228
  if (timestamp == 0) return 1;
44,008✔
229
  if (keepDays <= 0) return 0;
44,008✔
230

231
  int64_t threshold = keepDays * 24 * tsTickPerHour[precision];
44,008✔
232
  int64_t diff = nowTimestamp - timestamp;
44,008✔
233

234
  if (diff < threshold) {
44,008✔
235
    return 0;
44,008✔
236
  } else {
237
    return 1;
×
238
  }
239

240
  return 0;
241
}
242
/*
 * Return (through *ppSubGgt) the sub-table manager that new batches should
 * be written to, creating or rotating it when needed.
 *
 *   - No current sub-table: create one. Its timestamp is "now", unless
 *     pMgt->timestamp is set and bseShouldSwitchToTable() says that table is
 *     still current, in which case pMgt->timestamp is reused.
 *   - Current sub-table exists but is due for rotation: commit the BSE,
 *     destroy the old sub-table and create a new one stamped "now".
 *
 * Returns 0 on success. On failure the error code is returned and
 * *ppSubGgt is left untouched, so callers must check the result before
 * using the output pointer.
 */
static int32_t bseTableMgtGetTable(STableMgt *pMgt, SSubTableMgt **ppSubGgt) {
  int32_t code = 0;
  int32_t lino = 0;

  int64_t       startTs = 0;
  SBse         *pBse = pMgt->pBse;
  SSubTableMgt *pSubMgt = pMgt->pCurrTableMgt;

  code = bseCalcNowTimestamp(BSE_TIME_PRECISION(pBse), &startTs);
  TSDB_CHECK_CODE(code, lino, _error);

  if (pSubMgt == NULL) {
    if (pMgt->timestamp != 0) {
      if (!bseShouldSwitchToTable(startTs, pMgt->timestamp, BSE_TIME_PRECISION(pBse), BSE_KEEY_DAYS(pBse))) {
        startTs = pMgt->timestamp;
      }
    }

    code = createSubTableMgt(startTs, 0, pMgt, &pMgt->pCurrTableMgt);
    TSDB_CHECK_CODE(code, lino, _error);
  } else if (bseShouldSwitchToTable(startTs, pSubMgt->pBuilderMgt->timestamp, BSE_TIME_PRECISION(pBse),
                                    BSE_KEEY_DAYS(pBse))) {
    code = bseCommit(pBse);
    TSDB_CHECK_CODE(code, lino, _error);

    destroySubTableMgt(pSubMgt);
    // Drop the stale pointer right away: if re-creation fails below, the
    // manager must not keep pointing at freed memory.
    pMgt->pCurrTableMgt = NULL;

    code = createSubTableMgt(startTs, 0, pMgt, &pMgt->pCurrTableMgt);
    TSDB_CHECK_CODE(code, lino, _error);
  }

  pSubMgt = pMgt->pCurrTableMgt;

_error:
  if (code != 0) {
    bseError("failed to get sub table at line %d since %s", lino, tstrerror(code));
  } else {
    *ppSubGgt = pSubMgt;
  }

  // Propagate the failure; returning 0 here would make the caller
  // dereference an unset output pointer.
  return code;
}
288
/*
 * Append pBatch to the sub-table selected by bseTableMgtGetTable(), via that
 * sub-table's builder manager.
 *
 * Returns 0 on success or the error code of the failing step (logged).
 */
int32_t bseTableMgtAppend(STableMgt *pMgt, SBseBatch *pBatch) {
  int32_t       lino = 0;
  SSubTableMgt *pTarget = NULL;

  int32_t code = bseTableMgtGetTable(pMgt, &pTarget);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableBuilderMgtPutBatch(pTarget->pBuilderMgt, pBatch);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  if (code != 0) {
    bseError("failed to append table at line %d since %s", lino, tstrerror(code));
  }
  return code;
}
306

307
/* Placeholder: currently does nothing with its arguments and returns 0. */
int32_t bseTableMgtGetLiveFileSet(STableMgt *pMgt, SArray **pList) {
  (void)pMgt;
  (void)pList;
  return 0;
}
311

312
/*
 * Commit the current sub-table through its builder manager, filling pInfo.
 * When there is no current sub-table the call logs that fact and returns 0.
 *
 * Returns 0 on success or the error from tableBuilderMgtCommit() (logged).
 */
int32_t bseTableMgtCommit(STableMgt *pMgt, SBseLiveFileInfo *pInfo) {
  SSubTableMgt *pActive = pMgt->pCurrTableMgt;
  if (pActive == NULL) {
    bseInfo("nothing to commit table");
    return 0;
  }

  int32_t lino = 0;
  int32_t code = tableBuilderMgtCommit(pActive->pBuilderMgt, pInfo);
  if (code != 0) {
    lino = __LINE__;
    bseError("failed to commit table at line %d since %s", lino, tstrerror(code));
    return code;
  }

  bseInfo("succ to commit bse table");
  return code;
}
332

333
/* Placeholder: currently does nothing with its arguments and returns 0. */
int32_t bseTableMgtUpdateLiveFileSet(STableMgt *pMgt, SArray *pLiveFileSet) {
  (void)pMgt;
  (void)pLiveFileSet;
  return 0;
}
337

338
/*
 * Placeholder: the requested capacity is ignored and 0 is returned.
 * The disabled call below shows the resize that used to be wired in here.
 */
int32_t bseTableMgtSetBlockCacheSize(STableMgt *pMgt, int32_t cap) {
  (void)pMgt;
  (void)cap;
  return 0;
  // return blockCacheResize(pMgt->pReaderMgt->pBlockCache, cap);
}
343

344
/*
 * Placeholder: the requested capacity is ignored and 0 is returned.
 * The disabled call below shows the resize that used to be wired in here.
 */
int32_t bseTableMgtSetTableCacheSize(STableMgt *pMgt, int32_t cap) {
  (void)pMgt;
  (void)cap;
  return 0;
  // return tableCacheResize(pMgt->pReaderMgt->pTableCache, cap);
}
349

350
/*
 * Drop every sub-table manager owned by pMgt - the current one and all
 * entries of the hash - while keeping pMgt itself usable. NULL is accepted
 * and ignored. Always returns 0.
 */
int32_t bseTableMgtClear(STableMgt *pMgt) {
  if (pMgt == NULL) return 0;

  destroySubTableMgt(pMgt->pCurrTableMgt);
  // Reset the pointer so later get/commit/cleanup calls do not touch (or
  // free again) the object that was just destroyed.
  pMgt->pCurrTableMgt = NULL;

  void *pIter = taosHashIterate(pMgt->pHashObj, NULL);
  while (pIter) {
    SSubTableMgt **ppSubMgt = pIter;
    destroySubTableMgt(*ppSubMgt);
    pIter = taosHashIterate(pMgt->pHashObj, pIter);
  }
  taosHashClear(pMgt->pHashObj);

  return 0;
}
371

372
void tableReaderFree(void *pReader) {
×
373
  STableReader *p = (STableReader *)pReader;
×
374
  if (p != NULL) {
×
375
    tableReaderClose(p);
×
376
  }
377
}
×
378
/* Free callback handed to blockCacheOpen(): releases one cached block. */
void blockFree(void *pBlock) {
  taosMemoryFree(pBlock);
}
3,039✔
379

380
/*
 * Initialise a reader manager in place: its rwlock, a block cache opened
 * with capacity argument 48 and a table cache opened with capacity argument
 * 32, then record pBse and timestamp.
 *
 * Returns 0 on success or the error from the cache that failed to open; the
 * caches opened so far are left for the caller's destroy path.
 */
int32_t tableReaderMgtInit(STableReaderMgt *pReader, SBse *pBse, int64_t timestamp) {
  int32_t lino = 0;

  (void)taosThreadRwlockInit(&pReader->mutex, NULL);

  int32_t code = blockCacheOpen(48, blockFree, &pReader->pBlockCache);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableCacheOpen(32, tableReaderFree, &pReader->pTableCache);
  TSDB_CHECK_CODE(code, lino, _error);

  pReader->timestamp = timestamp;
  pReader->pBse = pBse;

_error:
  if (code != 0) {
    bseError("failed to init table pReaderMgt mgt at line %d since %s", lino, tstrerror(code));
  }
  return code;
}
401

402
// int32_t tableReaderMgtClear(STableReaderMgt *pReader) {
403
//   int32_t code = 0;
404

405
//   (void)taosThreadRwlockWrlock(&pReader->mutex);
406

407
//   (void)(tableCacheClear(pReader->pTableCache));
408

409
//   (void)(blockCacheClear(pReader->pBlockCache));
410
//   (void)taosThreadRwlockUnlock(&pReader->mutex);
411

412
//   return code;
413
// }
414

415
/*
 * Release what tableReaderMgtInit() set up: the table cache, the block
 * cache and finally the rwlock. The manager object itself is not freed here.
 */
void tableReaderMgtDestroy(STableReaderMgt *pReader) {
  STableReaderMgt *pReaderMgt = pReader;

  tableCacheClose(pReaderMgt->pTableCache);
  blockCacheClose(pReaderMgt->pBlockCache);

  (void)taosThreadRwlockDestroy(&pReaderMgt->mutex);
}
9,035✔
420

421
/*
 * Read the value for `seq` through a table reader opened for this manager's
 * timestamp. The reader is closed again before returning, on success and on
 * failure alike.
 *
 * Returns 0 on success or the error from tableReaderOpen()/tableReaderGet().
 */
int32_t tableReaderMgtSeek(STableReaderMgt *pReaderMgt, int64_t seq, uint8_t **pValue, int32_t *len) {
  int32_t       lino = 0;
  STableReader *pTblReader = NULL;

  int32_t code = tableReaderOpen(pReaderMgt->timestamp, &pTblReader, pReaderMgt);
  if (code != 0) {
    lino = __LINE__;
  } else {
    code = tableReaderGet(pTblReader, seq, pValue, len);
    if (code != 0) {
      lino = __LINE__;
    }
  }

  if (code != 0) {
    bseError("failed to seek table pReaderMgt at line %d since %s", lino, tstrerror(code));
  }

  tableReaderClose(pTblReader);
  return code;
}
441

442
/*
 * Initialise a builder manager in place: set up its rwlock and record the
 * owning BSE and the table timestamp. No builder is opened here.
 * Always returns 0.
 */
int32_t tableBuilderMgtInit(STableBuilderMgt *pMgt, SBse *pBse, int64_t timestamp) {
  (void)taosThreadRwlockInit(&pMgt->mutex, NULL);

  pMgt->pBse = pBse;
  pMgt->timestamp = timestamp;

  return 0;
}
451

452
/*
 * Close the manager's current table builder under the write lock.
 * Always returns 0.
 */
int32_t tableBuilderMgtClear(STableBuilderMgt *pMgt) {
  (void)taosThreadRwlockWrlock(&pMgt->mutex);
  tableBuilderClose(pMgt->p, 0);
  // Forget the closed builder so tableBuilderMgtDestroy() does not close it
  // a second time and seek/put do not use it afterwards.
  // NOTE(review): assumes tableBuilderClose() releases the builder - confirm.
  pMgt->p = NULL;
  (void)taosThreadRwlockUnlock(&pMgt->mutex);
  return 0;
}
461

462
/*
 * Write pBatch into the manager's table builder while holding the write
 * lock. The builder is opened on first use, and a mem table sized by
 * BSE_BLOCK_SIZE() is created whenever the builder has none.
 *
 * Returns 0 on success or the error of the failing step (logged).
 */
int32_t tableBuilderMgtPutBatch(STableBuilderMgt *pMgt, SBseBatch *pBatch) {
  int32_t lino = 0;
  int32_t code = 0;
  int64_t firstSeq = pBatch->startSeq;

  (void)taosThreadRwlockWrlock(&pMgt->mutex);

  STableBuilder *pBuilder = pMgt->p;
  if (pBuilder == NULL) {
    code = tableBuilderMgtOpenBuilder(pMgt, firstSeq, &pBuilder);
    TSDB_CHECK_CODE(code, lino, _error);
  }

  if (pBuilder->pMemTable == NULL) {
    code = bseMemTableCreate(&pBuilder->pMemTable, BSE_BLOCK_SIZE(pMgt->pBse));
    TSDB_CHECK_CODE(code, lino, _error);

    // Back pointer from the fresh mem table to its builder.
    pBuilder->pMemTable->pTableBuilder = pBuilder;
  }

  code = tableBuilderPut(pBuilder, pBatch);

_error:
  if (code == 0) {
    bseTrace("succ to put batch to table builder mem %p, imumm table %p", pBuilder->pMemTable,
             pBuilder->pImmuMemTable);
  } else {
    bseError("failed to put batch to table builder at line %d since %s", lino, tstrerror(code));
  }
  (void)taosThreadRwlockUnlock(&pMgt->mutex);

  return code;
}
495

496
/*
 * Look up `seq` in the manager's current table builder under the read lock.
 *
 * Returns the result of tableBuilderGet(), or TSDB_CODE_OUT_OF_RANGE when no
 * builder is open (signalling that the data has to come from a reader).
 */
int32_t tableBuilderMgtSeek(STableBuilderMgt *pMgt, int64_t seq, uint8_t **pValue, int32_t *len) {
  (void)taosThreadRwlockRdlock(&pMgt->mutex);

  STableBuilder *pActive = pMgt->p;
  int32_t        code = (pActive != NULL) ? tableBuilderGet(pActive, seq, pValue, len)
                                          : TSDB_CODE_OUT_OF_RANGE;  //  continue to read from reader

  (void)taosThreadRwlockUnlock(&pMgt->mutex);
  return code;
}
512

513
/*
 * Open a table builder for pMgt->timestamp, attach it to the manager
 * (pMgt->p), link it to the sub-table's table meta and return it through
 * *pBuilder. `seq` is currently unused.
 *
 * Returns 0 on success or the error from tableBuilderOpen(); on failure
 * neither pMgt->p nor *pBuilder is modified.
 */
int32_t tableBuilderMgtOpenBuilder(STableBuilderMgt *pMgt, int64_t seq, STableBuilder **pBuilder) {
  int32_t code = 0;
  int32_t lino = 0;

  SBse *pBse = pMgt->pBse;

  STableBuilder *p = NULL;

  code = tableBuilderOpen(pMgt->timestamp, &p, pBse);
  TSDB_CHECK_CODE(code, lino, _error);

  p->pTableMeta = pMgt->pMgt->pTableMetaMgt->pTableMeta;
  p->pBuilderMgt = pMgt;
  pMgt->p = p;

  *pBuilder = p;

_error:
  if (code != 0) {
    // Report the line where the failure was detected (lino), matching the
    // other functions in this file, rather than the line of this log call.
    bseError("failed to open table builder at line %d since %s", lino, tstrerror(code));
  }

  return code;
}
537

538
/*
 * Recover the manager's table file: open a builder and, when its current
 * offset lies beyond `size`, truncate the file back to `size`.
 *
 * pBuilder is optional; when non-NULL it receives the opened builder on
 * success. Returns 0 on success or the error of the failing step (logged).
 */
int32_t tableBuilderMgtRecover(STableBuilderMgt *pMgt, int64_t seq, STableBuilder **pBuilder, int64_t size) {
  int32_t        code = 0;
  int32_t        lino = 0;
  STableBuilder *pTable = NULL;

  code = tableBuilderMgtOpenBuilder(pMgt, seq, &pTable);
  TSDB_CHECK_CODE(code, lino, _error);

  if (pTable->offset > size) {
    code = tableBuilderTruncFile(pTable, size);
    TSDB_CHECK_CODE(code, lino, _error);
  }

  if (pBuilder != NULL) {
    *pBuilder = pTable;
  }

_error:
  if (code != 0) {
    // Name the operation that actually failed; the old text was copied from
    // the open path and hid truncation failures.
    bseError("failed to recover table builder at line %d since %s", lino, tstrerror(code));
  }

  return code;
}
557
/*
 * Commit the manager's table builder.
 *
 * Under the write lock the active mem table is moved to the immutable slot
 * and the active slot is cleared; tableBuilderCommit() then runs outside the
 * lock and fills pInfo.
 *
 * Returns 0 on success, and also when no builder has been opened yet
 * (nothing to commit); otherwise the error from tableBuilderCommit().
 */
int32_t tableBuilderMgtCommit(STableBuilderMgt *pMgt, SBseLiveFileInfo *pInfo) {
  int32_t        code = 0;
  int32_t        lino = 0;
  STableBuilder *pBuilder = NULL;

  (void)taosThreadRwlockWrlock(&pMgt->mutex);
  pBuilder = pMgt->p;
  if (pBuilder == NULL) {
    // The builder is opened on the first put; without one there is no data
    // to flush and dereferencing it below would crash.
    (void)taosThreadRwlockUnlock(&pMgt->mutex);
    bseInfo("no table builder to commit");
    return code;
  }

  bseInfo("start to commit bse table builder mem %p, immu mem %p", pBuilder->pMemTable, pBuilder->pImmuMemTable);
  pBuilder->pImmuMemTable = pBuilder->pMemTable;
  pBuilder->pMemTable = NULL;
  (void)taosThreadRwlockUnlock(&pMgt->mutex);

  code = tableBuilderCommit(pBuilder, pInfo);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  if (code != 0) {
    bseError("failed to commit table builder at line %d since %s", lino, tstrerror(code));
  } else {
    bseTrace("succ to commit bse table builder mem %p, immu mem %p", pBuilder->pMemTable, pBuilder->pImmuMemTable);
  }
  return code;
}
581

582
/*
 * Release what the builder manager holds: close its table builder (the
 * second argument of tableBuilderClose is 0) and destroy its rwlock. The
 * manager object itself is not freed here.
 */
void tableBuilderMgtDestroy(STableBuilderMgt *pMgt) {
  STableBuilder *pOpenBuilder = pMgt->p;

  tableBuilderClose(pOpenBuilder, 0);

  (void)taosThreadRwlockDestroy(&pMgt->mutex);
}
9,035✔
586

587
/*
 * Initialise a table-meta manager in place: record pBse, open the table
 * meta object and stamp both the manager and the meta with `timestamp`.
 *
 * Returns 0 on success or the error from tableMetaOpen() (logged).
 */
int32_t tableMetaMgtInit(STableMetaMgt *pMgt, SBse *pBse, int64_t timestamp) {
  int32_t lino = 0;

  pMgt->pBse = pBse;

  int32_t code = tableMetaOpen(NULL, &pMgt->pTableMeta, pMgt);
  if (code != 0) {
    lino = __LINE__;
    bseError("failed to init table meta mgt at line %d since %s", lino, tstrerror(code));
    return code;
  }

  pMgt->timestamp = timestamp;
  pMgt->pTableMeta->timestamp = timestamp;
  pMgt->pTableMeta->pBse = pBse;

  return code;
}
605

606
/* Close the manager's table meta, if any, and clear the pointer. */
static void tableMetaMgtDestroy(STableMetaMgt *pMgt) {
  if (pMgt->pTableMeta == NULL) {
    return;
  }

  tableMetaClose(pMgt->pTableMeta);
  pMgt->pTableMeta = NULL;
}
9,035✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc