• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4986

15 Mar 2026 08:32AM UTC coverage: 37.305% (-31.3%) from 68.601%
#4986

push

travis-ci

tomchon
test: keep docs and unit test

125478 of 336361 relevant lines covered (37.3%)

1134847.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

8.49
/source/dnode/vnode/src/bse/bseTableMgt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "bseTableMgt.h"
17
#include "bse.h"
18
#include "bseTable.h"
19
#include "bseUtil.h"
20
#include "tglobal.h"
21
#include "thash.h"
22

23
static int32_t tableReaderMgtInit(STableReaderMgt *pReader, SBse *pBse, int64_t timestamp);
24
static int32_t tableReaderMgtSeek(STableReaderMgt *pReaderMgt, int64_t seq, uint8_t **pValue, int32_t *len);
25
// static int32_t tableReaderMgtClear(STableReaderMgt *pReader);
26
static void    tableReaderMgtDestroy(STableReaderMgt *pReader);
27

28
static int32_t tableBuilderMgtInit(STableBuilderMgt *pMgt, SBse *pBse, int64_t timestamp);
29
static int32_t tableBuilderMgtOpenBuilder(STableBuilderMgt *pMgt, int64_t seq, STableBuilder **p);
30
static int32_t tableBuilderMgtCommit(STableBuilderMgt *pMgt, SBseLiveFileInfo *pInfo);
31
static int32_t tableBuilderMgtSeek(STableBuilderMgt *pMgt, int64_t seq, uint8_t **pValue, int32_t *len);
32
static int32_t tableBuilderMgtPutBatch(STableBuilderMgt *pMgt, SBseBatch *pBatch);
33
static int32_t tableBuilderMgtClear(STableBuilderMgt *pMgt);
34
static void    tableBuilderMgtDestroy(STableBuilderMgt *pMgt);
35

36
static int32_t tableBuilderMgtRecover(STableBuilderMgt *pMgt, int64_t seq, STableBuilder **pBuilder, int64_t size);
37

38
static int32_t tableMetaMgtInit(STableMetaMgt *pMgt, SBse *pBse, int64_t timestamp);
39
static void    tableMetaMgtDestroy(STableMetaMgt *pMgt);
40

41
static void tableReaderFree(void *pReader);  // lru table reader free func
42
static void blockFree(void *pBlock);         // block free func
43

44
int32_t bseTableMgtCreate(SBse *pBse, void **pMgt) {
40✔
45
  int32_t code = 0;
40✔
46
  int32_t lino = 0;
40✔
47

48
  STableMgt *p = taosMemoryCalloc(1, sizeof(STableMgt));
40✔
49
  if (p == NULL) {
40✔
50
    return terrno;
×
51
  }
52
  p->pBse = pBse;
40✔
53
  p->pHashObj = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
40✔
54
  if (p->pHashObj == NULL) {
40✔
55
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
56
  }
57

58
  *pMgt = p;
40✔
59
_error:
40✔
60
  if (code != 0) {
40✔
61
    if (p != NULL)
×
62
      bseError("vgId:%d failed to open table pBuilderMgt at line %d since %s", BSE_VGID((SBse *)p->pBse), lino,
×
63
               tstrerror(code));
64
    bseTableMgtCleanup(p);
×
65
  }
66
  return code;
40✔
67
}
68

69
int32_t bseTableMgtSetLastTableId(STableMgt *pMgt, int64_t timestamp) {
×
70
  if (pMgt == NULL) return 0;
×
71

72
  pMgt->timestamp = timestamp;
×
73
  return 0;
×
74
}
75

76
int32_t createSubTableMgt(int64_t timestamp, int32_t readOnly, STableMgt *pMgt, SSubTableMgt **pSubMgt) {
×
77
  int32_t code = 0;
×
78
  int32_t lino = 0;
×
79

80
  SSubTableMgt *p = taosMemCalloc(1, sizeof(SSubTableMgt));
×
81
  if (p == NULL) {
×
82
    code = terrno;
×
83
    TSDB_CHECK_CODE(code, lino, _error);
×
84
  }
85

86
  if (!readOnly) {
×
87
    code = tableBuilderMgtInit(p->pBuilderMgt, pMgt->pBse, timestamp);
×
88
    TSDB_CHECK_CODE(code, lino, _error);
×
89

90
    p->pBuilderMgt->pMgt = p;
×
91
  }
92

93
  code = tableReaderMgtInit(p->pReaderMgt, pMgt->pBse, timestamp);
×
94
  TSDB_CHECK_CODE(code, lino, _error);
×
95

96
  p->pReaderMgt->pMgt = p;
×
97

98
  code = tableMetaMgtInit(p->pTableMetaMgt, pMgt->pBse, timestamp);
×
99
  TSDB_CHECK_CODE(code, lino, _error);
×
100

101
  p->pTableMetaMgt->pMgt = p;
×
102

103
  *pSubMgt = p;
×
104
_error:
×
105
  if (code != 0) {
×
106
    bseError("failed to create sub table mgt at line %d since %s", lino, tstrerror(code));
×
107
    destroySubTableMgt(p);
×
108
  }
109
  return code;
×
110
}
111
void destroySubTableMgt(SSubTableMgt *p) {
40✔
112
  if (p != NULL) {
40✔
113
    tableBuilderMgtDestroy(p->pBuilderMgt);
×
114
    tableReaderMgtDestroy(p->pReaderMgt);
×
115
    tableMetaMgtDestroy(p->pTableMetaMgt);
×
116
  }
117
  taosMemoryFree(p);
40✔
118
}
40✔
119
int32_t bseTableMgtGet(STableMgt *pMgt, int64_t seq, uint8_t **pValue, int32_t *len) {
×
120
  if (pMgt == NULL) return TSDB_CODE_INVALID_PARA;
×
121

122
  int32_t       code = 0;
×
123
  int32_t       lino = 0;
×
124
  int32_t       readOnly = 1;
×
125
  SSubTableMgt *pSubMgt = NULL;
×
126
  SBse *pBse = pMgt->pBse;
×
127

128
  int64_t timestamp = 0;
×
129
  code = bseGetTableIdBySeq(pBse, seq, &timestamp);
×
130
  TSDB_CHECK_CODE(code, lino, _error);
×
131

132
  if (timestamp > 0) {
×
133
    SSubTableMgt **ppSubMgt = taosHashGet(pMgt->pHashObj, &timestamp, sizeof(timestamp));
×
134
    if (ppSubMgt == NULL || *ppSubMgt == NULL) {
×
135
      code = createSubTableMgt(timestamp, 0, pMgt, &pSubMgt);
×
136
      TSDB_CHECK_CODE(code, lino, _error);
×
137

138
      code = taosHashPut(pMgt->pHashObj, &timestamp, sizeof(timestamp), &pSubMgt, sizeof(SSubTableMgt *));
×
139
      TSDB_CHECK_CODE(code, lino, _error);
×
140

141
    } else {
142
      pSubMgt = *ppSubMgt;
×
143
    }
144
  } else {
145
    pSubMgt = pMgt->pCurrTableMgt;
×
146
    if (pSubMgt == NULL) {
×
147
      TSDB_CHECK_CODE(TSDB_CODE_BLOB_SEQ_NOT_FOUND, lino, _error);
×
148
    }
149
    readOnly = 0;
×
150
  }
151

152
  if (readOnly) {
×
153
    code = tableReaderMgtSeek(pSubMgt->pReaderMgt, seq, pValue, len);
×
154
    TSDB_CHECK_CODE(code, lino, _error);
×
155
  } else {
156
    code = tableBuilderMgtSeek(pSubMgt->pBuilderMgt, seq, pValue, len);
×
157
    if (code != TSDB_CODE_SUCCESS) {
×
158
      if (code != TSDB_CODE_OUT_OF_RANGE) {
×
159
        code = tableReaderMgtSeek(pSubMgt->pReaderMgt, seq, pValue, len);
×
160
        TSDB_CHECK_CODE(code, lino, _error);
×
161
      }
162
    }
163
  }
164
_error:
×
165
  if (code != 0) {
×
166
    bseError("vgId:%d failed to get table at line %d since %s", BSE_VGID(pBse), lino, tstrerror(code));
×
167
  }
168
  return code;
×
169
}
170

171
int32_t bseTableMgtRecoverTable(STableMgt *pMgt, SBseLiveFileInfo *pInfo) {
×
172
  int32_t code = 0;
×
173
  int32_t lino = 0;
×
174
  if (pMgt == NULL) return 0;
×
175

176
  SSubTableMgt *pSubMgt = NULL;
×
177

178
  code = createSubTableMgt(pInfo->timestamp, 0, pMgt, &pSubMgt);
×
179
  TSDB_CHECK_CODE(code, lino, _error);
×
180

181
  code = tableBuilderMgtRecover(pSubMgt->pBuilderMgt, 0, NULL, pInfo->size);
×
182
  TSDB_CHECK_CODE(code, lino, _error);
×
183

184
_error:
×
185
  if (code != 0) {
×
186
    bseError("failed to recover table at line %d since %s", lino, tstrerror(code));
×
187
  }
188
  destroySubTableMgt(pSubMgt);
×
189
  return 0;
×
190
}
191

192
void bseTableMgtCleanup(void *pMgt) {
40✔
193
  if (pMgt == NULL) return;
40✔
194

195
  STableMgt *p = (STableMgt *)pMgt;
40✔
196

197
  void *pIter = taosHashIterate(p->pHashObj, NULL);
40✔
198
  while (pIter) {
40✔
199
    SSubTableMgt **ppSubMgt = pIter;
×
200
    destroySubTableMgt(*ppSubMgt);
×
201
    pIter = taosHashIterate(p->pHashObj, pIter);
×
202
  }
203

204
  destroySubTableMgt(p->pCurrTableMgt);
40✔
205

206
  taosHashCleanup(p->pHashObj);
40✔
207
  taosMemoryFree(p);
40✔
208
}
209

210
static int32_t bseCalcNowTimestamp(int8_t precision, int64_t *dst) {
×
211
  int64_t nowSec = taosGetTimestampSec();
×
212
  int32_t code = 0;
×
213
  if (precision == TSDB_TIME_PRECISION_MILLI) {
×
214
    nowSec = nowSec * 1000;
×
215
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
×
216
    nowSec = nowSec * 1000000l;
×
217
  } else if (precision == TSDB_TIME_PRECISION_NANO) {
×
218
    nowSec = nowSec * 1000000000l;
×
219
  } else {
220
    bseError("bse invalid time precision:%d", precision);
×
221
    return TSDB_CODE_INVALID_PARA;
×
222
  }
223
  *dst = nowSec;
×
224
  return code;
×
225
}
226

227
static int32_t bseShouldSwitchToTable(int64_t nowTimestamp, int64_t timestamp, int8_t precision, int32_t keepDays) {
×
228
  if (timestamp == 0) return 1;
×
229
  if (keepDays <= 0) return 0;
×
230

231
  int64_t threshold = keepDays * 24 * tsTickPerHour[precision];
×
232
  int64_t diff = nowTimestamp - timestamp;
×
233

234
  if (diff < threshold) {
×
235
    return 0;
×
236
  } else {
237
    return 1;
×
238
  }
239

240
  return 0;
241
}
242
static int32_t bseTableMgtGetTable(STableMgt *pMgt, SSubTableMgt **ppSubGgt) {
×
243
  int32_t code = 0;
×
244
  int32_t lino = 0;
×
245

246
  int64_t       startTs = 0;
×
247
  SBse         *pBse = pMgt->pBse;
×
248
  SSubTableMgt *pSubMgt = pMgt->pCurrTableMgt;
×
249

250
  code = bseCalcNowTimestamp(BSE_TIME_PRECISION(pBse), &startTs);
×
251
  TSDB_CHECK_CODE(code, lino, _error);
×
252

253
  if (pSubMgt == NULL) {
×
254
    if (pMgt->timestamp != 0) {
×
255
      if (!bseShouldSwitchToTable(startTs, pMgt->timestamp, BSE_TIME_PRECISION(pBse), BSE_KEEY_DAYS(pBse))) {
×
256
        startTs = pMgt->timestamp;
×
257
      }
258
    }
259

260
    code = createSubTableMgt(startTs, 0, pMgt, &pMgt->pCurrTableMgt);
×
261
    TSDB_CHECK_CODE(code, lino, _error);
×
262

263
    pSubMgt = pMgt->pCurrTableMgt;
×
264
  } else {
265
    if (bseShouldSwitchToTable(startTs, pSubMgt->pBuilderMgt->timestamp, BSE_TIME_PRECISION(pBse),
×
266
                               BSE_KEEY_DAYS(pBse))) {
×
267
      code = bseCommit(pBse);
×
268
      TSDB_CHECK_CODE(code, lino, _error);
×
269

270
      destroySubTableMgt(pSubMgt);
×
271

272
      code = createSubTableMgt(startTs, 0, pMgt, &pMgt->pCurrTableMgt);
×
273
      TSDB_CHECK_CODE(code, lino, _error);
×
274
    }
275

276
    pSubMgt = pMgt->pCurrTableMgt;
×
277
  }
278

279
_error:
×
280
  if (code != 0) {
×
281
    bseError("failed to get sub table at line %d since %s", lino, tstrerror(code));
×
282
  } else {
283
    *ppSubGgt = pSubMgt;
×
284
  }
285

286
  return 0;
×
287
}
288
int32_t bseTableMgtAppend(STableMgt *pMgt, SBseBatch *pBatch) {
×
289
  int32_t code = 0;
×
290
  int32_t lino = 0;
×
291

292
  SBse         *pBse = pMgt->pBse;
×
293
  SSubTableMgt *pSubMgt = NULL;
×
294
  code = bseTableMgtGetTable(pMgt, &pSubMgt);
×
295
  TSDB_CHECK_CODE(code, lino, _error);
×
296

297
  code = tableBuilderMgtPutBatch(pSubMgt->pBuilderMgt, pBatch);
×
298
  TSDB_CHECK_CODE(code, lino, _error);
×
299

300
_error:
×
301
  if (code != 0) {
×
302
    bseError("failed to append table at line %d since %s", lino, tstrerror(code));
×
303
  }
304
  return code;
×
305
}
306

307
int32_t bseTableMgtGetLiveFileSet(STableMgt *pMgt, SArray **pList) {
×
308
  int32_t code = 0;
×
309
  return code;
×
310
}
311

312
int32_t bseTableMgtCommit(STableMgt *pMgt, SBseLiveFileInfo *pInfo) {
4✔
313
  int32_t code = 0;
4✔
314
  int32_t lino = 0;
4✔
315

316
  SSubTableMgt *pSubMgt = pMgt->pCurrTableMgt;
4✔
317
  if (pSubMgt == NULL) {
4✔
318
    bseInfo("nothing to commit table");
4✔
319
    return code;
4✔
320
  }
321

322
  code = tableBuilderMgtCommit(pSubMgt->pBuilderMgt, pInfo);
×
323
  TSDB_CHECK_CODE(code, lino, _error);
×
324
_error:
×
325
  if (code != 0) {
×
326
    bseError("failed to commit table at line %d since %s", lino, tstrerror(code));
×
327
  } else {
328
    bseInfo("succ to commit bse table");
×
329
  }
330
  return code;
×
331
}
332

333
int32_t bseTableMgtUpdateLiveFileSet(STableMgt *pMgt, SArray *pLiveFileSet) {
×
334
  int32_t code = 0;
×
335
  return code;
×
336
}
337

338
int32_t bseTableMgtSetBlockCacheSize(STableMgt *pMgt, int32_t cap) {
×
339
  int32_t code = 0;
×
340
  return code;
×
341
  // return blockCacheResize(pMgt->pReaderMgt->pBlockCache, cap);
342
}
343

344
int32_t bseTableMgtSetTableCacheSize(STableMgt *pMgt, int32_t cap) {
×
345
  int32_t code = 0;
×
346
  return code;
×
347
  // return tableCacheResize(pMgt->pReaderMgt->pTableCache, cap);
348
}
349

350
int32_t bseTableMgtClear(STableMgt *pMgt) {
×
351
  int32_t code = 0;
×
352
  int32_t lino = 0;
×
353
  if (pMgt == NULL) return 0;
×
354

355
  destroySubTableMgt(pMgt->pCurrTableMgt);
×
356

357
  void *pIter = taosHashIterate(pMgt->pHashObj, NULL);
×
358
  while (pIter) {
×
359
    SSubTableMgt **ppSubMgt = pIter;
×
360
    destroySubTableMgt(*ppSubMgt);
×
361
    pIter = taosHashIterate(pMgt->pHashObj, pIter);
×
362
  }
363
  taosHashClear(pMgt->pHashObj);
×
364

365
_error:
×
366
  if (code != 0) {
×
367
    bseError("failed to clear table at line %d since %s", lino, tstrerror(code));
×
368
  }
369
  return code;
×
370
}
371

372
void tableReaderFree(void *pReader) {
×
373
  STableReader *p = (STableReader *)pReader;
×
374
  if (p != NULL) {
×
375
    tableReaderClose(p);
×
376
  }
377
}
×
378
void blockFree(void *pBlock) { taosMemoryFree(pBlock); }
×
379

380
int32_t tableReaderMgtInit(STableReaderMgt *pReader, SBse *pBse, int64_t timestamp) {
×
381
  int32_t code = 0;
×
382
  int32_t lino = 0;
×
383

384
  (void)taosThreadRwlockInit(&pReader->mutex, NULL);
×
385

386
  code = blockCacheOpen(48, blockFree, &pReader->pBlockCache);
×
387
  TSDB_CHECK_CODE(code, lino, _error);
×
388

389
  code = tableCacheOpen(32, tableReaderFree, &pReader->pTableCache);
×
390
  TSDB_CHECK_CODE(code, lino, _error);
×
391

392
  pReader->pBse = pBse;
×
393
  pReader->timestamp = timestamp;
×
394

395
_error:
×
396
  if (code != 0) {
×
397
    bseError("failed to init table pReaderMgt mgt at line %d since %s", lino, tstrerror(code));
×
398
  }
399
  return code;
×
400
}
401

402
// int32_t tableReaderMgtClear(STableReaderMgt *pReader) {
403
//   int32_t code = 0;
404

405
//   (void)taosThreadRwlockWrlock(&pReader->mutex);
406

407
//   (void)(tableCacheClear(pReader->pTableCache));
408

409
//   (void)(blockCacheClear(pReader->pBlockCache));
410
//   (void)taosThreadRwlockUnlock(&pReader->mutex);
411

412
//   return code;
413
// }
414

415
void tableReaderMgtDestroy(STableReaderMgt *pReader) {
×
416
  tableCacheClose(pReader->pTableCache);
×
417
  blockCacheClose(pReader->pBlockCache);
×
418
  (void)taosThreadRwlockDestroy(&pReader->mutex);
×
419
}
×
420

421
int32_t tableReaderMgtSeek(STableReaderMgt *pReaderMgt, int64_t seq, uint8_t **pValue, int32_t *len) {
×
422
  int32_t code = 0;
×
423
  int32_t lino = 0;
×
424

425
  STableReader *pReader = NULL;
×
426

427
  code = tableReaderOpen(pReaderMgt->timestamp, &pReader, pReaderMgt);
×
428
  TSDB_CHECK_CODE(code, lino, _error);
×
429

430
  code = tableReaderGet(pReader, seq, pValue, len);
×
431
  TSDB_CHECK_CODE(code, lino, _error);
×
432

433
_error:
×
434
  if (code != 0) {
×
435
    bseError("failed to seek table pReaderMgt at line %d since %s", lino, tstrerror(code));
×
436
  }
437

438
  tableReaderClose(pReader);
×
439
  return code;
×
440
}
441

442
int32_t tableBuilderMgtInit(STableBuilderMgt *pMgt, SBse *pBse, int64_t timestamp) {
×
443
  int32_t code = 0;
×
444
  int32_t lino = 0;
×
445

446
  (void)taosThreadRwlockInit(&pMgt->mutex, NULL);
×
447
  pMgt->pBse = pBse;
×
448
  pMgt->timestamp = timestamp;
×
449
  return code;
×
450
}
451

452
int32_t tableBuilderMgtClear(STableBuilderMgt *pMgt) {
×
453
  int32_t code = 0;
×
454
  int32_t lino = 0;
×
455

456
  (void)taosThreadRwlockWrlock(&pMgt->mutex);
×
457
  tableBuilderClose(pMgt->p, 0);
×
458
  (void)taosThreadRwlockUnlock(&pMgt->mutex);
×
459
  return code;
×
460
}
461

462
int32_t tableBuilderMgtPutBatch(STableBuilderMgt *pMgt, SBseBatch *pBatch) {
×
463
  int32_t code = 0;
×
464
  int32_t lino = 0;
×
465
  int64_t seq = pBatch->startSeq;
×
466

467
  (void)taosThreadRwlockWrlock(&pMgt->mutex);
×
468
  STableBuilder *p = pMgt->p;
×
469

470
  if (p == NULL) {
×
471
    code = tableBuilderMgtOpenBuilder(pMgt, seq, &p);
×
472
    if (code != 0) {
×
473
      TSDB_CHECK_CODE(code, lino, _error);
×
474
    }
475
  }
476
  if (p->pMemTable == NULL) {
×
477
    code = bseMemTableCreate(&p->pMemTable, BSE_BLOCK_SIZE(pMgt->pBse));
×
478
    if (code != 0) {
×
479
      TSDB_CHECK_CODE(code, lino, _error);
×
480
    }
481

482
    p->pMemTable->pTableBuilder = p;
×
483
  }
484
  code = tableBuilderPut(p, pBatch);
×
485
_error:
×
486
  if (code != 0) {
×
487
    bseError("failed to put batch to table builder at line %d since %s", lino, tstrerror(code));
×
488
  } else {
489
    bseTrace("succ to put batch to table builder mem %p, imumm table %p", p->pMemTable, p->pImmuMemTable);
×
490
  }
491
  (void)taosThreadRwlockUnlock(&pMgt->mutex);
×
492

493
  return code;
×
494
}
495

496
int32_t tableBuilderMgtSeek(STableBuilderMgt *pMgt, int64_t seq, uint8_t **pValue, int32_t *len) {
×
497
  int32_t        code = 0;
×
498
  int32_t        lino = 0;
×
499
  STableBuilder *pBuilder = NULL;
×
500

501
  (void)taosThreadRwlockRdlock(&pMgt->mutex);
×
502
  pBuilder = pMgt->p;
×
503

504
  if (pBuilder) {
×
505
    code = tableBuilderGet(pBuilder, seq, pValue, len);
×
506
  } else {
507
    code = TSDB_CODE_OUT_OF_RANGE;  //  continue to read from reader
×
508
  }
509
  (void)taosThreadRwlockUnlock(&pMgt->mutex);
×
510
  return code;
×
511
}
512

513
int32_t tableBuilderMgtOpenBuilder(STableBuilderMgt *pMgt, int64_t seq, STableBuilder **pBuilder) {
×
514
  int32_t code = 0;
×
515
  int32_t lino = 0;
×
516

517
  SBse *pBse = pMgt->pBse;
×
518

519
  STableBuilder *p = NULL;
×
520

521
  code = tableBuilderOpen(pMgt->timestamp, &p, pBse);
×
522
  TSDB_CHECK_CODE(code, lino, _error);
×
523

524
  p->pTableMeta = pMgt->pMgt->pTableMetaMgt->pTableMeta;
×
525
  p->pBuilderMgt = pMgt;
×
526
  pMgt->p = p;
×
527

528
  *pBuilder = p;
×
529

530
_error:
×
531
  if (code != 0) {
×
532
    bseError("failed to open table builder at line %d since %s", __LINE__, tstrerror(code));
×
533
  }
534

535
  return code;
×
536
}
537

538
int32_t tableBuilderMgtRecover(STableBuilderMgt *pMgt, int64_t seq, STableBuilder **pBuilder, int64_t size) {
×
539
  int32_t        code = 0;
×
540
  int32_t        lino = 0;
×
541
  STableBuilder *pTable = NULL;
×
542

543
  code = tableBuilderMgtOpenBuilder(pMgt, seq, &pTable);
×
544
  TSDB_CHECK_CODE(code, lino, _error);
×
545

546
  if (pTable->offset > size) {
×
547
    code = tableBuilderTruncFile(pTable, size);
×
548
    TSDB_CHECK_CODE(code, lino, _error);
×
549
  }
550
_error:
×
551
  if (code != 0) {
×
552
    bseError("failed to open table builder at line %d since %s", lino, tstrerror(code));
×
553
  }
554

555
  return code;
×
556
}
557
int32_t tableBuilderMgtCommit(STableBuilderMgt *pMgt, SBseLiveFileInfo *pInfo) {
×
558
  int32_t        code = 0;
×
559
  int32_t        lino = 0;
×
560
  int8_t         flushIdx = -1;
×
561
  STableBuilder *pBuilder = NULL;
×
562

563
  (void)taosThreadRwlockWrlock(&pMgt->mutex);
×
564
  pBuilder = pMgt->p;
×
565

566
  bseInfo("start to commit bse table builder mem %p, immu mem %p", pBuilder->pMemTable, pBuilder->pImmuMemTable);
×
567
  pBuilder->pImmuMemTable = pBuilder->pMemTable;
×
568
  pBuilder->pMemTable = NULL;
×
569
  (void)taosThreadRwlockUnlock(&pMgt->mutex);
×
570

571
  code = tableBuilderCommit(pBuilder, pInfo);
×
572

573
_error:
×
574
  if (code != 0) {
×
575
    bseError("failed to commit table builder at line %d since %s", lino, tstrerror(code));
×
576
  } else {
577
    bseTrace("succ to commit bse table builder mem %p, immu mem %p", pBuilder->pMemTable, pBuilder->pImmuMemTable);
×
578
  }
579
  return code;
×
580
}
581

582
void tableBuilderMgtDestroy(STableBuilderMgt *pMgt) {
×
583
  tableBuilderClose(pMgt->p, 0);
×
584
  (void)taosThreadRwlockDestroy(&pMgt->mutex);
×
585
}
×
586

587
int32_t tableMetaMgtInit(STableMetaMgt *pMgt, SBse *pBse, int64_t timestamp) {
×
588
  int32_t code = 0;
×
589
  int32_t lino = 0;
×
590
  pMgt->pBse = pBse;
×
591

592
  code = tableMetaOpen(NULL, &pMgt->pTableMeta, pMgt);
×
593
  TSDB_CHECK_CODE(code, lino, _error);
×
594

595
  pMgt->timestamp = timestamp;
×
596
  pMgt->pTableMeta->timestamp = timestamp;
×
597
  pMgt->pTableMeta->pBse = pBse;
×
598

599
_error:
×
600
  if (code != 0) {
×
601
    bseError("failed to init table meta mgt at line %d since %s", lino, tstrerror(code));
×
602
  }
603
  return code;
×
604
}
605

606
static void tableMetaMgtDestroy(STableMetaMgt *pMgt) {
×
607
  if (pMgt->pTableMeta != NULL) {
×
608
    tableMetaClose(pMgt->pTableMeta);
×
609
    pMgt->pTableMeta = NULL;
×
610
  }
611
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc