• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4913

06 Jan 2026 01:30AM UTC coverage: 64.884% (-0.004%) from 64.888%
#4913

push

travis-ci

web-flow
merge: from main to 3.0 branch #34167

180 of 319 new or added lines in 14 files covered. (56.43%)

571 existing lines in 128 files now uncovered.

195016 of 300563 relevant lines covered (64.88%)

117540852.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.86
/source/dnode/vnode/src/bse/bseTableMgt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "bseTableMgt.h"
17
#include "bse.h"
18
#include "bseTable.h"
19
#include "bseUtil.h"
20
#include "tglobal.h"
21
#include "thash.h"
22

23
static int32_t tableReaderMgtInit(STableReaderMgt *pReader, SBse *pBse, int64_t timestamp);
24
static int32_t tableReaderMgtSeek(STableReaderMgt *pReaderMgt, int64_t seq, uint8_t **pValue, int32_t *len);
25
// static int32_t tableReaderMgtClear(STableReaderMgt *pReader);
26
static void    tableReaderMgtDestroy(STableReaderMgt *pReader);
27

28
static int32_t tableBuilderMgtInit(STableBuilderMgt *pMgt, SBse *pBse, int64_t timestamp);
29
static int32_t tableBuilderMgtOpenBuilder(STableBuilderMgt *pMgt, int64_t seq, STableBuilder **p);
30
static int32_t tableBuilderMgtCommit(STableBuilderMgt *pMgt, SBseLiveFileInfo *pInfo);
31
static int32_t tableBuilderMgtSeek(STableBuilderMgt *pMgt, int64_t seq, uint8_t **pValue, int32_t *len);
32
static int32_t tableBuilderMgtPutBatch(STableBuilderMgt *pMgt, SBseBatch *pBatch);
33
static int32_t tableBuilderMgtClear(STableBuilderMgt *pMgt);
34
static void    tableBuilderMgtDestroy(STableBuilderMgt *pMgt);
35

36
static int32_t tableBuilderMgtRecover(STableBuilderMgt *pMgt, int64_t seq, STableBuilder **pBuilder, int64_t size);
37

38
static int32_t tableMetaMgtInit(STableMetaMgt *pMgt, SBse *pBse, int64_t timestamp);
39
static void    tableMetaMgtDestroy(STableMetaMgt *pMgt);
40

41
static void tableReaderFree(void *pReader);  // lru table reader free func
42
static void blockFree(void *pBlock);         // block free func
43

44
int32_t bseTableMgtCreate(SBse *pBse, void **pMgt) {
3,719,938✔
45
  int32_t code = 0;
3,719,938✔
46
  int32_t lino = 0;
3,719,938✔
47

48
  STableMgt *p = taosMemoryCalloc(1, sizeof(STableMgt));
3,719,938✔
49
  if (p == NULL) {
3,725,747✔
50
    return terrno;
×
51
  }
52
  p->pBse = pBse;
3,725,747✔
53
  p->pHashObj = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
3,725,747✔
54
  if (p->pHashObj == NULL) {
3,725,747✔
55
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
56
  }
57

58
  *pMgt = p;
3,725,279✔
59
_error:
3,725,134✔
60
  if (code != 0) {
3,725,134✔
61
    if (p != NULL)
×
62
      bseError("vgId:%d failed to open table pBuilderMgt at line %d since %s", BSE_VGID((SBse *)p->pBse), lino,
×
63
               tstrerror(code));
64
    bseTableMgtCleanup(p);
×
65
  }
66
  return code;
3,725,279✔
67
}
68

69
int32_t bseTableMgtSetLastTableId(STableMgt *pMgt, int64_t timestamp) {
×
70
  if (pMgt == NULL) return 0;
×
71

72
  pMgt->timestamp = timestamp;
×
73
  return 0;
×
74
}
75

76
int32_t createSubTableMgt(int64_t timestamp, int32_t readOnly, STableMgt *pMgt, SSubTableMgt **pSubMgt) {
3,687✔
77
  int32_t code = 0;
3,687✔
78
  int32_t lino = 0;
3,687✔
79

80
  SSubTableMgt *p = taosMemCalloc(1, sizeof(SSubTableMgt));
3,687✔
81
  if (p == NULL) {
3,687✔
82
    code = terrno;
×
83
    TSDB_CHECK_CODE(code, lino, _error);
×
84
  }
85

86
  if (!readOnly) {
3,687✔
87
    code = tableBuilderMgtInit(p->pBuilderMgt, pMgt->pBse, timestamp);
3,687✔
88
    TSDB_CHECK_CODE(code, lino, _error);
3,687✔
89

90
    p->pBuilderMgt->pMgt = p;
3,687✔
91
  }
92

93
  code = tableReaderMgtInit(p->pReaderMgt, pMgt->pBse, timestamp);
3,687✔
94
  TSDB_CHECK_CODE(code, lino, _error);
3,687✔
95

96
  p->pReaderMgt->pMgt = p;
3,687✔
97

98
  code = tableMetaMgtInit(p->pTableMetaMgt, pMgt->pBse, timestamp);
3,687✔
99
  TSDB_CHECK_CODE(code, lino, _error);
3,687✔
100

101
  p->pTableMetaMgt->pMgt = p;
3,687✔
102

103
  *pSubMgt = p;
3,687✔
104
_error:
3,687✔
105
  if (code != 0) {
3,687✔
106
    bseError("failed to create sub table mgt at line %d since %s", lino, tstrerror(code));
×
107
    destroySubTableMgt(p);
×
108
  }
109
  return code;
3,687✔
110
}
111
void destroySubTableMgt(SSubTableMgt *p) {
3,726,299✔
112
  if (p != NULL) {
3,726,299✔
113
    tableBuilderMgtDestroy(p->pBuilderMgt);
3,687✔
114
    tableReaderMgtDestroy(p->pReaderMgt);
3,687✔
115
    tableMetaMgtDestroy(p->pTableMetaMgt);
3,687✔
116
  }
117
  taosMemoryFree(p);
3,726,299✔
118
}
3,726,299✔
119
int32_t bseTableMgtGet(STableMgt *pMgt, int64_t seq, uint8_t **pValue, int32_t *len) {
31,309,755✔
120
  if (pMgt == NULL) return TSDB_CODE_INVALID_PARA;
31,309,755✔
121

122
  int32_t       code = 0;
31,309,755✔
123
  int32_t       lino = 0;
31,309,755✔
124
  int32_t       readOnly = 1;
31,309,755✔
125
  SSubTableMgt *pSubMgt = NULL;
31,309,755✔
126
  SBse *pBse = pMgt->pBse;
31,309,755✔
127

128
  int64_t timestamp = 0;
31,309,755✔
129
  code = bseGetTableIdBySeq(pBse, seq, &timestamp);
31,309,755✔
130
  TSDB_CHECK_CODE(code, lino, _error);
31,309,755✔
131

132
  if (timestamp > 0) {
31,309,755✔
133
    SSubTableMgt **ppSubMgt = taosHashGet(pMgt->pHashObj, &timestamp, sizeof(timestamp));
10,432,800✔
134
    if (ppSubMgt == NULL || *ppSubMgt == NULL) {
10,432,800✔
135
      code = createSubTableMgt(timestamp, 0, pMgt, &pSubMgt);
552✔
136
      TSDB_CHECK_CODE(code, lino, _error);
552✔
137

138
      code = taosHashPut(pMgt->pHashObj, &timestamp, sizeof(timestamp), &pSubMgt, sizeof(SSubTableMgt *));
552✔
139
      TSDB_CHECK_CODE(code, lino, _error);
552✔
140

141
    } else {
142
      pSubMgt = *ppSubMgt;
10,432,248✔
143
    }
144
  } else {
145
    pSubMgt = pMgt->pCurrTableMgt;
20,876,955✔
146
    if (pSubMgt == NULL) {
20,876,955✔
147
      TSDB_CHECK_CODE(TSDB_CODE_BLOB_SEQ_NOT_FOUND, lino, _error);
×
148
    }
149
    readOnly = 0;
20,876,955✔
150
  }
151

152
  if (readOnly) {
31,309,755✔
153
    code = tableReaderMgtSeek(pSubMgt->pReaderMgt, seq, pValue, len);
10,432,800✔
154
    TSDB_CHECK_CODE(code, lino, _error);
10,432,800✔
155
  } else {
156
    code = tableBuilderMgtSeek(pSubMgt->pBuilderMgt, seq, pValue, len);
20,876,955✔
157
    if (code != TSDB_CODE_SUCCESS) {
20,876,955✔
UNCOV
158
      if (code != TSDB_CODE_OUT_OF_RANGE) {
×
UNCOV
159
        code = tableReaderMgtSeek(pSubMgt->pReaderMgt, seq, pValue, len);
×
UNCOV
160
        TSDB_CHECK_CODE(code, lino, _error);
×
161
      }
162
    }
163
  }
164
_error:
31,309,755✔
165
  if (code != 0) {
31,309,755✔
166
    bseError("vgId:%d failed to get table at line %d since %s", BSE_VGID(pBse), lino, tstrerror(code));
×
167
  }
168
  return code;
31,309,755✔
169
}
170

171
int32_t bseTableMgtRecoverTable(STableMgt *pMgt, SBseLiveFileInfo *pInfo) {
×
172
  int32_t code = 0;
×
173
  int32_t lino = 0;
×
174
  if (pMgt == NULL) return 0;
×
175

176
  SSubTableMgt *pSubMgt = NULL;
×
177

178
  code = createSubTableMgt(pInfo->timestamp, 0, pMgt, &pSubMgt);
×
179
  TSDB_CHECK_CODE(code, lino, _error);
×
180

181
  code = tableBuilderMgtRecover(pSubMgt->pBuilderMgt, 0, NULL, pInfo->size);
×
182
  TSDB_CHECK_CODE(code, lino, _error);
×
183

184
_error:
×
185
  if (code != 0) {
×
186
    bseError("failed to recover table at line %d since %s", lino, tstrerror(code));
×
187
  }
188
  destroySubTableMgt(pSubMgt);
×
189
  return 0;
×
190
}
191

192
void bseTableMgtCleanup(void *pMgt) {
3,725,747✔
193
  if (pMgt == NULL) return;
3,725,747✔
194

195
  STableMgt *p = (STableMgt *)pMgt;
3,725,747✔
196

197
  void *pIter = taosHashIterate(p->pHashObj, NULL);
3,725,747✔
198
  while (pIter) {
3,726,299✔
199
    SSubTableMgt **ppSubMgt = pIter;
552✔
200
    destroySubTableMgt(*ppSubMgt);
552✔
201
    pIter = taosHashIterate(p->pHashObj, pIter);
552✔
202
  }
203

204
  destroySubTableMgt(p->pCurrTableMgt);
3,725,747✔
205

206
  taosHashCleanup(p->pHashObj);
3,725,747✔
207
  taosMemoryFree(p);
3,724,977✔
208
}
209

210
static int32_t bseCalcNowTimestamp(int8_t precision, int64_t *dst) {
21,246✔
211
  int64_t nowSec = taosGetTimestampSec();
21,246✔
212
  int32_t code = 0;
21,246✔
213
  if (precision == TSDB_TIME_PRECISION_MILLI) {
21,246✔
214
    nowSec = nowSec * 1000;
21,246✔
215
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
×
216
    nowSec = nowSec * 1000000l;
×
217
  } else if (precision == TSDB_TIME_PRECISION_NANO) {
×
218
    nowSec = nowSec * 1000000000l;
×
219
  } else {
220
    bseError("bse invalid time precision:%d", precision);
×
221
    return TSDB_CODE_INVALID_PARA;
×
222
  }
223
  *dst = nowSec;
21,246✔
224
  return code;
21,246✔
225
}
226

227
static int32_t bseShouldSwitchToTable(int64_t nowTimestamp, int64_t timestamp, int8_t precision, int32_t keepDays) {
18,111✔
228
  if (timestamp == 0) return 1;
18,111✔
229
  if (keepDays <= 0) return 0;
18,111✔
230

231
  int64_t threshold = keepDays * 24 * tsTickPerHour[precision];
18,111✔
232
  int64_t diff = nowTimestamp - timestamp;
18,111✔
233

234
  if (diff < threshold) {
18,111✔
235
    return 0;
18,111✔
236
  } else {
237
    return 1;
×
238
  }
239

240
  return 0;
241
}
242
static int32_t bseTableMgtGetTable(STableMgt *pMgt, SSubTableMgt **ppSubGgt) {
21,246✔
243
  int32_t code = 0;
21,246✔
244
  int32_t lino = 0;
21,246✔
245

246
  int64_t       startTs = 0;
21,246✔
247
  SBse         *pBse = pMgt->pBse;
21,246✔
248
  SSubTableMgt *pSubMgt = pMgt->pCurrTableMgt;
21,246✔
249

250
  code = bseCalcNowTimestamp(BSE_TIME_PRECISION(pBse), &startTs);
21,246✔
251
  TSDB_CHECK_CODE(code, lino, _error);
21,246✔
252

253
  if (pSubMgt == NULL) {
21,246✔
254
    if (pMgt->timestamp != 0) {
3,135✔
255
      if (!bseShouldSwitchToTable(startTs, pMgt->timestamp, BSE_TIME_PRECISION(pBse), BSE_KEEY_DAYS(pBse))) {
×
256
        startTs = pMgt->timestamp;
×
257
      }
258
    }
259

260
    code = createSubTableMgt(startTs, 0, pMgt, &pMgt->pCurrTableMgt);
3,135✔
261
    TSDB_CHECK_CODE(code, lino, _error);
3,135✔
262

263
    pSubMgt = pMgt->pCurrTableMgt;
3,135✔
264
  } else {
265
    if (bseShouldSwitchToTable(startTs, pSubMgt->pBuilderMgt->timestamp, BSE_TIME_PRECISION(pBse),
18,111✔
266
                               BSE_KEEY_DAYS(pBse))) {
18,111✔
267
      code = bseCommit(pBse);
×
268
      TSDB_CHECK_CODE(code, lino, _error);
×
269

270
      destroySubTableMgt(pSubMgt);
×
271

272
      code = createSubTableMgt(startTs, 0, pMgt, &pMgt->pCurrTableMgt);
×
273
      TSDB_CHECK_CODE(code, lino, _error);
×
274
    }
275

276
    pSubMgt = pMgt->pCurrTableMgt;
18,111✔
277
  }
278

279
_error:
21,246✔
280
  if (code != 0) {
21,246✔
281
    bseError("failed to get sub table at line %d since %s", lino, tstrerror(code));
×
282
  } else {
283
    *ppSubGgt = pSubMgt;
21,246✔
284
  }
285

286
  return 0;
21,246✔
287
}
288
int32_t bseTableMgtAppend(STableMgt *pMgt, SBseBatch *pBatch) {
21,246✔
289
  int32_t code = 0;
21,246✔
290
  int32_t lino = 0;
21,246✔
291

292
  SBse         *pBse = pMgt->pBse;
21,246✔
293
  SSubTableMgt *pSubMgt = NULL;
21,246✔
294
  code = bseTableMgtGetTable(pMgt, &pSubMgt);
21,246✔
295
  TSDB_CHECK_CODE(code, lino, _error);
21,246✔
296

297
  code = tableBuilderMgtPutBatch(pSubMgt->pBuilderMgt, pBatch);
21,246✔
298
  TSDB_CHECK_CODE(code, lino, _error);
21,246✔
299

300
_error:
21,246✔
301
  if (code != 0) {
21,246✔
302
    bseError("failed to append table at line %d since %s", lino, tstrerror(code));
×
303
  }
304
  return code;
21,246✔
305
}
306

307
int32_t bseTableMgtGetLiveFileSet(STableMgt *pMgt, SArray **pList) {
×
308
  int32_t code = 0;
×
309
  return code;
×
310
}
311

312
int32_t bseTableMgtCommit(STableMgt *pMgt, SBseLiveFileInfo *pInfo) {
4,859,374✔
313
  int32_t code = 0;
4,859,374✔
314
  int32_t lino = 0;
4,859,374✔
315

316
  SSubTableMgt *pSubMgt = pMgt->pCurrTableMgt;
4,859,374✔
317
  if (pSubMgt == NULL) {
4,859,374✔
318
    bseInfo("nothing to commit table");
4,855,866✔
319
    return code;
4,855,866✔
320
  }
321

322
  code = tableBuilderMgtCommit(pSubMgt->pBuilderMgt, pInfo);
3,508✔
323
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
324
_error:
3,508✔
325
  if (code != 0) {
3,508✔
326
    bseError("failed to commit table at line %d since %s", lino, tstrerror(code));
×
327
  } else {
328
    bseInfo("succ to commit bse table");
3,508✔
329
  }
330
  return code;
3,508✔
331
}
332

333
int32_t bseTableMgtUpdateLiveFileSet(STableMgt *pMgt, SArray *pLiveFileSet) {
×
334
  int32_t code = 0;
×
335
  return code;
×
336
}
337

338
int32_t bseTableMgtSetBlockCacheSize(STableMgt *pMgt, int32_t cap) {
×
339
  int32_t code = 0;
×
340
  return code;
×
341
  // return blockCacheResize(pMgt->pReaderMgt->pBlockCache, cap);
342
}
343

344
int32_t bseTableMgtSetTableCacheSize(STableMgt *pMgt, int32_t cap) {
×
345
  int32_t code = 0;
×
346
  return code;
×
347
  // return tableCacheResize(pMgt->pReaderMgt->pTableCache, cap);
348
}
349

350
int32_t bseTableMgtClear(STableMgt *pMgt) {
×
351
  int32_t code = 0;
×
352
  int32_t lino = 0;
×
353
  if (pMgt == NULL) return 0;
×
354

355
  destroySubTableMgt(pMgt->pCurrTableMgt);
×
356

357
  void *pIter = taosHashIterate(pMgt->pHashObj, NULL);
×
358
  while (pIter) {
×
359
    SSubTableMgt **ppSubMgt = pIter;
×
360
    destroySubTableMgt(*ppSubMgt);
×
361
    pIter = taosHashIterate(pMgt->pHashObj, pIter);
×
362
  }
363
  taosHashClear(pMgt->pHashObj);
×
364

365
_error:
×
366
  if (code != 0) {
×
367
    bseError("failed to clear table at line %d since %s", lino, tstrerror(code));
×
368
  }
369
  return code;
×
370
}
371

372
void tableReaderFree(void *pReader) {
×
373
  STableReader *p = (STableReader *)pReader;
×
374
  if (p != NULL) {
×
375
    tableReaderClose(p);
×
376
  }
377
}
×
378
void blockFree(void *pBlock) { taosMemoryFree(pBlock); }
1,656✔
379

380
int32_t tableReaderMgtInit(STableReaderMgt *pReader, SBse *pBse, int64_t timestamp) {
3,687✔
381
  int32_t code = 0;
3,687✔
382
  int32_t lino = 0;
3,687✔
383

384
  (void)taosThreadRwlockInit(&pReader->mutex, NULL);
3,687✔
385

386
  code = blockCacheOpen(48, blockFree, &pReader->pBlockCache);
3,687✔
387
  TSDB_CHECK_CODE(code, lino, _error);
3,687✔
388

389
  code = tableCacheOpen(32, tableReaderFree, &pReader->pTableCache);
3,687✔
390
  TSDB_CHECK_CODE(code, lino, _error);
3,687✔
391

392
  pReader->pBse = pBse;
3,687✔
393
  pReader->timestamp = timestamp;
3,687✔
394

395
_error:
3,687✔
396
  if (code != 0) {
3,687✔
397
    bseError("failed to init table pReaderMgt mgt at line %d since %s", lino, tstrerror(code));
×
398
  }
399
  return code;
3,687✔
400
}
401

402
// int32_t tableReaderMgtClear(STableReaderMgt *pReader) {
403
//   int32_t code = 0;
404

405
//   (void)taosThreadRwlockWrlock(&pReader->mutex);
406

407
//   (void)(tableCacheClear(pReader->pTableCache));
408

409
//   (void)(blockCacheClear(pReader->pBlockCache));
410
//   (void)taosThreadRwlockUnlock(&pReader->mutex);
411

412
//   return code;
413
// }
414

415
void tableReaderMgtDestroy(STableReaderMgt *pReader) {
3,687✔
416
  tableCacheClose(pReader->pTableCache);
3,687✔
417
  blockCacheClose(pReader->pBlockCache);
3,687✔
418
  (void)taosThreadRwlockDestroy(&pReader->mutex);
3,687✔
419
}
3,687✔
420

421
int32_t tableReaderMgtSeek(STableReaderMgt *pReaderMgt, int64_t seq, uint8_t **pValue, int32_t *len) {
10,432,800✔
422
  int32_t code = 0;
10,432,800✔
423
  int32_t lino = 0;
10,432,800✔
424

425
  STableReader *pReader = NULL;
10,432,800✔
426

427
  code = tableReaderOpen(pReaderMgt->timestamp, &pReader, pReaderMgt);
10,432,800✔
428
  TSDB_CHECK_CODE(code, lino, _error);
10,432,800✔
429

430
  code = tableReaderGet(pReader, seq, pValue, len);
10,432,800✔
431
  TSDB_CHECK_CODE(code, lino, _error);
10,432,800✔
432

433
_error:
10,432,800✔
434
  if (code != 0) {
10,432,800✔
435
    bseError("failed to seek table pReaderMgt at line %d since %s", lino, tstrerror(code));
×
436
  }
437

438
  tableReaderClose(pReader);
10,432,800✔
439
  return code;
10,432,800✔
440
}
441

442
int32_t tableBuilderMgtInit(STableBuilderMgt *pMgt, SBse *pBse, int64_t timestamp) {
3,687✔
443
  int32_t code = 0;
3,687✔
444
  int32_t lino = 0;
3,687✔
445

446
  (void)taosThreadRwlockInit(&pMgt->mutex, NULL);
3,687✔
447
  pMgt->pBse = pBse;
3,687✔
448
  pMgt->timestamp = timestamp;
3,687✔
449
  return code;
3,687✔
450
}
451

452
int32_t tableBuilderMgtClear(STableBuilderMgt *pMgt) {
×
453
  int32_t code = 0;
×
454
  int32_t lino = 0;
×
455

456
  (void)taosThreadRwlockWrlock(&pMgt->mutex);
×
457
  tableBuilderClose(pMgt->p, 0);
×
458
  (void)taosThreadRwlockUnlock(&pMgt->mutex);
×
459
  return code;
×
460
}
461

462
int32_t tableBuilderMgtPutBatch(STableBuilderMgt *pMgt, SBseBatch *pBatch) {
21,246✔
463
  int32_t code = 0;
21,246✔
464
  int32_t lino = 0;
21,246✔
465
  int64_t seq = pBatch->startSeq;
21,246✔
466

467
  (void)taosThreadRwlockWrlock(&pMgt->mutex);
21,246✔
468
  STableBuilder *p = pMgt->p;
21,246✔
469

470
  if (p == NULL) {
21,246✔
471
    code = tableBuilderMgtOpenBuilder(pMgt, seq, &p);
3,135✔
472
    if (code != 0) {
3,135✔
473
      TSDB_CHECK_CODE(code, lino, _error);
×
474
    }
475
  }
476
  if (p->pMemTable == NULL) {
21,246✔
477
    code = bseMemTableCreate(&p->pMemTable, BSE_BLOCK_SIZE(pMgt->pBse));
3,118✔
478
    if (code != 0) {
3,118✔
479
      TSDB_CHECK_CODE(code, lino, _error);
×
480
    }
481

482
    p->pMemTable->pTableBuilder = p;
3,118✔
483
  }
484
  code = tableBuilderPut(p, pBatch);
21,246✔
485
_error:
21,246✔
486
  if (code != 0) {
21,246✔
487
    bseError("failed to put batch to table builder at line %d since %s", lino, tstrerror(code));
×
488
  } else {
489
    bseTrace("succ to put batch to table builder mem %p, imumm table %p", p->pMemTable, p->pImmuMemTable);
21,246✔
490
  }
491
  (void)taosThreadRwlockUnlock(&pMgt->mutex);
21,246✔
492

493
  return code;
21,246✔
494
}
495

496
int32_t tableBuilderMgtSeek(STableBuilderMgt *pMgt, int64_t seq, uint8_t **pValue, int32_t *len) {
20,876,955✔
497
  int32_t        code = 0;
20,876,955✔
498
  int32_t        lino = 0;
20,876,955✔
499
  STableBuilder *pBuilder = NULL;
20,876,955✔
500

501
  (void)taosThreadRwlockRdlock(&pMgt->mutex);
20,876,955✔
502
  pBuilder = pMgt->p;
20,876,955✔
503

504
  if (pBuilder) {
20,876,955✔
505
    code = tableBuilderGet(pBuilder, seq, pValue, len);
20,876,955✔
506
  } else {
507
    code = TSDB_CODE_OUT_OF_RANGE;  //  continue to read from reader
×
508
  }
509
  (void)taosThreadRwlockUnlock(&pMgt->mutex);
20,876,955✔
510
  return code;
20,876,955✔
511
}
512

513
int32_t tableBuilderMgtOpenBuilder(STableBuilderMgt *pMgt, int64_t seq, STableBuilder **pBuilder) {
3,135✔
514
  int32_t code = 0;
3,135✔
515
  int32_t lino = 0;
3,135✔
516

517
  SBse *pBse = pMgt->pBse;
3,135✔
518

519
  STableBuilder *p = NULL;
3,135✔
520

521
  code = tableBuilderOpen(pMgt->timestamp, &p, pBse);
3,135✔
522
  TSDB_CHECK_CODE(code, lino, _error);
3,135✔
523

524
  p->pTableMeta = pMgt->pMgt->pTableMetaMgt->pTableMeta;
3,135✔
525
  p->pBuilderMgt = pMgt;
3,135✔
526
  pMgt->p = p;
3,135✔
527

528
  *pBuilder = p;
3,135✔
529

530
_error:
3,135✔
531
  if (code != 0) {
3,135✔
532
    bseError("failed to open table builder at line %d since %s", __LINE__, tstrerror(code));
×
533
  }
534

535
  return code;
3,135✔
536
}
537

538
int32_t tableBuilderMgtRecover(STableBuilderMgt *pMgt, int64_t seq, STableBuilder **pBuilder, int64_t size) {
×
539
  int32_t        code = 0;
×
540
  int32_t        lino = 0;
×
541
  STableBuilder *pTable = NULL;
×
542

543
  code = tableBuilderMgtOpenBuilder(pMgt, seq, &pTable);
×
544
  TSDB_CHECK_CODE(code, lino, _error);
×
545

546
  if (pTable->offset > size) {
×
547
    code = tableBuilderTruncFile(pTable, size);
×
548
    TSDB_CHECK_CODE(code, lino, _error);
×
549
  }
550
_error:
×
551
  if (code != 0) {
×
552
    bseError("failed to open table builder at line %d since %s", lino, tstrerror(code));
×
553
  }
554

555
  return code;
×
556
}
557
int32_t tableBuilderMgtCommit(STableBuilderMgt *pMgt, SBseLiveFileInfo *pInfo) {
3,508✔
558
  int32_t        code = 0;
3,508✔
559
  int32_t        lino = 0;
3,508✔
560
  int8_t         flushIdx = -1;
3,508✔
561
  STableBuilder *pBuilder = NULL;
3,508✔
562

563
  (void)taosThreadRwlockWrlock(&pMgt->mutex);
3,508✔
564
  pBuilder = pMgt->p;
3,508✔
565

566
  bseInfo("start to commit bse table builder mem %p, immu mem %p", pBuilder->pMemTable, pBuilder->pImmuMemTable);
3,508✔
567
  pBuilder->pImmuMemTable = pBuilder->pMemTable;
3,508✔
568
  pBuilder->pMemTable = NULL;
3,508✔
569
  (void)taosThreadRwlockUnlock(&pMgt->mutex);
3,508✔
570

571
  code = tableBuilderCommit(pBuilder, pInfo);
3,508✔
572

573
_error:
3,508✔
574
  if (code != 0) {
3,508✔
575
    bseError("failed to commit table builder at line %d since %s", lino, tstrerror(code));
×
576
  } else {
577
    bseTrace("succ to commit bse table builder mem %p, immu mem %p", pBuilder->pMemTable, pBuilder->pImmuMemTable);
3,508✔
578
  }
579
  return code;
3,508✔
580
}
581

582
void tableBuilderMgtDestroy(STableBuilderMgt *pMgt) {
3,687✔
583
  tableBuilderClose(pMgt->p, 0);
3,687✔
584
  (void)taosThreadRwlockDestroy(&pMgt->mutex);
3,687✔
585
}
3,687✔
586

587
int32_t tableMetaMgtInit(STableMetaMgt *pMgt, SBse *pBse, int64_t timestamp) {
3,687✔
588
  int32_t code = 0;
3,687✔
589
  int32_t lino = 0;
3,687✔
590
  pMgt->pBse = pBse;
3,687✔
591

592
  code = tableMetaOpen(NULL, &pMgt->pTableMeta, pMgt);
3,687✔
593
  TSDB_CHECK_CODE(code, lino, _error);
3,687✔
594

595
  pMgt->timestamp = timestamp;
3,687✔
596
  pMgt->pTableMeta->timestamp = timestamp;
3,687✔
597
  pMgt->pTableMeta->pBse = pBse;
3,687✔
598

599
_error:
3,687✔
600
  if (code != 0) {
3,687✔
601
    bseError("failed to init table meta mgt at line %d since %s", lino, tstrerror(code));
×
602
  }
603
  return code;
3,687✔
604
}
605

606
static void tableMetaMgtDestroy(STableMetaMgt *pMgt) {
3,687✔
607
  if (pMgt->pTableMeta != NULL) {
3,687✔
608
    tableMetaClose(pMgt->pTableMeta);
3,687✔
609
    pMgt->pTableMeta = NULL;
3,687✔
610
  }
611
}
3,687✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc