• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4913

06 Jan 2026 01:30AM UTC coverage: 64.884% (-0.004%) from 64.888%
#4913

push

travis-ci

web-flow
merge: from main to 3.0 branch #34167

180 of 319 new or added lines in 14 files covered. (56.43%)

571 existing lines in 128 files now uncovered.

195016 of 300563 relevant lines covered (64.88%)

117540852.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.54
/source/dnode/vnode/src/bse/bseTable.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "bseTable.h"
17
#include "bse.h"
18
#include "bseCache.h"
19
#include "bseSnapshot.h"
20
#include "bseTableMgt.h"
21
#include "osMemPool.h"
22
#include "vnodeInt.h"
23

24
// table footer func
25
static int32_t footerEncode(STableFooter *pFooter, char *buf);
26
static int32_t footerDecode(STableFooter *pFooter, char *buf);
27

28
// block handle func
29
static int32_t blkHandleEncode(SBlkHandle *pHandle, char *buf);
30
static int32_t blkHandleDecode(SBlkHandle *pHandle, char *buf);
31

32
// table meta func
33
static int32_t metaBlockEncode(SMetaBlock *pMeta, char *buf);
34
static int32_t metaBlockDecode(SMetaBlock *pMeta, char *buf);
35

36
static int32_t metaBlockAdd(SBlock *p, SMetaBlock *pMeta);
37
static int32_t metaBlockGet(SBlock *p, SMetaBlock *pMeta);
38

39
// table footer func
40
static int32_t footerEncode(STableFooter *pFooter, char *buf);
41
static int32_t footerDecode(STableFooter *pFooter, char *buf);
42

43
// block func
44
static int32_t blockCreate(int32_t cap, SBlock **pBlock);
45
static void    blockDestroy(SBlock *pBlock);
46
static int32_t blockPut(SBlock *pBlock, int64_t seq, uint8_t *value, int32_t len);
47
static int32_t blockAppendBatch(SBlock *p, uint8_t *value, int32_t len);
48
static int32_t blockEsimateSize(SBlock *pBlock, int32_t extra);
49
static void    blockClear(SBlock *pBlock);
50
static int32_t blockSeek(SBlock *p, int64_t seq, uint8_t **pValue, int32_t *len);
51
static int8_t  blockGetType(SBlock *p);
52

53
static int32_t blockSeekMeta(SBlock *p, int64_t seq, SMetaBlock *pMeta);
54
static int32_t blockGetAllMeta(SBlock *p, SArray *pResult);
55
static int32_t metaBlockAddIndex(SBlock *p, SBlkHandle *pInfo);
56

57
static int32_t tableMetaWriterInit(SBTableMeta *pMeta, char *name, SBtableMetaWriter **ppWriter);
58
static int32_t tableMetaWriterCommit(SBtableMetaWriter *pMeta);
59
static void    tableMetaWriterClose(SBtableMetaWriter *p);
60
static int32_t tableMetaWriteAppendRawBlock(SBtableMetaWriter *pMeta, SBlockWrapper *pBlock, SBlkHandle *pBlkHandle);
61

62
static int32_t tableMetaReaderInit(SBTableMeta *pMeta, char *name, SBtableMetaReader **ppReader);
63
static void    tableMetaReaderClose(SBtableMetaReader *p);
64
static int32_t tableMetaReaderLoadIndex(SBtableMetaReader *p);
65

66
static int32_t tableMetaOpenFile(SBtableMetaWriter *pMeta, int8_t read, char *name);
67

68
static int32_t tableMetaReaderOpenIter(SBtableMetaReader *pReader, SBtableMetaReaderIter **pIter);
69
static int32_t tableMetaReaderIterNext(SBtableMetaReaderIter *pIter, SBlockWrapper *pDataWrapper,
70
                                       SBlkHandle *dstHandle);
71
static void    tableMetaReaderIterClose(SBtableMetaReaderIter *p);
72

73
// STable builder func
74
static int32_t tableBuilderGetBlockSize(STableBuilder *p);
75
static int32_t tableBuilderLoadBlock(STableBuilder *p, SBlkHandle *pHandle, SBlockWrapper *pBlkWrapper);
76
static int32_t tableBuilderSeek(STableBuilder *p, SBlkHandle *pHandle, int64_t seq, uint8_t **pValue, int32_t *len);
77
static void    tableBuilderUpdateBlockRange(STableBuilder *p, SBlockItemInfo *pInfo);
78
static void    tableBuildUpdateTableRange(STableBuilder *p, SBlockItemInfo *pInfo);
79

80
// STable pReaderMgt func
81

82
static int32_t tableReaderInitMeta(STableReader *p, SBlock *pBlock);
83

84
static int32_t tableReaderLoadRawBlock(STableReader *p, SBlkHandle *pHandle, SBlockWrapper *pBlkWrapper);
85
static int32_t tableReaderLoadRawMeta(STableReader *p, SBlkHandle *pHandle, SBlockWrapper *blkWrapper);
86
static int32_t tableReaderLoadRawMetaIndex(STableReader *p, SBlockWrapper *blkWrapper);
87
static int32_t tableReaderLoadRawFooter(STableReader *p, SBlockWrapper *blkWrapper);
88

89
static int32_t tableOpenFile(char *name, int8_t read, TdFilePtr *pFile, int64_t *size);
90
static int32_t tableFlushBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlk, int32_t *nWrite);
91
static int32_t tableLoadBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlk);
92
static int32_t tableLoadRawBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlk, int8_t checkSum);
93

94
/*---block formate----*/
95
//---datatype--|---len---|--data---|--rawdatasize---|--compressType---|---checksum---|
96
//- int8_t   |  int32_t | uint8_t[] |    int32_t     |      int8_t     |    TSCKSUM|
97
#define BLOCK_ROW_SIZE_OFFSET(p)      (sizeof(SBlock) + (p)->len)
98
#define BLOCK_ROW_SIZE(p)             BLOCK_ROW_SIZE_OFFSET(p)
99
#define BLOCK_COMPRESS_TYPE_OFFSET(p) (BLOCK_ROW_SIZE_OFFSET(p) + sizeof(int32_t))
100
#define BLOCK_CHECKSUM_OFFSET(p)      (BLOCK_COMPRESS_TYPE_OFFSET(p) + sizeof(int8_t))
101
#define BLOCK_TOTAL_SIZE(p)           (BLOCK_CHECKSUM_OFFSET(p) + sizeof(TSCKSUM))
102

103
#define BLOCK_SET_ROW_SIZE(p, size) *(int32_t *)((char *)(p) + BLOCK_ROW_SIZE_OFFSET(p)) = (size)
104
#define BLOCK_GET_ROW_SIZE(p)       *(int32_t *)((char *)(p) + BLOCK_ROW_SIZE_OFFSET(p))
105

106
#define BLOCK_SET_COMPRESS_TYPE(p, type) *(int8_t *)((char *)(p) + BLOCK_COMPRESS_TYPE_OFFSET(p)) = (type)
107
#define BLOCK_GET_COMPRESS_TYPE(p)       *(int8_t *)((char *)(p) + BLOCK_COMPRESS_TYPE_OFFSET(p))
108

109
#define BLOCK_TAIL_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(TSCKSUM))
110

111
#define COMREPSS_DATA_SET_TYPE_AND_RAWLEN(p, len, type, rawLen) \
112
  do {                                                          \
113
    *(int32_t *)((char *)(p) + len) = (rawLen);                 \
114
    *(int8_t *)((char *)(p) + len + sizeof(int32_t)) = (type);  \
115
  } while (0);
116
#define COMPRESS_DATA_GET_TYPE_AND_RAWLEN(p, len, type, rawLen)                 \
117
  do {                                                                          \
118
    (rawLen) = *(int32_t *)((char *)(p) + len - BLOCK_TAIL_LEN);                \
119
    (type) = *(int8_t *)((char *)(p) + len - BLOCK_TAIL_LEN + sizeof(int32_t)); \
120
  } while (0);
121

UNCOV
122
int32_t tableBuilderSeek(STableBuilder *p, SBlkHandle *pHandle, int64_t seq, uint8_t **pValue, int32_t *len) {
×
UNCOV
123
  int32_t code = 0;
×
UNCOV
124
  int32_t lino = 0;
×
125

UNCOV
126
  SBlockWrapper blockWrapper = {0};
×
127

UNCOV
128
  code = tableBuilderLoadBlock(p, pHandle, &blockWrapper);
×
UNCOV
129
  TSDB_CHECK_CODE(code, lino, _error);
×
130

UNCOV
131
  code = blockSeek(blockWrapper.data, seq, pValue, len);
×
UNCOV
132
  TSDB_CHECK_CODE(code, lino, _error);
×
133

UNCOV
134
_error:
×
UNCOV
135
  if (code != 0) {
×
136
    bseError("failed to seek data from table builder at lino %d ince %s", lino, tstrerror(code));
×
137
  }
UNCOV
138
  blockWrapperCleanup(&blockWrapper);
×
UNCOV
139
  return code;
×
140
}
141

UNCOV
142
int32_t tableBuilderLoadBlock(STableBuilder *p, SBlkHandle *pHandle, SBlockWrapper *pBlkWrapper) {
×
UNCOV
143
  int32_t code = 0;
×
UNCOV
144
  int32_t lino = 0;
×
UNCOV
145
  code = blockWrapperInit(pBlkWrapper, pHandle->size);
×
UNCOV
146
  TSDB_CHECK_CODE(code, lino, _error);
×
147

UNCOV
148
  code = tableLoadBlock(p->pDataFile, pHandle, pBlkWrapper);
×
UNCOV
149
_error:
×
UNCOV
150
  if (code != 0) {
×
151
    bseError("failed to load block from table builder at lino %d since %s", lino, tstrerror(code));
×
152
  }
UNCOV
153
  return code;
×
154
}
155

156
int32_t tableBuilderOpen(int64_t ts, STableBuilder **pBuilder, SBse *pBse) {
3,135✔
157
  int32_t code = 0;
3,135✔
158
  int32_t lino = 0;
3,135✔
159

160
  char name[TSDB_FILENAME_LEN] = {0};
3,135✔
161
  char path[TSDB_FILENAME_LEN] = {0};
3,135✔
162
  bseBuildDataName(ts, name);
3,135✔
163
  bseBuildFullName(pBse, name, path);
3,135✔
164

165
  STableBuilder *p = taosMemoryCalloc(1, sizeof(STableBuilder));
3,135✔
166
  if (p == NULL) {
3,135✔
167
    TSDB_CHECK_CODE(terrno, lino, _error);
×
168
  }
169
  p->timestamp = ts;
3,135✔
170
  memcpy(p->name, name, strlen(name));
3,135✔
171

172
  p->blockCap = BSE_BLOCK_SIZE(pBse);
3,135✔
173

174
  code = bseMemTableCreate(&p->pMemTable, BSE_BLOCK_SIZE(pBse));
3,135✔
175
  TSDB_CHECK_CODE(code, lino, _error);
3,135✔
176

177
  p->compressType = BSE_COMPRESS_TYPE(pBse);
3,135✔
178
  TSDB_CHECK_CODE(code, lino, _error);
3,135✔
179

180
  seqRangeReset(&p->tableRange);
3,135✔
181
  seqRangeReset(&p->blockRange);
3,135✔
182

183
  p->pBse = pBse;
3,135✔
184
  code = tableOpenFile(path, 0, &p->pDataFile, &p->offset);
3,135✔
185
  p->blockCap = BSE_BLOCK_SIZE(pBse);
3,135✔
186

187
  *pBuilder = p;
3,135✔
188
  p->pMemTable->pTableBuilder = p;
3,135✔
189

190
_error:
3,135✔
191
  if (code != 0) {
3,135✔
192
    (void)tableBuilderClose(p, 0);
×
193
    bseError("failed to open table builder at line %d since %s", lino, tstrerror(code));
×
194
  }
195
  return code;
3,135✔
196
}
197

198
int32_t tableBuilderGetMetaBlock(STableBuilder *p, SArray **pMetaBlock) {
3,508✔
199
  return bseMemTablGetMetaBlock(p->pImmuMemTable, pMetaBlock);
3,508✔
200
}
201

202
int32_t tableBuilderAddMeta(STableBuilder *p, SBlkHandle *pHandle, int8_t immu) {
3,508✔
203
  int32_t code = 0;
3,508✔
204
  int32_t lino = 0;
3,508✔
205

206
  STableMemTable *pMemTable = immu ? p->pImmuMemTable : p->pMemTable;
3,508✔
207

208
  code = bseMemTableRef(pMemTable);
3,508✔
209
  TAOS_CHECK_GOTO(code, &lino, _error);
3,508✔
210

211
  code = bseMemTablePush(pMemTable, pHandle);
3,508✔
212
  TAOS_CHECK_GOTO(code, &lino, _error);
3,508✔
213

214
  seqRangeReset(&pMemTable->range);
3,508✔
215
_error:
3,508✔
216
  bseMemTableUnRef(pMemTable);
3,508✔
217
  return code;
3,508✔
218
}
219
int32_t tableBuilderSetBlockInfo(STableMemTable *pMemTable) {
3,508✔
220
  int32_t        code = 0;
3,508✔
221
  int32_t        lino = 0;
3,508✔
222
  SBlockWrapper *pWp = &pMemTable->pBlockWrapper;
3,508✔
223

224
  code = blockWrapperResize(pWp, BLOCK_TOTAL_SIZE((SBlock *)(pWp->data)) + pWp->kvSize);
3,508✔
225
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
226

227
  SBlock *pBlock = (SBlock *)pWp->data;
3,508✔
228

229
  pBlock->offset = pBlock->len;
3,508✔
230
  memcpy(pBlock->data + pBlock->len, pWp->kvBuffer, pWp->kvSize);
3,508✔
231
  pBlock->len += pWp->kvSize;
3,508✔
232
  pBlock->version = BSE_DATA_VER;
3,508✔
233
_error:
3,508✔
234
  return code;
3,508✔
235
}
236
int32_t tableBuilderFlush(STableBuilder *p, int8_t type, int8_t immutable) {
3,508✔
237
  int32_t code = 0;
3,508✔
238
  int32_t lino = 0;
3,508✔
239
  int8_t  inLock = 0;
3,508✔
240

241
  STableMemTable *pMemTable = immutable ? p->pImmuMemTable : p->pMemTable;
3,508✔
242
  if (p == NULL) return code;
3,508✔
243

244
  SBlockWrapper wrapper = {0};
3,508✔
245
  code = bseMemTableRef(pMemTable);
3,508✔
246
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
247

248
  if (immutable) {
3,508✔
249
    taosWLockLatch(&pMemTable->latch);
3,508✔
250
    inLock = 1;
3,508✔
251
  }
252

253
  code = tableBuilderSetBlockInfo(pMemTable);
3,508✔
254
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
255

256
  SBlock *pBlk = pMemTable->pBlockWrapper.data;
3,508✔
257
  if (pBlk->len == 0) {
3,508✔
258
    goto _error;
×
259
  }
260

261
  int8_t compressType = BSE_COMPRESS_TYPE(p->pBse);
3,508✔
262

263
  uint8_t *pWrite = (uint8_t *)pBlk;
3,508✔
264
  int32_t  len = BLOCK_TOTAL_SIZE(pBlk);
3,508✔
265

266
  pBlk->type = type;
3,508✔
267

268
  BLOCK_SET_COMPRESS_TYPE(pBlk, compressType);
3,508✔
269
  BLOCK_SET_ROW_SIZE(pBlk, BLOCK_ROW_SIZE(pBlk));
3,508✔
270

271
  if (compressType != kNoCompres) {
3,508✔
272
    code = blockWrapperInit(&wrapper, len + 16);
3,508✔
273
    TSDB_CHECK_CODE(code, lino, _error);
3,508✔
274

275
    int32_t compressSize = wrapper.cap;
3,508✔
276
    code = bseCompressData(compressType, pWrite, BLOCK_ROW_SIZE(pBlk), wrapper.data, &compressSize);
3,508✔
277
    if (code != 0) {
3,508✔
278
      bseWarn("failed to compress data since %s, not set compress", tstrerror(TSDB_CODE_THIRDPARTY_ERROR));
×
279

280
      blockWrapperCleanup(&wrapper);
×
281
      BLOCK_SET_COMPRESS_TYPE(pBlk, kNoCompres);
×
282
      BLOCK_SET_ROW_SIZE(pBlk, BLOCK_ROW_SIZE(pBlk));
×
283
    } else {
284
      int32_t rawSize = BLOCK_ROW_SIZE(pBlk);
3,508✔
285
      COMREPSS_DATA_SET_TYPE_AND_RAWLEN(wrapper.data, compressSize, compressType, rawSize);
3,508✔
286
      len = compressSize + BLOCK_TAIL_LEN;
3,508✔
287

288
      pWrite = (uint8_t *)wrapper.data;
3,508✔
289
    }
290
  }
291

292
  code = taosCalcChecksumAppend(0, (uint8_t *)pWrite, len);
3,508✔
293
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
294

295
  SBlkHandle handle = {.size = len, .offset = p->offset, .range = pMemTable->range};
3,508✔
296

297
  bseDebug("bse flush at offset %" PRId64 " len: %d, block range sseq:%" PRId64 ", eseq:%" PRId64 "", p->offset, len,
3,508✔
298
           handle.range.sseq, handle.range.eseq);
299

300
  int64_t n = taosLSeekFile(p->pDataFile, handle.offset, SEEK_SET);
3,508✔
301
  if (n < 0) {
3,508✔
302
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
303
  }
304

305
  int64_t nwrite = taosWriteFile(p->pDataFile, (uint8_t *)pWrite, len);
3,508✔
306
  if (nwrite != len) {
3,508✔
307
    code = terrno;
×
308
    TSDB_CHECK_CODE(code, lino, _error);
×
309
  }
310
  p->offset += len;
3,508✔
311

312
  code = tableBuilderAddMeta(p, &handle, immutable);
3,508✔
313

314
_error:
3,508✔
315
  if (code != 0) {
3,508✔
316
    bseError("failed to flush table builder at line %d since %s", lino, tstrerror(code));
×
317
  }
318

319
  if (pMemTable != NULL) {
3,508✔
320
    if (!immutable) blockWrapperClear(&pMemTable->pBlockWrapper);
3,508✔
321
    if (inLock) {
3,508✔
322
      taosWUnLockLatch(&pMemTable->latch);
3,508✔
323
    }
324
    bseMemTableUnRef(pMemTable);
3,508✔
325
  }
326
  blockWrapperCleanup(&wrapper);
3,508✔
327
  return code;
3,508✔
328
}
329

330
void tableBuildUpdateTableRange(STableBuilder *p, SBlockItemInfo *pInfo) {
17,979✔
331
  SSeqRange range = {.sseq = pInfo->seq, .eseq = pInfo->seq};
17,979✔
332
  seqRangeUpdate(&p->tableRange, &range);
17,979✔
333
}
17,979✔
334

335
void tableBuilderUpdateBlockRange(STableBuilder *p, SBlockItemInfo *pInfo) {
20,876,955✔
336
  SSeqRange range = {.sseq = pInfo->seq, .eseq = pInfo->seq};
20,876,955✔
337
  seqRangeUpdate(&p->blockRange, &range);
20,876,955✔
338
}
20,876,955✔
339
void memtableUpdateBlockRange(STableMemTable *p, SBlockItemInfo *pInfo) {
20,894,934✔
340
  SSeqRange range = {.sseq = pInfo->seq, .eseq = pInfo->seq};
20,894,934✔
341
  seqRangeUpdate(&p->range, &range);
20,894,934✔
342
  seqRangeUpdate(&p->tableRange, &range);
20,894,934✔
343
}
20,894,934✔
344

345
// table block data
346
// data1 data2 data3 data4 k1v1 k2v2, k3,v3 compresss size raw_size
347
//|seq len value|seq len value| seq len value| seq len value|
348
int32_t tableBuilderPut(STableBuilder *p, SBseBatch *pBatch) {
21,246✔
349
  int32_t code = 0;
21,246✔
350
  int32_t lino = 0;
21,246✔
351
  int32_t len = 0, offset = 0;
21,246✔
352
  int8_t  inLock = 0;
21,246✔
353

354
  code = bseMemTableRef(p->pMemTable);
21,246✔
355
  if (code != 0) {
21,246✔
356
    return code;
×
357
  }
358

359
  taosWLockLatch(&p->pMemTable->latch);
21,246✔
360
  inLock = 1;
21,246✔
361

362
  SBlockWrapper *pBlockWrapper = &p->pMemTable->pBlockWrapper;
21,246✔
363

364
  for (int32_t i = 0; i < taosArrayGetSize(pBatch->pSeq);) {
20,898,201✔
365
    SBlockItemInfo *pInfo = taosArrayGet(pBatch->pSeq, i);
20,876,955✔
366
    if (i == 0 || i == taosArrayGetSize(pBatch->pSeq) - 1) {
20,876,955✔
367
      tableBuildUpdateTableRange(p, pInfo);
17,979✔
368
      memtableUpdateBlockRange(p->pMemTable, pInfo);
17,979✔
369
    }
370

371
    if (atomic_load_8(&p->hasImmuMemTable) ||
41,753,910✔
372
        (blockWrapperSize(pBlockWrapper, len + pInfo->size) < tableBuilderGetBlockSize(p))) {
20,876,955✔
373
      i++;
20,876,955✔
374
      len += pInfo->size;
20,876,955✔
375
      tableBuilderUpdateBlockRange(p, pInfo);
20,876,955✔
376
      memtableUpdateBlockRange(p->pMemTable, pInfo);
20,876,955✔
377

378
      code = blockWrapperPushMeta(pBlockWrapper, pInfo->seq, NULL, pInfo->size);
20,876,955✔
379
      TSDB_CHECK_CODE(code, lino, _error);
20,876,955✔
380

381
      bseTrace("start to insert  bse table builder mem %p, idx %d", p->pMemTable, i);
20,876,955✔
382
      continue;
20,876,955✔
383
    } else {
384
      if (len > 0) {
×
385
        offset += blockAppendBatch(pBlockWrapper->data, pBatch->buf + offset, len);
×
386
      }
387
      bseTrace("start to flush bse table builder mem %p", p->pMemTable);
×
388
      code = tableBuilderFlush(p, BSE_TABLE_DATA_TYPE, 0);
×
389
      TSDB_CHECK_CODE(code, lino, _error);
×
390
      len = 0;
×
391
    }
392
  }
393

394
  if (offset < pBatch->len) {
21,246✔
395
    int32_t size = pBatch->len - offset;
14,667✔
396
    if (size > 0) {
14,667✔
397
      code = blockWrapperResize(pBlockWrapper,
14,667✔
398
                                size + BLOCK_TOTAL_SIZE((SBlock *)(pBlockWrapper->data)) + pBlockWrapper->kvSize);
14,667✔
399
      TSDB_CHECK_CODE(code, lino, _error);
14,667✔
400
    }
401

402
    if (blockAppendBatch(pBlockWrapper->data, pBatch->buf + offset, size) != size) {
14,667✔
403
      code = TSDB_CODE_INVALID_PARA;
×
404
    }
405
  }
406
_error:
21,246✔
407
  if (code != 0) {
21,246✔
408
    bseError("failed to append batch since %s", tstrerror(code));
×
409
  }
410
  if (inLock) {
21,246✔
411
    taosWUnLockLatch(&p->pMemTable->latch);
21,246✔
412
  }
413

414
  bseMemTableUnRef(p->pMemTable);
21,246✔
415
  return code;
21,246✔
416
}
417

418
int32_t tableBuilderTruncFile(STableBuilder *p, int64_t size) {
×
419
  int32_t code = 0;
×
420
  int32_t lino = 0;
×
421

422
  if (p->pDataFile == NULL) {
×
423
    return TSDB_CODE_INVALID_PARA;
×
424
  }
425
  code = taosFtruncateFile(p->pDataFile, size);
×
426
  TSDB_CHECK_CODE(code, lino, _error);
×
427

428
_error:
×
429
  if (code != 0) {
×
430
    bseError("failed to truncate file since %s", tstrerror(code));
×
431
  }
432
  return code;
×
433
}
434

435
int32_t compareFunc(const void *pLeft, const void *pRight) {
20,368,800✔
436
  SBlkHandle *p1 = (SBlkHandle *)pLeft;
20,368,800✔
437
  SBlkHandle *p2 = (SBlkHandle *)pRight;
20,368,800✔
438
  if (p1->range.sseq > p2->range.sseq) {
20,368,800✔
439
    return 1;
20,368,248✔
440
  } else if (p1->range.sseq < p2->range.sseq) {
552✔
441
    return -1;
×
442
  }
443
  return 0;
552✔
444
}
UNCOV
445
int32_t findTargetBlock(SArray *pMetaHandle, int64_t seq) {
×
UNCOV
446
  SBlkHandle handle = {.range = {.sseq = seq, .eseq = seq}};
×
UNCOV
447
  return taosArraySearchIdx(pMetaHandle, &handle, compareFunc, TD_LE);
×
448
}
449

450
int32_t findInMemtable(STableMemTable *p, int64_t seq, uint8_t **value, int32_t *len) {
20,876,955✔
451
  int32_t code = 0;
20,876,955✔
452
  int8_t  inBuf = 1;
20,876,955✔
453
  int32_t lino = 0;
20,876,955✔
454
  int8_t  inLock = 0;
20,876,955✔
455
  if (p == NULL) {
20,876,955✔
UNCOV
456
    return TSDB_CODE_NOT_FOUND;
×
457
  }
458

459
  code = bseMemTableRef(p);
20,876,955✔
460
  if (code != 0) {
20,876,955✔
461
    return code;
×
462
  }
463

464
  taosRLockLatch(&p->latch);
20,876,955✔
465
  inLock = 1;
20,876,955✔
466

467
  if (!seqRangeContains(&p->tableRange, seq)) {
20,876,955✔
468
    TSDB_CHECK_CODE(TSDB_CODE_NOT_FOUND, lino, _error);
×
469
  }
470

471
  if (taosArrayGetSize(p->pMetaHandle) > 0) {
20,876,955✔
UNCOV
472
    SBlkHandle *pHandle = taosArrayGetLast(p->pMetaHandle);
×
UNCOV
473
    if (!seqRangeIsGreater(&pHandle->range, seq)) {
×
UNCOV
474
      inBuf = 0;
×
UNCOV
475
      int32_t idx = findTargetBlock(p->pMetaHandle, seq);
×
UNCOV
476
      if (idx < 0) {
×
477
        TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_RANGE, lino, _error);
×
478
      }
UNCOV
479
      pHandle = taosArrayGet(p->pMetaHandle, idx);
×
UNCOV
480
      code = tableBuilderSeek(p->pTableBuilder, pHandle, seq, value, len);
×
UNCOV
481
      TSDB_CHECK_CODE(code, lino, _error);
×
482
    }
483
  }
484

485
  if (inBuf == 1) {
20,876,955✔
486
    code = blockWrapperSeek(&p->pBlockWrapper, seq, value, len);
20,876,955✔
487
    if (code != 0) {
20,876,955✔
488
      bseInfo("mem table range [%" PRId64 ", %" PRId64 "]", p->tableRange.sseq, p->tableRange.eseq);
×
489
    }
490
    TSDB_CHECK_CODE(code, lino, _error);
20,876,955✔
491
  }
492
_error:
20,876,955✔
493
  if (inLock) {
20,876,955✔
494
    taosRUnLockLatch(&p->latch);
20,876,955✔
495
  }
496
  bseMemTableUnRef(p);
20,876,955✔
497
  if (code != 0) {
20,876,955✔
498
    bseInfo("failed to find seq %" PRId64 " in memtable %p at line %d since %s", seq, p, lino, tstrerror(code));
×
499
  }
500
  return code;
20,876,955✔
501
}
502
int32_t tableBuilderGet(STableBuilder *p, int64_t seq, uint8_t **value, int32_t *len) {
20,876,955✔
503
  int32_t code = 0;
20,876,955✔
504
  if (p == NULL) {
20,876,955✔
505
    return TSDB_CODE_NOT_FOUND;
×
506
  }
507

508
  code = findInMemtable(p->pMemTable, seq, value, len);
20,876,955✔
509
  if (code != 0) {
20,876,955✔
UNCOV
510
    code = findInMemtable(p->pImmuMemTable, seq, value, len);
×
511
  }
512
  return code;
20,876,955✔
513
}
514

515
static void updateTableRange(SBTableMeta *pTableMeta, SArray *pMetaBlock) {
3,508✔
516
  if (pMetaBlock == NULL) {
3,508✔
517
    return;
×
518
  }
519

520
  for (int32_t i = 0; i < taosArrayGetSize(pMetaBlock); i++) {
7,016✔
521
    SMetaBlock *pMeta = taosArrayGet(pMetaBlock, i);
3,508✔
522
    seqRangeUpdate(&pTableMeta->range, &pMeta->range);
3,508✔
523
  }
524
}
525
static int32_t tableBuilderClearImmuMemTable(STableBuilder *p) {
3,508✔
526
  int32_t           code = 0;
3,508✔
527
  STableBuilderMgt *pMgt = p->pBuilderMgt;
3,508✔
528
  (void)taosThreadRwlockWrlock(&pMgt->mutex);
3,508✔
529
  atomic_store_8(&p->hasImmuMemTable, 0);
3,508✔
530
  bseMemTableUnRef(p->pImmuMemTable);
3,508✔
531
  p->pImmuMemTable = NULL;
3,508✔
532

533
  (void)taosThreadRwlockUnlock(&pMgt->mutex);
3,508✔
534
  return code;
3,508✔
535
}
536
static int32_t tableBuildeSwapMemTable(STableBuilder *p) {
×
537
  int32_t code = 0;
×
538
  (void)taosThreadRwlockWrlock(&p->pBse->rwlock);
×
539
  p->pImmuMemTable = p->pMemTable;
×
540
  p->pMemTable = NULL;
×
541

542
  atomic_store_8(&p->hasImmuMemTable, 1);
×
543

544
  (void)taosThreadRwlockUnlock(&p->pBse->rwlock);
×
545
  return code;
×
546
}
547

548
int32_t tableBuilderCommit(STableBuilder *p, SBseLiveFileInfo *pInfo) {
3,508✔
549
  int32_t code = 0;
3,508✔
550
  int32_t lino = 0;
3,508✔
551

552
  STableCommitInfo commitInfo = {0};
3,508✔
553
  SArray          *pMetaBlock = NULL;
3,508✔
554
  if (p == NULL) {
3,508✔
555
    return TSDB_CODE_INVALID_PARA;
×
556
  }
557

558
  code = tableBuilderFlush(p, BSE_TABLE_DATA_TYPE, 1);
3,508✔
559
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
560

561
  code = taosFsyncFile(p->pDataFile);
3,508✔
562
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
563

564
  code = tableBuilderGetMetaBlock(p, &pMetaBlock);
3,508✔
565
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
566

567
  if (taosArrayGetSize(pMetaBlock) == 0) {
3,508✔
568
    bseDebug("no meta block to commit for table %s", p->name);
×
569
    return code;
×
570
  }
571

572
  code = tableMetaCommit(p->pTableMeta, pMetaBlock);
3,508✔
573
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
574

575
  updateTableRange(p->pTableMeta, pMetaBlock);
3,508✔
576

577
  pInfo->level = 0;
3,508✔
578
  pInfo->range = p->pTableMeta->range;
3,508✔
579
  pInfo->timestamp = p->timestamp;
3,508✔
580
  pInfo->size = p->offset;
3,508✔
581

582
  code = tableBuilderClearImmuMemTable(p);
3,508✔
583
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
584

585
_error:
3,508✔
586
  if (code != 0) {
3,508✔
587
    bseError("failed to commit table builder at line %d since %s ", lino, tstrerror(code));
×
588
  } else {
589
    bseInfo("succ to commit table %s", p->name);
3,508✔
590
  }
591
  taosArrayDestroy(pMetaBlock);
3,508✔
592
  return code;
3,508✔
593
}
594

595
int32_t tableBuilderGetBlockSize(STableBuilder *p) { return p->blockCap; }
20,876,955✔
596

597
void tableBuilderClose(STableBuilder *p, int8_t commited) {
3,687✔
598
  if (p == NULL) {
3,687✔
599
    return;
552✔
600
  }
601

602
  bseMemTableUnRef(p->pMemTable);
3,135✔
603
  bseMemTableUnRef(p->pImmuMemTable);
3,135✔
604

605
  if (taosCloseFile(&p->pDataFile) != 0) {
3,135✔
606
    bseError("failed to close table builder file %s since %s", p->name, tstrerror(terrno));
×
607
  }
608
  taosMemoryFree(p);
3,135✔
609
}
610

611
static void addSnapshotMetaToBlock(SBlockWrapper *pBlkWrapper, SSeqRange range, int8_t fileType, int8_t blockType,
×
612
                                   int64_t timestamp) {
613
  SBseSnapMeta *pSnapMeta = pBlkWrapper->data;
×
614

615
  pSnapMeta->range = range;
×
616
  pSnapMeta->fileType = fileType;
×
617
  pSnapMeta->blockType = blockType;
×
618
  pSnapMeta->timestamp = timestamp;
×
619
  return;
×
620
}
621

622
static void updateSnapshotMeta(SBlockWrapper *pBlkWrapper, SSeqRange range, int8_t fileType, int8_t blockType,
×
623
                               int64_t timestamp) {
624
  SBseSnapMeta *pSnapMeta = (SBseSnapMeta *)pBlkWrapper->data;
×
625
  pSnapMeta->timestamp = timestamp;
×
626
  return;
×
627
}
628
int32_t tableReaderLoadRawBlock(STableReader *p, SBlkHandle *pHandle, SBlockWrapper *blkWrapper) {
×
629
  int32_t code = 0;
×
630
  int32_t lino = 0;
×
631

632
  code = blockWrapperResize(blkWrapper, pHandle->size + sizeof(SBseSnapMeta));
×
633
  TSDB_CHECK_CODE(code, lino, _error);
×
634

635
  code = tableLoadRawBlock(p->pDataFile, pHandle, blkWrapper, 1);
×
636
  TSDB_CHECK_CODE(code, lino, _error);
×
637

638
  addSnapshotMetaToBlock(blkWrapper, p->range, BSE_TABLE_SNAP, BSE_TABLE_DATA_TYPE, p->timestamp);
×
639

640
_error:
×
641
  if (code != 0) {
×
642
    bseError("table reader failed to load block at line %d since %s", lino, tstrerror(code));
×
643
  }
644
  return code;
×
645
}
646

647
int32_t tableReaderLoadRawMeta(STableReader *p, SBlkHandle *pHandle, SBlockWrapper *blkWrapper) {
×
648
  int32_t code = 0;
×
649
  int32_t lino = 0;
×
650

651
  SBtableMetaReader *pReader = p->pMetaReader;
×
652

653
  code = blockWrapperResize(blkWrapper, pHandle->size + sizeof(SBseSnapMeta));
×
654
  TSDB_CHECK_CODE(code, lino, _error);
×
655

656
  code = tableLoadRawBlock(pReader->pFile, pHandle, blkWrapper, 1);
×
657
  TSDB_CHECK_CODE(code, lino, _error);
×
658

659
  addSnapshotMetaToBlock(blkWrapper, p->range, BSE_TABLE_META_SNAP, BSE_TABLE_META_TYPE, p->timestamp);
×
660
_error:
×
661
  if (code != 0) {
×
662
    bseError("failed to load raw meta from table pReaderMgt at line %d lino since %s", lino, tstrerror(code));
×
663
  }
664
  return code;
×
665
}
666
int32_t tableReaderLoadRawMetaIndex(STableReader *p, SBlockWrapper *blkWrapper) {
×
667
  int32_t code = 0;
×
668
  int32_t lino = 0;
×
669

670
  SBtableMetaReader *pReader = p->pMetaReader;
×
671
  SBlkHandle        *pHandle = p->pMetaReader->footer.metaHandle;
×
672

673
  code = blockWrapperResize(blkWrapper, pHandle->size + sizeof(SBseSnapMeta));
×
674
  TSDB_CHECK_CODE(code, lino, _error);
×
675

676
  code = tableLoadRawBlock(pReader->pFile, pHandle, blkWrapper, 1);
×
677
  TSDB_CHECK_CODE(code, lino, _error);
×
678

679
  addSnapshotMetaToBlock(blkWrapper, p->range, BSE_TABLE_META_SNAP, BSE_TABLE_META_INDEX_TYPE, p->timestamp);
×
680
_error:
×
681
  if (code != 0) {
×
682
    bseError("failed to load raw meta from table pReaderMgt at line %d lino since %s", lino, tstrerror(code));
×
683
  }
684
  return code;
×
685
}
686
int32_t tableReaderLoadRawFooter(STableReader *p, SBlockWrapper *blkWrapper) {
×
687
  int32_t code = 0;
×
688
  int32_t lino = 0;
×
689
  char    buf[kEncodeLen] = {0};
×
690

691
  SBtableMetaReader *pReader = p->pMetaReader;
×
692
  code = footerEncode(&pReader->footer, buf);
×
693
  int32_t len = sizeof(buf);
×
694

695
  int64_t n = taosLSeekFile(pReader->pFile, -kEncodeLen, SEEK_END);
×
696
  if (n < 0) {
×
697
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
698
  }
699

700
  if (taosReadFile(pReader->pFile, buf, sizeof(buf)) != len) {
×
701
    TSDB_CHECK_CODE(terrno, lino, _error);
×
702
  }
703

704
  code = blockWrapperResize(blkWrapper, len + sizeof(SBseSnapMeta));
×
705
  TSDB_CHECK_CODE(code, lino, _error);
×
706

707
  memcpy((uint8_t *)blkWrapper->data + sizeof(SBseSnapMeta), buf, sizeof(buf));
×
708
  blkWrapper->size = len + sizeof(SBseSnapMeta);
×
709

710
  addSnapshotMetaToBlock(blkWrapper, p->range, BSE_TABLE_META_SNAP, BSE_TABLE_FOOTER_TYPE, p->timestamp);
×
711
_error:
×
712
  if (code != 0) {
×
713
    bseError("failed to load raw footer from table pReaderMgt at lino %d since %s", lino, tstrerror(code));
×
714
  }
715
  return code;
×
716
}
717

718
int32_t tableReaderOpen(int64_t timestamp, STableReader **pReader, void *pReaderMgt) {
10,432,800✔
719
  char data[TSDB_FILENAME_LEN] = {0};
10,432,800✔
720
  char meta[TSDB_FILENAME_LEN] = {0};
10,432,800✔
721

722
  char dataPath[TSDB_FILENAME_LEN] = {0};
10,432,800✔
723

724
  int32_t code = 0;
10,432,800✔
725
  int32_t lino = 0;
10,432,800✔
726
  int64_t size = 0;
10,432,800✔
727

728
  STableReaderMgt *pMgt = (STableReaderMgt *)pReaderMgt;
10,432,800✔
729
  if (pMgt == NULL) {
10,432,800✔
730
    return TSDB_CODE_INVALID_CFG;
×
731
  }
732

733
  SSubTableMgt *pMeta = pMgt->pMgt;
10,432,800✔
734

735
  STableReader *p = taosMemCalloc(1, sizeof(STableReader));
10,432,800✔
736
  if (p == NULL) {
10,432,800✔
737
    TSDB_CHECK_CODE(terrno, lino, _error);
×
738
  }
739

740
  bseBuildDataName(timestamp, data);
10,432,800✔
741

742
  p->timestamp = timestamp;
10,432,800✔
743
  p->blockCap = 1024;
10,432,800✔
744
  p->pReaderMgt = pReaderMgt;
10,432,800✔
745
  memcpy(p->name, data, strlen(data));
10,432,800✔
746

747
  bseBuildFullName(pMgt->pBse, data, dataPath);
10,432,800✔
748
  code = tableOpenFile(dataPath, 1, &p->pDataFile, &p->fileSize);
10,432,800✔
749
  TSDB_CHECK_CODE(code, lino, _error);
10,432,800✔
750

751
  code = blockWrapperInit(&p->blockWrapper, 1024);
10,432,800✔
752
  TSDB_CHECK_CODE(code, lino, _error);
10,432,800✔
753

754
  bseBuildMetaName(timestamp, meta);
10,432,800✔
755
  code = tableMetaReaderInit(pMeta->pTableMetaMgt->pTableMeta, meta, &p->pMetaReader);
10,432,800✔
756
  TSDB_CHECK_CODE(code, lino, _error);
10,432,800✔
757

758
  *pReader = p;
10,432,800✔
759

760
_error:
10,432,800✔
761
  if (code != 0) {
10,432,800✔
762
    tableReaderClose(p);
×
763
    bseError("failed to open table pReaderMgt at line %d since %s", lino, tstrerror(code));
×
764
  }
765
  return code;
10,432,800✔
766
}
767
void tableReaderShouldPutToCache(STableReader *p, int8_t cache) { p->putInCache = cache; }
×
768

769
int32_t tableReaderGet(STableReader *p, int64_t seq, uint8_t **pValue, int32_t *len) {
10,432,800✔
770
  int32_t    lino = 0;
10,432,800✔
771
  int32_t    code = 0;
10,432,800✔
772
  SMetaBlock block = {0};
10,432,800✔
773

774
  STableReaderMgt   *pMgt = (STableReaderMgt *)p->pReaderMgt;
10,432,800✔
775
  SBtableMetaReader *pMeta = p->pMetaReader;
10,432,800✔
776

777
  code = tableMetaReaderLoadBlockMeta(pMeta, seq, &block);
10,432,800✔
778
  TSDB_CHECK_CODE(code, lino, _error);
10,432,800✔
779

780
  SBlockWrapper wrapper = {0};
10,432,800✔
781
  SBlkHandle    blkhandle = {.offset = block.offset, .size = block.size, .range = block.range};
10,432,800✔
782

783
  SCacheItem *pItem = NULL;
10,432,800✔
784
  code = blockCacheGet(pMgt->pBlockCache, &blkhandle.range, (void **)&pItem);
10,432,800✔
785
  if (code != 0) {
10,432,800✔
786
    code = blockWrapperInit(&wrapper, block.size + 16);
1,656✔
787
    TSDB_CHECK_CODE(code, lino, _error);
1,656✔
788

789
    bseDebug("block size:%" PRId64 ", offset:%" PRId64 ", [sseq:%" PRId64 ", eseq:%" PRId64 "]", block.size,
1,656✔
790
             block.offset, block.range.sseq, block.range.eseq);
791

792
    code = tableLoadBlock(p->pDataFile, &blkhandle, &wrapper);
1,656✔
793
    if (code != 0) {
1,656✔
794
      blockWrapperCleanup(&wrapper);
×
795
      TSDB_CHECK_CODE(code, lino, _error);
×
796
    }
797

798
    code = blockCachePut(pMgt->pBlockCache, &block.range, wrapper.data);
1,656✔
799
    TSDB_CHECK_CODE(code, lino, _error);
1,656✔
800

801
  } else {
802
    wrapper.data = pItem->pItem;
10,431,144✔
803
    wrapper.pCachItem = pItem;
10,431,144✔
804
  }
805

806
  code = blockSeek(wrapper.data, seq, pValue, len);
10,432,800✔
807
  TSDB_CHECK_CODE(code, lino, _error);
10,432,800✔
808

809
  if (wrapper.pCachItem != NULL) {
10,432,800✔
810
    bseCacheUnrefItem(wrapper.pCachItem);
10,431,144✔
811
  }
812
  blockWrapperClearMeta(&wrapper);
10,432,800✔
813

814
_error:
10,432,800✔
815
  if (code != 0) {
10,432,800✔
816
    bseError("failed to get table reader data at line %d since %s", lino, tstrerror(code));
×
817
  }
818
  return code;
10,432,800✔
819
}
820
int32_t tableReaderGetMeta(STableReader *p, SArray **pMeta) {
×
821
  int32_t code = 0;
×
822
  int32_t lino = 0;
×
823

824
  SArray *pMetaHandle = taosArrayInit(128, sizeof(SBlkHandle));
×
825
  if (pMetaHandle == NULL) {
×
826
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
827
  }
828

829
  code = tableMetaReaderLoadAllDataHandle(p->pMetaReader, pMetaHandle);
×
830
  TSDB_CHECK_CODE(code, lino, _error);
×
831

832
  *pMeta = pMetaHandle;
×
833

834
_error:
×
835
  if (code != 0) {
×
836
    bseError("failed to get table reader meta at lino %d since %s", lino, tstrerror(code));
×
837
  }
838
  return code;
×
839
}
840

841
void tableReaderClose(STableReader *p) {
10,432,800✔
842
  if (p == NULL) return;
10,432,800✔
843
  int32_t code = 0;
10,432,800✔
844

845
  taosArrayDestroy(p->pMetaHandle);
10,432,800✔
846

847
  if (taosCloseFile(&p->pDataFile) != 0) {
10,432,800✔
848
    bseError("failed to close table reader file %s since %s", p->name, tstrerror(terrno));
×
849
  }
850
  tableMetaReaderClose(p->pMetaReader);
10,432,800✔
851
  blockWrapperCleanup(&p->blockWrapper);
10,432,800✔
852

853
  taosMemoryFree(p);
10,432,800✔
854
}
855

856
int32_t blockCreate(int32_t cap, SBlock **p) {
×
857
  int32_t code = 0;
×
858
  SBlock *t = taosMemCalloc(1, cap);
×
859
  if (t == NULL) {
×
860
    return terrno;
×
861
  }
862
  *p = t;
×
863
  return code;
×
864
}
865

866
int32_t blockEsimateSize(SBlock *p, int32_t extra) { return BLOCK_TOTAL_SIZE(p) + extra; }
20,880,463✔
867

868
int32_t blockWrapperSize(SBlockWrapper *p, int32_t extra) {
20,876,955✔
869
  if (p == NULL || p->data == NULL) {
20,876,955✔
870
    return 0;
×
871
  }
872

873
  return p->kvSize + blockEsimateSize(p->data, extra) + 12;
20,876,955✔
874
}
875
int32_t blockAppendBatch(SBlock *p, uint8_t *value, int32_t len) {
14,667✔
876
  int32_t  code = 0;
14,667✔
877
  int32_t  offset = 0;
14,667✔
878
  uint8_t *data = (uint8_t *)p->data + p->len;
14,667✔
879
  memcpy(data, value, len);
14,667✔
880
  p->len += len;
14,667✔
881
  return len;
14,667✔
882
}
883
int32_t blockPut(SBlock *p, int64_t seq, uint8_t *value, int32_t len) {
×
884
  int32_t  code = 0;
×
885
  uint8_t *data = (uint8_t *)p->data + p->len;
×
886

887
  int32_t offset = taosEncodeVariantI64((void **)&data, seq);
×
888
  offset += taosEncodeVariantI32((void **)&data, len);
×
889
  offset += taosEncodeBinary((void **)&data, value, len);
×
890
  p->len += len;
×
891
  return offset;
×
892
}
893
void blockClear(SBlock *p) {
7,016✔
894
  p->len = 0;
7,016✔
895
  p->type = 0;
7,016✔
896
  p->data[0] = 0;
7,016✔
897
}
7,016✔
898

899
int32_t blockSeek(SBlock *p, int64_t seq, uint8_t **pValue, int32_t *len) {
10,432,800✔
900
  int8_t  found = 0;
10,432,800✔
901
  int32_t code = 0;
10,432,800✔
902
  int32_t offset = 0;
10,432,800✔
903

904
  uint8_t *p1 = (uint8_t *)p->data;
10,432,800✔
905
  uint8_t *p2 = p1 + p->offset;
10,432,800✔
906
  while (p2 - p1 < p->len) {
2,147,483,647✔
907
    int64_t k;
2,147,483,647✔
908
    int32_t v;
2,147,483,647✔
909
    p2 = taosDecodeVariantI64(p2, &k);
2,147,483,647✔
910
    p2 = taosDecodeVariantI32(p2, &v);
2,147,483,647✔
911

912
    if (seq == k) {
2,147,483,647✔
913
      *len = v;
10,432,800✔
914
      found = 1;
10,432,800✔
915
      *pValue = taosMemoryCalloc(1, v);
10,432,800✔
916
      if (*pValue == NULL) {
10,432,800✔
917
        return terrno;
×
918
      }
919
      memcpy(*pValue, (uint8_t *)p->data + offset, v);
10,432,800✔
920
      break;
10,432,800✔
921
    }
922
    offset += v;
2,147,483,647✔
923
  }
924
  if (found == 0) {
10,432,800✔
925
    code = TSDB_CODE_NOT_FOUND;
×
926
  }
927
  return code;
10,432,800✔
928
}
929

930
int32_t blockWrapperSeek(SBlockWrapper *p, int64_t tgt, uint8_t **pValue, int32_t *len) {
20,876,955✔
931
  int32_t code = 0;
20,876,955✔
932
  if (p == NULL || p->data == NULL) {
20,876,955✔
933
    return TSDB_CODE_NOT_FOUND;
×
934
  }
935
  int32_t  offset = 0;
20,876,955✔
936
  uint8_t *p1 = p->kvBuffer;
20,876,955✔
937
  uint8_t *p2 = p1;
20,876,955✔
938
  SBlock  *pBlk = (SBlock *)p->data;
20,876,955✔
939
  while ((p2 - p1) < p->kvSize) {
2,147,483,647✔
940
    int64_t seq = 0;
2,147,483,647✔
941
    int32_t vlen = 0;
2,147,483,647✔
942
    p2 = taosDecodeVariantI64(p2, &seq);
2,147,483,647✔
943
    p2 = taosDecodeVariantI32(p2, &vlen);
2,147,483,647✔
944

945
    if (seq == tgt) {
2,147,483,647✔
946
      *len = vlen;
20,876,955✔
947
      *pValue = taosMemoryCalloc(1, vlen);
20,876,955✔
948
      if (*pValue == NULL) {
20,876,955✔
949
        return terrno;
×
950
      }
951
      uint8_t *pdata = (uint8_t *)pBlk->data + offset;
20,876,955✔
952
      memcpy(*pValue, pdata, vlen);
20,876,955✔
953
      return 0;
20,876,955✔
954
    }
955
    offset += vlen;
2,147,483,647✔
956
  }
957
  bseInfo("blockWrapperSeek not found seq:%" PRId64 ", tgt:%" PRId64 "", tgt, tgt);
×
958
  return TSDB_CODE_BLOB_SEQ_NOT_FOUND;
×
959
}
960

961
int8_t blockGetType(SBlock *p) { return p->type; }
10,435,560✔
962
void   blockDestroy(SBlock *pBlock) { taosMemoryFree(pBlock); }
×
963

964
int32_t metaBlockAddIndex(SBlock *p, SBlkHandle *pInfo) {
5,164✔
965
  int32_t  code = 0;
5,164✔
966
  uint8_t *data = (uint8_t *)p->data + p->len;
5,164✔
967
  int32_t  offset = blkHandleEncode(pInfo, (char *)data);
5,164✔
968
  p->len += offset;
5,164✔
969
  return offset;
5,164✔
970
}
971

972
int32_t blkHandleEncode(SBlkHandle *pHandle, char *buf) {
12,180✔
973
  char   *p = buf;
12,180✔
974
  int32_t tlen = 0;
12,180✔
975
  tlen += taosEncodeVariantU64((void **)&p, pHandle->offset);
12,180✔
976
  tlen += taosEncodeVariantU64((void **)&p, pHandle->size);
12,180✔
977
  tlen += taosEncodeVariantI64((void **)&p, pHandle->range.sseq);
12,180✔
978
  tlen += taosEncodeVariantI64((void **)&p, pHandle->range.eseq);
12,180✔
979
  return tlen;
12,180✔
980
}
981
int32_t blkHandleDecode(SBlkHandle *pHandle, char *buf) {
46,206,264✔
982
  char *p = buf;
46,206,264✔
983
  p = taosDecodeVariantU64(p, &pHandle->offset);
46,206,264✔
984
  p = taosDecodeVariantU64(p, &pHandle->size);
46,206,264✔
985
  p = taosDecodeVariantI64(p, &pHandle->range.sseq);
46,206,264✔
986
  p = taosDecodeVariantI64(p, &pHandle->range.eseq);
46,206,264✔
987
  return p - buf;
46,206,264✔
988
}
989

990
// | meta handle | index handle | padding | magic number high | magic number low |
991
int32_t footerEncode(STableFooter *pFooter, char *buf) {
3,508✔
992
  char   *p = buf;
3,508✔
993
  int32_t len = 0;
3,508✔
994
  len += blkHandleEncode(pFooter->metaHandle, p + len);
3,508✔
995
  len += blkHandleEncode(pFooter->indexHandle, p + len);
3,508✔
996

997
  p = buf + kEncodeLen - 8;
3,508✔
998
  len += taosEncodeFixedU32((void **)&p, kMagicNum);
3,508✔
999
  len += taosEncodeFixedU32((void **)&p, kMagicNum);
3,508✔
1000
  return 0;
3,508✔
1001
}
1002
int32_t footerDecode(STableFooter *pFooter, char *buf) {
10,433,904✔
1003
  int32_t  code = 0;
10,433,904✔
1004
  char    *p = buf;
10,433,904✔
1005
  char    *mp = buf + kEncodeLen - 8;
10,433,904✔
1006
  uint32_t ml, mh;
10,433,904✔
1007

1008
  if (taosDecodeFixedU32(mp, &ml) == NULL) {
10,433,904✔
1009
    return TSDB_CODE_FILE_CORRUPTED;
×
1010
  }
1011
  if (taosDecodeFixedU32(mp + 4, &mh) == NULL) {
20,867,808✔
1012
    return TSDB_CODE_FILE_CORRUPTED;
×
1013
  }
1014

1015
  if (ml != kMagicNum || mh != kMagicNum) {
10,433,904✔
1016
    return TSDB_CODE_FILE_CORRUPTED;
×
1017
  }
1018

1019
  int32_t len = blkHandleDecode(pFooter->metaHandle, buf);
10,433,904✔
1020
  if (len < 0) {
10,433,904✔
1021
    return TSDB_CODE_FILE_CORRUPTED;
×
1022
  }
1023

1024
  len = blkHandleDecode(pFooter->indexHandle, buf + len);
10,433,904✔
1025
  if (len < 0) {
10,433,904✔
1026
    return TSDB_CODE_FILE_CORRUPTED;
×
1027
  }
1028
  return code;
10,433,904✔
1029
}
1030

1031
int32_t blockSeekMeta(SBlock *pBlock, int64_t seq, SMetaBlock *pMeta) {
10,432,800✔
1032
  int32_t  code = 0;
10,432,800✔
1033
  int32_t  len = 0;
10,432,800✔
1034
  uint8_t *p = (uint8_t *)pBlock->data;
10,432,800✔
1035

1036
  while (len < pBlock->len) {
10,432,800✔
1037
    SMetaBlock meta = {0};
10,432,800✔
1038
    int32_t    offset = metaBlockDecode(&meta, (char *)p);
10,432,800✔
1039
    if (seqRangeContains(&meta.range, seq)) {
10,432,800✔
1040
      memcpy(pMeta, &meta, sizeof(SMetaBlock));
10,432,800✔
1041
      return 0;
10,432,800✔
1042
    }
1043
    len += offset;
×
1044
    p += offset;
×
1045
  }
1046
  return TSDB_CODE_NOT_FOUND;
×
1047
}
1048
int32_t blockGetAllMeta(SBlock *pBlock, SArray *pMeta) {
×
1049
  int32_t  code = 0;
×
1050
  int32_t  len = 0;
×
1051
  uint8_t *p = (uint8_t *)pBlock->data;
×
1052

1053
  while (len < pBlock->len) {
×
1054
    SMetaBlock meta = {0};
×
1055
    int32_t    offset = metaBlockDecode(&meta, (char *)p);
×
1056
    if (taosArrayPush(pMeta, &meta) == NULL) {
×
1057
      return terrno;
×
1058
    }
1059
    len += offset;
×
1060
    p += offset;
×
1061
  }
1062

1063
  return code;
×
1064
}
1065

1066
int32_t metaBlockEncode(SMetaBlock *pMeta, char *buf) {
3,508✔
1067
  char   *p = buf;
3,508✔
1068
  int32_t len = 0;
3,508✔
1069
  len += taosEncodeFixedI8((void **)&p, pMeta->type);
3,508✔
1070
  len += taosEncodeFixedI8((void **)&p, pMeta->version);
3,508✔
1071
  len += taosEncodeFixedI16((void **)&p, pMeta->reserve);
3,508✔
1072
  len += taosEncodeVariantI64((void **)&p, pMeta->offset);
3,508✔
1073
  len += taosEncodeVariantI64((void **)&p, pMeta->size);
3,508✔
1074
  len += taosEncodeVariantI64((void **)&p, pMeta->range.sseq);
3,508✔
1075
  len += taosEncodeVariantI64((void **)&p, pMeta->range.eseq);
3,508✔
1076
  return len;
3,508✔
1077
}
1078
int32_t metaBlockDecode(SMetaBlock *pMeta, char *buf) {
10,432,800✔
1079
  char   *p = buf;
10,432,800✔
1080
  int32_t len = 0;
10,432,800✔
1081
  p = taosDecodeFixedI8(p, &pMeta->type);
10,432,800✔
1082
  p = taosDecodeFixedI8(p, &pMeta->version);
10,432,800✔
1083
  p = taosDecodeFixedI16(p, &pMeta->reserve);
10,432,800✔
1084
  p = taosDecodeVariantI64(p, &pMeta->offset);
10,432,800✔
1085
  p = taosDecodeVariantI64(p, &pMeta->size);
10,432,800✔
1086
  p = taosDecodeVariantI64(p, &pMeta->range.sseq);
10,432,800✔
1087
  p = taosDecodeVariantI64(p, &pMeta->range.eseq);
10,432,800✔
1088
  return p - buf;
10,432,800✔
1089
}
1090
int32_t metaBlockAdd(SBlock *p, SMetaBlock *pBlk) {
3,508✔
1091
  int32_t  code = 0;
3,508✔
1092
  uint8_t *data = (uint8_t *)p->data + p->len;
3,508✔
1093
  int32_t  offset = metaBlockEncode(pBlk, (char *)data);
3,508✔
1094
  p->len += offset;
3,508✔
1095
  return offset;
3,508✔
1096
}
1097
int32_t metaBlockGet(SBlock *p, SMetaBlock *pBlk) {
×
1098
  int32_t  code = 0;
×
1099
  uint8_t *data = (uint8_t *)p->data + p->len;
×
1100
  int32_t  offset = metaBlockDecode(pBlk, (char *)data);
×
1101
  p->len += offset;
×
1102
  return offset;
×
1103
}
1104

1105
int32_t tableFlushBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlkW, int32_t *nWrite) {
8,672✔
1106
  int32_t code = 0;
8,672✔
1107
  int32_t lino = 0;
8,672✔
1108

1109
  SBlock *pBlk = pBlkW->data;
8,672✔
1110
  if (pBlk->len == 0) {
8,672✔
1111
    return 0;
×
1112
  }
1113
  pBlk->version = BSE_META_VER;
8,672✔
1114
  int8_t compressType = kNoCompres;
8,672✔
1115

1116
  SBlockWrapper wrapper = {0};
8,672✔
1117

1118
  uint8_t *pWrite = (uint8_t *)pBlk;
8,672✔
1119
  int32_t  len = BLOCK_TOTAL_SIZE(pBlk);
8,672✔
1120

1121
  BLOCK_SET_COMPRESS_TYPE(pBlk, compressType);
8,672✔
1122
  BLOCK_SET_ROW_SIZE(pBlk, BLOCK_ROW_SIZE(pBlk));
8,672✔
1123

1124
  if (compressType != kNoCompres) {
8,672✔
1125
    code = blockWrapperInit(&wrapper, len + 4);
×
1126
    TSDB_CHECK_CODE(code, lino, _error);
×
1127

1128
    int32_t compressSize = wrapper.cap;
×
1129
    code = bseCompressData(compressType, pWrite, BLOCK_ROW_SIZE(pBlk), wrapper.data, &compressSize);
×
1130
    if (code != 0) {
×
1131
      bseWarn("failed to compress data since %s, not set compress", tstrerror(TSDB_CODE_THIRDPARTY_ERROR));
×
1132

1133
      blockWrapperCleanup(&wrapper);
×
1134
      BLOCK_SET_COMPRESS_TYPE(pBlk, kNoCompres);
×
1135
      BLOCK_SET_ROW_SIZE(pBlk, BLOCK_ROW_SIZE(pBlk));
×
1136
    } else {
1137
      int32_t rawSize = BLOCK_ROW_SIZE(pBlk);
×
1138
      COMREPSS_DATA_SET_TYPE_AND_RAWLEN(wrapper.data, compressSize, compressType, rawSize);
×
1139
      len = compressSize + BLOCK_TAIL_LEN;
×
1140

1141
      pWrite = (uint8_t *)wrapper.data;
×
1142
    }
1143
  }
1144

1145
  code = taosCalcChecksumAppend(0, (uint8_t *)pWrite, len);
8,672✔
1146
  TSDB_CHECK_CODE(code, lino, _error);
8,672✔
1147

1148
  int64_t n = taosLSeekFile(pFile, pHandle->offset, SEEK_SET);
8,672✔
1149
  if (n < 0) {
8,672✔
1150
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1151
  }
1152

1153
  int32_t nwrite = taosWriteFile(pFile, (uint8_t *)pWrite, len);
8,672✔
1154
  if (nwrite != len) {
8,672✔
1155
    code = terrno;
×
1156
    TSDB_CHECK_CODE(code, lino, _error);
×
1157
  }
1158
  *nWrite = nwrite;
8,672✔
1159
  blockWrapperCleanup(&wrapper);
8,672✔
1160
_error:
8,672✔
1161
  if (code != 0) {
8,672✔
1162
    bseError("failed to flush table builder at line %d since %s", lino, tstrerror(code));
×
1163
  } else {
1164
    bseDebug("flush at offset %" PRId64 ", size %d", pHandle->offset, len);
8,672✔
1165
  }
1166
  return code;
8,672✔
1167
}
1168
int32_t tableLoadBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlkW) {
20,870,016✔
1169
  int32_t code = 0;
20,870,016✔
1170
  int32_t lino = 0;
20,870,016✔
1171

1172
  code = blockWrapperResize(pBlkW, pHandle->size + 16);
20,870,016✔
1173
  TSDB_CHECK_CODE(code, lino, _error);
20,870,016✔
1174

1175
  SBlock  *pBlk = pBlkW->data;
20,870,016✔
1176
  uint8_t *pRead = (uint8_t *)pBlk;
20,870,016✔
1177

1178
  SBlockWrapper pHelp = {0};
20,870,016✔
1179

1180
  int64_t n = taosLSeekFile(pFile, pHandle->offset, SEEK_SET);
20,870,016✔
1181
  if (n < 0) {
20,870,016✔
1182
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1183
  }
1184

1185
  int32_t nr = taosReadFile(pFile, pRead, pHandle->size);
20,870,016✔
1186
  if (nr != pHandle->size) {
20,870,016✔
1187
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
×
1188
  }
1189

1190
  if (taosCheckChecksumWhole((uint8_t *)pRead, pHandle->size) != 1) {
41,740,032✔
1191
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
×
1192
  }
1193
  uint8_t compressType = 0;
20,870,016✔
1194
  int32_t rawSize = 0;
20,870,016✔
1195

1196
  COMPRESS_DATA_GET_TYPE_AND_RAWLEN(pRead, pHandle->size, compressType, rawSize);
20,870,016✔
1197

1198
  if (compressType != kNoCompres) {
20,870,016✔
1199
    code = blockWrapperInit(&pHelp, rawSize);
1,656✔
1200
    TSDB_CHECK_CODE(code, lino, _error);
1,656✔
1201

1202
    int32_t unCompressSize = pHelp.cap;
1,656✔
1203
    code = bseDecompressData(compressType, pRead, pHandle->size - BLOCK_TAIL_LEN, pHelp.data, &unCompressSize);
1,656✔
1204
    if (code != 0) {
1,656✔
1205
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
×
1206
    }
1207

1208
    SBlock *p = pHelp.data;
1,656✔
1209
    if (BLOCK_ROW_SIZE_OFFSET(p) != unCompressSize) {
1,656✔
1210
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
×
1211
    }
1212
    blockWrapperCleanup(pBlkW);
1,656✔
1213

1214
    blockWrapperTransfer(pBlkW, &pHelp);
1,656✔
1215

1216
  } else {
1217
    if (pBlk->len != (pHandle->size - BLOCK_TAIL_LEN - sizeof(SBlock))) {
20,868,360✔
1218
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
×
1219
    }
1220
  }
1221
_error:
20,870,016✔
1222
  if (code != 0) {
20,870,016✔
1223
    bseError("failed to load block at lino %d since %s, read at offset %" PRId64 ", size:%" PRId64 "", lino,
×
1224
             tstrerror(code), pHandle->offset, pHandle->size);
1225
  } else {
1226
    bseDebug("read at offset %" PRId64 ", size %" PRId64 "", pHandle->offset, pHandle->size);
20,870,016✔
1227
  }
1228

1229
  blockWrapperCleanup(&pHelp);
20,870,016✔
1230
  return code;
20,870,016✔
1231
}
1232
int32_t tableLoadRawBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlkW, int8_t checkSum) {
×
1233
  int32_t code = 0;
×
1234
  int32_t lino = 0;
×
1235

1236
  SBlock  *pBlk = pBlkW->data;
×
1237
  uint8_t *pRead = (uint8_t *)pBlk + sizeof(SBseSnapMeta);
×
1238

1239
  int64_t n = taosLSeekFile(pFile, pHandle->offset, SEEK_SET);
×
1240
  if (n < 0) {
×
1241
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1242
  }
1243

1244
  int32_t nr = taosReadFile(pFile, pRead, pHandle->size);
×
1245
  if (nr != pHandle->size) {
×
1246
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
×
1247
  }
1248

1249
  if (checkSum) {
×
1250
    if (taosCheckChecksumWhole((uint8_t *)pRead, pHandle->size) != 1) {
×
1251
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
×
1252
    }
1253
  }
1254

1255
  pBlkW->size = pHandle->size + sizeof(SBseSnapMeta);
×
1256
_error:
×
1257
  if (code != 0) {
×
1258
    bseError("failed to load block at lino %d since %s", lino, tstrerror(code));
×
1259
  }
1260
  return code;
×
1261
}
1262

1263
int8_t seqRangeContains(SSeqRange *p, int64_t seq) { return seq >= p->sseq && seq <= p->eseq; }
62,118,665✔
1264

1265
void seqRangeReset(SSeqRange *p) {
22,284✔
1266
  p->sseq = -1;
22,284✔
1267
  p->eseq = -1;
22,284✔
1268
}
22,284✔
1269

UNCOV
1270
int8_t seqRangeIsGreater(SSeqRange *p, int64_t seq) { return seq > p->eseq; }
×
1271

1272
void seqRangeUpdate(SSeqRange *dst, SSeqRange *src) {
62,695,130✔
1273
  if (dst->sseq == -1) {
62,695,130✔
1274
    dst->sseq = src->sseq;
19,360✔
1275
  }
1276
  dst->eseq = src->eseq;
62,695,130✔
1277
}
62,695,130✔
1278

1279
int32_t blockWrapperInit(SBlockWrapper *p, int32_t cap) {
20,889,197✔
1280
  int32_t code = 0;
20,889,197✔
1281
  int32_t lino = 0;
20,889,197✔
1282
  p->data = taosMemoryCalloc(1, cap);
20,889,197✔
1283
  if (p->data == NULL) {
20,889,197✔
1284
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1285
  }
1286

1287
  p->kvSize = 0;
20,889,197✔
1288
  p->kvCap = 128;
20,889,197✔
1289
  p->kvBuffer = taosMemoryCalloc(1, p->kvCap);
20,889,197✔
1290
  if (p->kvBuffer == NULL) {
20,889,197✔
1291
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1292
  }
1293

1294
  SBlock *block = (SBlock *)p->data;
20,889,197✔
1295
  block->offset = 0;
20,889,197✔
1296
  block->version = 0;
20,889,197✔
1297
  p->cap = cap;
20,889,197✔
1298
_error:
20,889,197✔
1299
  if (code != 0) {
20,889,197✔
1300
    blockWrapperCleanup(p);
×
1301
  }
1302
  return code;
20,889,197✔
1303
}
1304
int32_t blockWrapperPushMeta(SBlockWrapper *p, int64_t seq, uint8_t *value, int32_t len) {
20,876,955✔
1305
  int32_t code = 0;
20,876,955✔
1306
  if ((p->kvSize + 12) > p->kvCap) {
20,876,955✔
1307
    if (p->kvCap == 0) {
17,664✔
1308
      p->kvCap = 128;
×
1309
    } else {
1310
      p->kvCap *= 2;
17,664✔
1311
    }
1312

1313
    void *data = taosMemoryRealloc(p->kvBuffer, p->kvCap);
17,664✔
1314
    if (data == NULL) {
17,664✔
1315
      return terrno;
×
1316
    }
1317
    p->kvBuffer = data;
17,664✔
1318
  }
1319
  uint8_t *data = (uint8_t *)p->kvBuffer + p->kvSize;
20,876,955✔
1320
  p->kvSize += taosEncodeVariantI64((void **)&data, seq);
20,876,955✔
1321
  p->kvSize += taosEncodeVariantI32((void **)&data, len);
20,876,955✔
1322
  return code;
20,876,955✔
1323
}
1324

1325
void blockWrapperClearMeta(SBlockWrapper *p) {
10,432,800✔
1326
  if (p->kvBuffer != NULL) {
10,432,800✔
1327
    taosMemoryFree(p->kvBuffer);
1,656✔
1328
  }
1329
  p->kvSize = 0;
10,432,800✔
1330
  p->kvCap = 0;
10,432,800✔
1331
}
10,432,800✔
1332

1333
void blockWrapperCleanup(SBlockWrapper *p) {
41,766,229✔
1334
  if (p->data != NULL) {
41,766,229✔
1335
    taosMemoryFree(p->data);
20,887,541✔
1336
    p->data = NULL;
20,887,541✔
1337
  }
1338
  p->kvSize = 0;
41,766,229✔
1339
  taosMemoryFreeClear(p->kvBuffer);
41,766,229✔
1340
  p->cap = 0;
41,766,229✔
1341
}
41,766,229✔
1342

1343
void blockWrapperTransfer(SBlockWrapper *dst, SBlockWrapper *src) {
1,656✔
1344
  if (dst == NULL || src == NULL) {
1,656✔
1345
    return;
×
1346
  }
1347
  dst->data = src->data;
1,656✔
1348
  dst->cap = src->cap;
1,656✔
1349

1350
  dst->kvBuffer = src->kvBuffer;
1,656✔
1351
  dst->kvSize = src->kvSize;
1,656✔
1352
  dst->kvCap = src->kvCap;
1,656✔
1353

1354
  src->kvBuffer = NULL;
1,656✔
1355
  src->kvSize = 0;
1,656✔
1356
  src->kvCap = 0;
1,656✔
1357

1358
  src->data = NULL;
1,656✔
1359
  src->cap = 0;
1,656✔
1360
}
1361

1362
int32_t blockWrapperResize(SBlockWrapper *p, int32_t newCap) {
20,896,863✔
1363
  if (p->cap < newCap) {
20,896,863✔
1364
    int32_t cap = p->cap;
3,508✔
1365
    if (cap == 0) cap = 1024;
3,508✔
1366
    while (cap < newCap) {
52,620✔
1367
      cap = cap * 2;
49,112✔
1368
    }
1369
    void *data = taosMemoryRealloc(p->data, cap);
3,508✔
1370
    if (data == NULL) {
3,508✔
1371
      return terrno;
×
1372
    }
1373
    p->data = data;
3,508✔
1374
    p->cap = cap;
3,508✔
1375
  }
1376
  return 0;
20,896,863✔
1377
}
1378

1379
void blockWrapperClear(SBlockWrapper *p) {
7,016✔
1380
  if (p->data == NULL) {
7,016✔
1381
    return;
×
1382
  }
1383
  SBlock *block = (SBlock *)p->data;
7,016✔
1384
  p->kvSize = 0;
7,016✔
1385
  p->size = 0;
7,016✔
1386
  blockClear(block);
7,016✔
1387
}
1388

1389
void blockWrapperSetType(SBlockWrapper *p, int8_t type) {
8,672✔
1390
  SBlock *block = (SBlock *)p->data;
8,672✔
1391
  block->type = type;
8,672✔
1392
}
8,672✔
1393

1394
int32_t tableReaderIterInit(int64_t timestamp, int8_t type, STableReaderIter **ppIter, SBse *pBse) {
×
1395
  int32_t    code = 0;
×
1396
  int32_t    lino = 0;
×
1397
  STableMgt *pTableMgt = pBse->pTableMgt;
×
1398

1399
  STableReaderIter *p = taosMemCalloc(1, sizeof(STableReaderIter));
×
1400
  if (p == NULL) {
×
1401
    return terrno;
×
1402
  }
1403

1404
  p->timestamp = timestamp;
×
1405
  SSubTableMgt *retentionMgt = NULL;
×
1406

1407
  code = createSubTableMgt(timestamp, 1, pBse->pTableMgt, &retentionMgt);
×
1408
  TSDB_CHECK_CODE(code, lino, _error);
×
1409

1410
  p->pSubMgt = retentionMgt;
×
1411

1412
  code = tableReaderOpen(timestamp, &p->pTableReader, retentionMgt->pReaderMgt);
×
1413
  TSDB_CHECK_CODE(code, lino, _error);
×
1414

1415
  tableReaderShouldPutToCache(p->pTableReader, 0);
×
1416

1417
  p->blockIndex = 0;
×
1418
  p->blockType = type;
×
1419

1420
  if (p->blockType == BSE_TABLE_DATA_TYPE) {
×
1421
    code = tableReaderGetMeta(p->pTableReader, &p->pMetaHandle);
×
1422
    TSDB_CHECK_CODE(code, lino, _error);
×
1423

1424
  } else if (p->blockType == BSE_TABLE_META_TYPE) {
×
1425
    p->pMetaHandle = taosArrayInit(8, sizeof(SBlkHandle));
×
1426
    if (p->pMetaHandle == NULL) {
×
1427
      TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1428
    }
1429
    code = tableMetaReaderLoadMetaHandle(p->pTableReader->pMetaReader, p->pMetaHandle);
×
1430
  } else {
1431
    p->isOver = 1;
×
1432
  }
1433
  *ppIter = p;
×
1434

1435
_error:
×
1436
  if (code != 0) {
×
1437
    bseError("failed to init table reader iter since %s", tstrerror(code));
×
1438
    tableReaderIterDestroy(p);
×
1439
  }
1440
  return code;
×
1441
}
1442

1443
int32_t tableReaderIterNext(STableReaderIter *pIter, uint8_t **pValue, int32_t *len) {
×
1444
  int32_t      code = 0;
×
1445
  int32_t      lino = 0;
×
1446
  SBseSnapMeta snapMeta = {0};
×
1447
  snapMeta.range.sseq = -1;
×
1448
  snapMeta.range.eseq = -1;
×
1449
  snapMeta.timestamp = pIter->timestamp;
×
1450
  snapMeta.fileType = pIter->fileType;
×
1451
  snapMeta.blockType = pIter->blockType;
×
1452

1453
  if (pIter->blockType == BSE_TABLE_DATA_TYPE) {
×
1454
    SBlkHandle *pHandle = NULL;
×
1455
    if (pIter->blockIndex >= taosArrayGetSize(pIter->pMetaHandle)) {
×
1456
      taosArrayDestroy(pIter->pMetaHandle);
×
1457
      pIter->pMetaHandle = NULL;
×
1458
      pIter->blockIndex = 0;
×
1459
      pIter->isOver = 1;
×
1460
      return 0;
×
1461
    } else {
1462
      pHandle = taosArrayGet(pIter->pMetaHandle, pIter->blockIndex);
×
1463
      bseDebug("file type %d, block type: %d,block index %d, offset %" PRId64 ", size %" PRId64 ", range [%" PRId64
×
1464
               ", %" PRId64 "]",
1465
               pIter->fileType, pIter->blockType, pIter->blockIndex, pHandle->offset, pHandle->size,
1466
               pHandle->range.sseq, pHandle->range.eseq);
1467
      code = tableReaderLoadRawBlock(pIter->pTableReader, pHandle, &pIter->blockWrapper);
×
1468
      TSDB_CHECK_CODE(code, lino, _error);
×
1469

1470
      pIter->blockIndex++;
×
1471
    }
1472

1473
  } else if (pIter->blockType == BSE_TABLE_META_TYPE) {
×
1474
    SBlkHandle *pHandle = NULL;
×
1475
    if (pIter->blockIndex >= taosArrayGetSize(pIter->pMetaHandle)) {
×
1476
      taosArrayDestroy(pIter->pMetaHandle);
×
1477
      pIter->pMetaHandle = NULL;
×
1478
      pIter->blockIndex = 0;
×
1479
      pIter->blockType = BSE_TABLE_META_INDEX_TYPE;
×
1480
    } else {
1481
      pHandle = taosArrayGet(pIter->pMetaHandle, pIter->blockIndex);
×
1482

1483
      bseDebug("file type %d, block type: %d,block index %d, offset %" PRId64 ", size %" PRId64 ", range [%" PRId64
×
1484
               ", %" PRId64 "]",
1485
               pIter->fileType, pIter->blockType, pIter->blockIndex, pHandle->offset, pHandle->size,
1486
               pHandle->range.sseq, pHandle->range.eseq);
1487
      code = tableReaderLoadRawMeta(pIter->pTableReader, pHandle, &pIter->blockWrapper);
×
1488
      TSDB_CHECK_CODE(code, lino, _error);
×
1489
      pIter->blockIndex++;
×
1490
    }
1491
  }
1492

1493
  if (pIter->blockType == BSE_TABLE_META_INDEX_TYPE) {
×
1494
    code = tableReaderLoadRawMetaIndex(pIter->pTableReader, &pIter->blockWrapper);
×
1495
    TSDB_CHECK_CODE(code, lino, _error);
×
1496

1497
    pIter->blockType = BSE_TABLE_FOOTER_TYPE;
×
1498
  } else if (pIter->blockType == BSE_TABLE_FOOTER_TYPE) {
×
1499
    code = tableReaderLoadRawFooter(pIter->pTableReader, &pIter->blockWrapper);
×
1500
    TSDB_CHECK_CODE(code, lino, _error);
×
1501

1502
    pIter->blockType = BSE_TABLE_END_TYPE;
×
1503
  } else if (pIter->blockType == BSE_TABLE_END_TYPE) {
×
1504
    pIter->isOver = 1;
×
1505
  }
1506

1507
_error:
×
1508
  if (code != 0) {
×
1509
    bseError("failed to load block since %s", tstrerror(code));
×
1510
    pIter->isOver = 1;
×
1511
  }
1512
  SSeqRange range = {0};
×
1513
  if (pIter->blockWrapper.data != NULL) {
×
1514
    updateSnapshotMeta(&pIter->blockWrapper, range, pIter->fileType, pIter->blockType, snapMeta.timestamp);
×
1515
    *pValue = pIter->blockWrapper.data;
×
1516
    *len = pIter->blockWrapper.size;
×
1517
  }
1518
  return code;
×
1519
}
1520

1521
int8_t tableReaderIterValid(STableReaderIter *pIter) { return pIter->isOver == 0; }
×
1522

1523
int32_t bseReadCurrentSnap(SBse *pBse, uint8_t **pValue, int32_t *len) {
×
1524
  int32_t   code = 0;
×
1525
  char      path[128] = {0};
×
1526
  int32_t   lino = 0;
×
1527
  TdFilePtr fd = NULL;
×
1528
  int64_t   sz = 0;
×
1529
  char      name[TSDB_FILENAME_LEN] = {0};
×
1530

1531
  uint8_t *pCurrent = NULL;
×
1532

1533
  bseBuildCurrentFullName(pBse, name);
×
1534
  if (taosCheckExistFile(name) == 0) {
×
1535
    bseInfo("vgId:%d, no current meta file found, skip recover", BSE_VGID(pBse));
×
1536
    return 0;
×
1537
  }
1538
  code = taosStatFile(name, &sz, NULL, NULL);
×
1539
  TSDB_CHECK_CODE(code, lino, _error);
×
1540

1541
  fd = taosOpenFile(name, TD_FILE_READ);
×
1542
  if (fd == NULL) {
×
1543
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1544
  }
1545
  pCurrent = (uint8_t *)taosMemoryCalloc(1, sizeof(SBseSnapMeta) + sz);
×
1546
  if (pCurrent == NULL) {
×
1547
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1548
  }
1549

1550
  int64_t nread = taosReadFile(fd, pCurrent + sizeof(SBseSnapMeta), sz);
×
1551
  if (nread != sz) {
×
1552
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1553
  }
1554
  if (taosCloseFile(&fd) != 0) {
×
1555
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1556
  }
1557

1558
  SBseSnapMeta *pMeta = (SBseSnapMeta *)(pCurrent);
×
1559
  pMeta->fileType = BSE_CURRENT_SNAP;
×
1560

1561
  *pValue = pCurrent;
×
1562

1563
  *len = sz + sizeof(SBseSnapMeta);
×
1564
_error:
×
1565
  if (code != 0) {
×
1566
    bseError("vgId:%d, failed to read current at line %d since %s", BSE_VGID(pBse), lino, tstrerror(code));
×
1567
    if (taosCloseFile(&fd) != 0) {
×
1568
      bseError("failed to close file %s since %s", name, tstrerror(terrno));
×
1569
    }
1570
    taosMemoryFree(pCurrent);
×
1571
  }
1572
  return code;
×
1573
}
1574

1575
void tableReaderIterDestroy(STableReaderIter *pIter) {
×
1576
  if (pIter == NULL) return;
×
1577

1578
  taosArrayDestroy(pIter->pMetaHandle);
×
1579
  tableReaderClose(pIter->pTableReader);
×
1580
  blockWrapperCleanup(&pIter->blockWrapper);
×
1581
  destroySubTableMgt(pIter->pSubMgt);
×
1582
  taosMemoryFree(pIter);
×
1583
}
1584

1585
int32_t blockWithMetaInit(SBlock *pBlock, SBlockWithMeta **pMeta) {
×
1586
  int32_t code = 0;
×
1587
  int32_t lino = 0;
×
1588

1589
  SBlockWithMeta *p = taosMemCalloc(1, sizeof(SBlockWithMeta));
×
1590
  if (p == NULL) {
×
1591
    return terrno;
×
1592
  }
1593
  p->pBlock = pBlock;
×
1594
  p->pMeta = taosArrayInit(8, sizeof(SBlockIndexMeta));
×
1595
  if (p->pMeta == NULL) {
×
1596
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1597
  }
1598

1599
  uint8_t *p1 = (uint8_t *)pBlock->data;
×
1600
  uint8_t *p2 = (uint8_t *)p1;
×
1601
  while (p2 - p1 < pBlock->len) {
×
1602
    int64_t         k;
×
1603
    int32_t         vlen = 0;
×
1604
    SBlockIndexMeta meta = {0};
×
1605
    int32_t         offset = 0;
×
1606
    p2 = taosDecodeVariantI64((void **)p2, &k);
×
1607
    offset = p2 - p1;
×
1608
    p2 = taosDecodeVariantI32((void **)p2, &vlen);
×
1609

1610
    meta.seq = k;
×
1611
    meta.offset = offset;
×
1612
    if (taosArrayPush(p->pMeta, &meta) == NULL) {
×
1613
      TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1614
    }
1615
    p2 += vlen;
×
1616
  }
1617

1618
  *pMeta = p;
×
1619
_error:
×
1620
  if (code != 0) {
×
1621
    bseError("failed to init block with meta since %s", tstrerror(code));
×
1622
    blockWithMetaCleanup(p);
×
1623
  }
1624
  return code;
×
1625
}
1626

1627
void blockWithMetaCleanup(SBlockWithMeta *p) {
×
1628
  if (p == NULL) return;
×
1629
  taosArrayDestroy(p->pMeta);
×
1630
  taosMemoryFree(p);
×
1631
  return;
×
1632
}
1633

1634
int comprareFunc(const void *pLeft, const void *pRight) {
×
1635
  SBlockIndexMeta *p1 = (SBlockIndexMeta *)pLeft;
×
1636
  SBlockIndexMeta *p2 = (SBlockIndexMeta *)pRight;
×
1637
  if (p1->seq > p2->seq) {
×
1638
    return 1;
×
1639
  } else if (p1->seq < p2->seq) {
×
1640
    return -1;
×
1641
  }
1642
  return 0;
×
1643
}
1644

1645
int32_t blockWithMetaSeek(SBlockWithMeta *p, int64_t seq, uint8_t **pValue, int32_t *len) {
×
1646
  int32_t         code = 0;
×
1647
  SBlockIndexMeta key = {.seq = seq, .offset = 0};
×
1648
  int32_t         idx = taosArraySearchIdx(p->pMeta, &seq, comprareFunc, TD_EQ);
×
1649
  if (idx < 0) {
×
1650
    return TSDB_CODE_NOT_FOUND;
×
1651
  }
1652
  SBlockIndexMeta *pMeta = taosArrayGet(p->pMeta, idx);
×
1653
  if (pMeta == NULL) {
×
1654
    return TSDB_CODE_NOT_FOUND;
×
1655
  }
1656

1657
  uint8_t *data = (uint8_t *)p->pBlock->data + pMeta->offset;
×
1658

1659
  data = taosDecodeVariantI32((void *)data, len);
×
1660
  if (*len <= 0) {
×
1661
    return TSDB_CODE_NOT_FOUND;
×
1662
  }
1663
  *pValue = taosMemCalloc(1, *len);
×
1664
  if (*pValue == NULL) {
×
1665
    return terrno;
×
1666
  }
1667
  memcpy(*pValue, data, *len);
×
1668

1669
  return code;
×
1670
}
1671

1672
int32_t tableMetaOpen(char *name, SBTableMeta **pMeta, void *pMetaMgt) {
3,687✔
1673
  int32_t code = 0;
3,687✔
1674
  int32_t lino = 0;
3,687✔
1675

1676
  SBTableMeta *p = taosMemCalloc(1, sizeof(SBTableMeta));
3,687✔
1677
  if (p == NULL) {
3,687✔
1678
    TSDB_CHECK_CODE(code, lino, _error);
×
1679
  }
1680

1681
  if (name != NULL) {
3,687✔
1682
    memcpy(p->name, name, strlen(name) + 1);
×
1683
  }
1684
  p->pBse = ((STableMetaMgt *)pMetaMgt)->pBse;
3,687✔
1685

1686
  p->blockCap = BSE_BLOCK_SIZE((SBse *)p->pBse);
3,687✔
1687

1688
  *pMeta = p;
3,687✔
1689
_error:
3,687✔
1690
  if (code != 0) {
3,687✔
1691
    bseError("failed to open table meta %s at line %d since %s", name, lino, tstrerror(code));
×
1692
    tableMetaClose(p);
×
1693
  }
1694

1695
  return code;
3,687✔
1696
}
1697

1698
int32_t tableMetaCommit(SBTableMeta *pMeta, SArray *pBlock) {
3,508✔
1699
  int32_t                code = 0;
3,508✔
1700
  int32_t                lino = 0;
3,508✔
1701
  SBtableMetaWriter     *pWriter = NULL;
3,508✔
1702
  SBtableMetaReader     *pReader = NULL;
3,508✔
1703
  SBtableMetaReaderIter *pIter = NULL;
3,508✔
1704

1705
  char tempMetaName[TSDB_FILENAME_LEN] = {0};
3,508✔
1706
  char metaName[TSDB_FILENAME_LEN] = {0};
3,508✔
1707

1708
  char tempMetaPath[TSDB_FILENAME_LEN] = {0};
3,508✔
1709
  char metaPath[TSDB_FILENAME_LEN] = {0};
3,508✔
1710

1711
  bseBuildTempMetaName(pMeta->timestamp, tempMetaName);
3,508✔
1712
  bseBuildMetaName(pMeta->timestamp, metaName);
3,508✔
1713

1714
  code = tableMetaWriterInit(pMeta, tempMetaName, &pWriter);
3,508✔
1715
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
1716

1717
  code = tableMetaReaderInit(pMeta, metaName, &pReader);
3,508✔
1718
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
1719

1720
  code = tableMetaReaderOpenIter(pReader, &pIter);
3,508✔
1721
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
1722

1723
  while (!pIter->isOver) {
5,164✔
1724
    SBlkHandle    blkHandle = {0};
2,760✔
1725
    SBlockWrapper wrapper;
2,760✔
1726

1727
    code = tableMetaReaderIterNext(pIter, &wrapper, &blkHandle);
2,760✔
1728
    TSDB_CHECK_CODE(code, lino, _error);
2,760✔
1729

1730
    if (pIter->isOver) {
2,760✔
1731
      break;
1,104✔
1732
    }
1733

1734
    blockWrapperSetType(&wrapper, BSE_TABLE_META_TYPE);
1,656✔
1735

1736
    code = tableMetaWriteAppendRawBlock(pWriter, &wrapper, &blkHandle);
1,656✔
1737
    TSDB_CHECK_CODE(code, lino, _error);
1,656✔
1738

1739
    seqRangeUpdate(&pMeta->range, &blkHandle.range);
1,656✔
1740
  }
1741

1742
  code = tableMetaWriterAppendBlock(pWriter, pBlock);
3,508✔
1743
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
1744

1745
  code = tableMetaWriterCommit(pWriter);
3,508✔
1746
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
1747

1748
  tableMetaWriterClose(pWriter);
3,508✔
1749
  tableMetaReaderClose(pReader);
3,508✔
1750

1751
  pWriter = NULL;
3,508✔
1752
  pReader = NULL;
3,508✔
1753

1754
  bseBuildFullName(pMeta->pBse, tempMetaName, tempMetaPath);
3,508✔
1755
  bseBuildFullName(pMeta->pBse, metaName, metaPath);
3,508✔
1756

1757
  code = taosRenameFile(tempMetaPath, metaPath);
3,508✔
1758
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
1759

1760
_error:
3,508✔
1761
  if (code != 0) {
3,508✔
1762
    bseError("failed to commit table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
×
1763
  }
1764
  tableMetaReaderIterClose(pIter);
3,508✔
1765
  tableMetaWriterClose(pWriter);
3,508✔
1766
  tableMetaReaderClose(pReader);
3,508✔
1767

1768
  return code;
3,508✔
1769
}
1770
int32_t tableMetaWriterAppendBlock(SBtableMetaWriter *pMeta, SArray *pBlock) {
3,508✔
1771
  int32_t code = 0;
3,508✔
1772
  if (taosArrayAddAll(pMeta->pBlock, pBlock) == NULL) {
3,508✔
1773
    return terrno;
×
1774
  }
1775
  return code;
3,508✔
1776
}
1777

1778
int32_t tableMetaWriterFlushBlock(SBtableMetaWriter *pMeta) {
3,508✔
1779
  int32_t   code = 0;
3,508✔
1780
  int32_t   lino = 0;
3,508✔
1781
  SSeqRange range = {.sseq = -1, .eseq = -1};
3,508✔
1782

1783
  int64_t offset = 0;
3,508✔
1784
  int32_t nWrite = 0;
3,508✔
1785
  int32_t size = pMeta->blockCap;
3,508✔
1786

1787
  blockWrapperClear(&pMeta->blockWrapper);
3,508✔
1788
  code = blockWrapperResize(&pMeta->blockWrapper, size);
3,508✔
1789
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
1790

1791
  for (int32_t i = 0; i < taosArrayGetSize(pMeta->pBlock); i++) {
7,016✔
1792
    SMetaBlock *pBlk = taosArrayGet(pMeta->pBlock, i);
3,508✔
1793
    if (blockEsimateSize(pMeta->blockWrapper.data, sizeof(SMetaBlock)) >= pMeta->blockCap) {
3,508✔
1794
      SBlkHandle handle = {.offset = pMeta->offset, .size = offset, .range = range};
×
1795

1796
      blockWrapperSetType(&pMeta->blockWrapper, BSE_TABLE_META_TYPE);
×
1797

1798
      code = tableFlushBlock(pMeta->pFile, &handle, &pMeta->blockWrapper, &nWrite);
×
1799
      TSDB_CHECK_CODE(code, lino, _error);
×
1800

1801
      pMeta->offset += nWrite;
×
1802
      handle.size = nWrite;
×
1803

1804
      blockWrapperClear(&pMeta->blockWrapper);
×
1805
      code = blockWrapperResize(&pMeta->blockWrapper, size);
×
1806
      TSDB_CHECK_CODE(code, lino, _error);
×
1807

1808
      if (taosArrayPush(pMeta->pBlkHandle, &handle) == NULL) {
×
1809
        TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1810
      }
1811
      range.sseq = -1;
×
1812
      offset = 0;
×
1813
    }
1814

1815
    offset += metaBlockAdd(pMeta->blockWrapper.data, pBlk);
3,508✔
1816

1817
    if (range.sseq == -1) {
3,508✔
1818
      range.sseq = pBlk->range.sseq;
3,508✔
1819
    }
1820
    range.eseq = pBlk->range.eseq;
3,508✔
1821
  }
1822
  if (offset == 0) {
3,508✔
1823
    return 0;
×
1824
  }
1825

1826
  blockWrapperSetType(&pMeta->blockWrapper, BSE_TABLE_META_TYPE);
3,508✔
1827

1828
  SBlkHandle handle = {.offset = pMeta->offset, .size = offset, .range = range};
3,508✔
1829
  code = tableFlushBlock(pMeta->pFile, &handle, &pMeta->blockWrapper, &nWrite);
3,508✔
1830
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
1831

1832
  pMeta->offset += nWrite;
3,508✔
1833
  handle.size = nWrite;
3,508✔
1834

1835
  if (taosArrayPush(pMeta->pBlkHandle, &handle) == NULL) {
7,016✔
1836
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1837
  }
1838
_error:
3,508✔
1839
  if (code != 0) {
3,508✔
1840
    bseError("failed to flush table meta %s at line %d since %s", pMeta->name, 0, tstrerror(code));
×
1841
    tableMetaWriterClose(pMeta);
×
1842
  }
1843
  return code;
3,508✔
1844
}
1845

1846
int32_t tableMetaWriterFlushIndex(SBtableMetaWriter *pMeta) {
3,508✔
1847
  int32_t code = 0;
3,508✔
1848
  int32_t lino = 0;
3,508✔
1849

1850
  int32_t nWrite = 0;
3,508✔
1851
  int64_t lastOffset = pMeta->offset;
3,508✔
1852
  int32_t blkHandleSize = 0;
3,508✔
1853

1854
  int32_t extra = 8;
3,508✔
1855
  int32_t size = taosArrayGetSize(pMeta->pBlkHandle) * sizeof(SBlkHandle);
3,508✔
1856

1857
  SSeqRange range = {-1, -1};
3,508✔
1858

1859
  blockWrapperClear(&pMeta->blockWrapper);
3,508✔
1860
  code = blockWrapperResize(&pMeta->blockWrapper, size + extra);
3,508✔
1861
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
1862

1863
  for (int32_t i = 0; i < taosArrayGetSize(pMeta->pBlkHandle); i++) {
8,672✔
1864
    SBlkHandle *pHandle = taosArrayGet(pMeta->pBlkHandle, i);
5,164✔
1865
    if (pHandle == NULL) {
5,164✔
1866
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
×
1867
    }
1868
    blkHandleSize += metaBlockAddIndex(pMeta->blockWrapper.data, pHandle);
5,164✔
1869

1870
    seqRangeUpdate(&range, &pHandle->range);
5,164✔
1871
  }
1872

1873
  blockWrapperSetType(&pMeta->blockWrapper, BSE_TABLE_META_INDEX_TYPE);
3,508✔
1874

1875
  SBlkHandle handle = {.offset = lastOffset, .size = blkHandleSize, .range = range};
3,508✔
1876
  code = tableFlushBlock(pMeta->pFile, &handle, &pMeta->blockWrapper, &nWrite);
3,508✔
1877
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
1878

1879
  SBlkHandle metaHandle = {.offset = pMeta->offset, .size = nWrite, .range = range};
3,508✔
1880
  SBlkHandle indexHandle = {.offset = pMeta->offset + nWrite, .size = 0, .range = range};
3,508✔
1881
  pMeta->offset += nWrite;
3,508✔
1882

1883
  memcpy(pMeta->footer.metaHandle, &metaHandle, sizeof(SBlkHandle));
3,508✔
1884
  memcpy(pMeta->footer.indexHandle, &metaHandle, sizeof(SBlkHandle));
3,508✔
1885
_error:
3,508✔
1886
  if (code != 0) {
3,508✔
1887
    bseError("failed to build table meta index at line %d since %s", lino, tstrerror(code));
×
1888
  }
1889
  return code;
3,508✔
1890
}
1891

1892
int32_t tableMetaWriterFlushFooter(SBtableMetaWriter *p) {
3,508✔
1893
  char buf[kEncodeLen] = {0};
3,508✔
1894

1895
  int32_t code = 0;
3,508✔
1896
  int32_t lino = 0;
3,508✔
1897

1898
  code = footerEncode(&p->footer, buf);
3,508✔
1899
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
1900

1901
  p->offset += sizeof(buf);
3,508✔
1902

1903
  int32_t nwrite = taosWriteFile(p->pFile, buf, sizeof(buf));
3,508✔
1904
  if (nwrite != sizeof(buf)) {
3,508✔
1905
    code = terrno;
×
1906
    TSDB_CHECK_CODE(code, lino, _error);
×
1907
  }
1908

1909
_error:
3,508✔
1910
  if (code != 0) {
3,508✔
1911
    bseError("failed to add footer to table builder at line %d since %s", lino, tstrerror(code));
×
1912
  }
1913
  return code;
3,508✔
1914
}
1915
int32_t tableMetaWriterCommit(SBtableMetaWriter *pMeta) {
3,508✔
1916
  int32_t code = 0;
3,508✔
1917
  int32_t lino = 0;
3,508✔
1918

1919
  code = tableMetaWriterFlushBlock(pMeta);
3,508✔
1920
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
1921

1922
  code = tableMetaWriterFlushIndex(pMeta);
3,508✔
1923
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
1924

1925
  code = tableMetaWriterFlushFooter(pMeta);
3,508✔
1926
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
1927
_error:
3,508✔
1928
  if (code != 0) {
3,508✔
1929
    bseError("failed to commit table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
×
1930
    tableMetaWriterClose(pMeta);
×
1931
  }
1932
  return code;
3,508✔
1933
}
1934
int32_t tableMetaWriteAppendRawBlock(SBtableMetaWriter *pMeta, SBlockWrapper *pBlock, SBlkHandle *pBlkHandle) {
1,656✔
1935
  int32_t code = 0;
1,656✔
1936
  int32_t lino = 0;
1,656✔
1937

1938
  int32_t nwrite = 0;
1,656✔
1939
  code = tableFlushBlock(pMeta->pFile, pBlkHandle, pBlock, &nwrite);
1,656✔
1940
  TSDB_CHECK_CODE(code, lino, _error);
1,656✔
1941

1942
  SBlkHandle handle = {.offset = pMeta->offset, .size = nwrite, .range = pBlkHandle->range};
1,656✔
1943
  if (taosArrayPush(pMeta->pBlkHandle, &handle) == NULL) {
3,312✔
1944
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1945
  }
1946
  pMeta->offset += nwrite;
1,656✔
1947
_error:
1,656✔
1948
  if (code != 0) {
1,656✔
1949
    bseError("failed to append block to table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
×
1950
    tableMetaWriterClose(pMeta);
×
1951
  }
1952
  return code;
1,656✔
1953
}
1954

1955
int32_t tableMetaReaderLoadFooter(SBtableMetaReader *pMeta) {
10,436,308✔
1956
  int32_t code = 0;
10,436,308✔
1957
  int32_t lino = 0;
10,436,308✔
1958
  char    footer[kEncodeLen] = {0};
10,436,308✔
1959

1960
  if (pMeta->pFile == NULL) {
10,436,308✔
1961
    return 0;
2,404✔
1962
  }
1963
  int64_t n = taosLSeekFile(pMeta->pFile, -kEncodeLen, SEEK_END);
10,433,904✔
1964
  if (n < 0) {
10,433,904✔
1965
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1966
  }
1967

1968
  if (taosReadFile(pMeta->pFile, footer, kEncodeLen) != kEncodeLen) {
10,433,904✔
1969
    code = terrno;
×
1970
    TSDB_CHECK_CODE(code, lino, _error);
×
1971
  }
1972

1973
  code = footerDecode(&pMeta->footer, footer);
10,433,904✔
1974
  TSDB_CHECK_CODE(code, lino, _error);
10,433,904✔
1975
_error:
10,433,904✔
1976
  if (code != 0) {
10,433,904✔
1977
    bseError("failed to load table meta footer %s at line %d since %s", pMeta->name, lino, tstrerror(code));
×
1978
  }
1979
  return code;
10,433,904✔
1980
}
1981

1982
int32_t tableOpenFile(char *name, int8_t read, TdFilePtr *pFile, int64_t *size) {
20,875,751✔
1983
  int32_t lino = 0;
20,875,751✔
1984
  int32_t code = 0;
20,875,751✔
1985
  int32_t opt = 0;
20,875,751✔
1986

1987
  TdFilePtr p = NULL;
20,875,751✔
1988
  if (read) {
20,875,751✔
1989
    opt = TD_FILE_READ;
20,869,108✔
1990
  } else {
1991
    opt = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_APPEND;
6,643✔
1992
  }
1993

1994
  if (!taosCheckExistFile(name)) {
20,875,751✔
1995
    if (read) {
9,047✔
1996
      return 0;
2,404✔
1997
    }
1998

1999
    p = taosOpenFile(name, opt);
6,643✔
2000
    if (p == NULL) {
6,643✔
2001
      TSDB_CHECK_CODE(code = terrno, lino, _error);
×
2002
    }
2003

2004
    *pFile = p;
6,643✔
2005
    return code;
6,643✔
2006
  }
2007

2008
  code = taosStatFile(name, size, NULL, NULL);
20,866,704✔
2009
  TSDB_CHECK_CODE(code, lino, _error);
20,866,704✔
2010
  if (*size <= 0) {
20,866,704✔
2011
    TSDB_CHECK_CODE(code = TSDB_CODE_NOT_FOUND, lino, _error);
×
2012
  }
2013

2014
  p = taosOpenFile(name, opt);
20,866,704✔
2015
  if (p == NULL) {
20,866,704✔
2016
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
2017
  }
2018
  *pFile = p;
20,866,704✔
2019

2020
_error:
20,866,704✔
2021
  if (code != 0) {
20,866,704✔
2022
    bseError("failed to open table meta %s at line %d since %s", name, lino, tstrerror(code));
×
2023
  }
2024
  return code;
20,866,704✔
2025
}
2026
int32_t tableMetaOpenFile(SBtableMetaWriter *pMeta, int8_t read, char *name) {
10,439,816✔
2027
  int32_t code = 0;
10,439,816✔
2028
  int64_t size = 0;
10,439,816✔
2029
  int32_t lino = 0;
10,439,816✔
2030

2031
  code = tableOpenFile(name, read, &pMeta->pFile, &size);
10,439,816✔
2032
  TSDB_CHECK_CODE(code, lino, _error);
10,439,816✔
2033

2034
_error:
10,439,816✔
2035
  if (code != 0) {
10,439,816✔
2036
    bseError("failed to open table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
×
2037
  }
2038

2039
  return code;
10,439,816✔
2040
}
2041

2042
int32_t tableMetaReaderLoad(SBtableMetaReader *pMeta) {
10,436,308✔
2043
  int32_t code = 0;
10,436,308✔
2044
  int32_t lino = 0;
10,436,308✔
2045

2046
  code = tableMetaOpenFile(pMeta, 1, pMeta->name);
10,436,308✔
2047
  TSDB_CHECK_CODE(code, lino, _error);
10,436,308✔
2048

2049
  code = tableMetaReaderLoadFooter(pMeta);
10,436,308✔
2050
  TSDB_CHECK_CODE(code, lino, _error);
10,436,308✔
2051

2052
  code = tableMetaReaderLoadIndex(pMeta);
10,436,308✔
2053
  TSDB_CHECK_CODE(code, lino, _error);
10,436,308✔
2054

2055
_error:
10,436,308✔
2056
  if (code != 0) {
10,436,308✔
2057
    bseError("failed to load table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
×
2058
  }
2059
  return code;
10,436,308✔
2060
}
2061

2062
void tableMetaClose(SBTableMeta *p) {
3,687✔
2063
  if (p == NULL) return;
3,687✔
2064
  taosMemoryFree(p);
3,687✔
2065
}
2066

2067
int32_t tableMetaWriterInit(SBTableMeta *pMeta, char *name, SBtableMetaWriter **ppWriter) {
3,508✔
2068
  int32_t code = 0;
3,508✔
2069
  int32_t lino = 0;
3,508✔
2070

2071
  char path[TSDB_FILENAME_LEN] = {0};
3,508✔
2072
  bseBuildFullName(pMeta->pBse, name, path);
3,508✔
2073

2074
  SBtableMetaWriter *p = taosMemCalloc(1, sizeof(SBtableMetaWriter));
3,508✔
2075
  if (p == NULL) {
3,508✔
2076
    return terrno;
×
2077
  }
2078
  p->pTableMeta = pMeta;
3,508✔
2079

2080
  p->blockCap = pMeta->blockCap;
3,508✔
2081

2082
  p->pBlkHandle = taosArrayInit(128, sizeof(SBlkHandle));
3,508✔
2083
  if (p->pBlkHandle == NULL) {
3,508✔
2084
    TSDB_CHECK_CODE(code, lino, _error);
×
2085
  }
2086

2087
  p->pBlock = taosArrayInit(128, sizeof(SMetaBlock));
3,508✔
2088
  if (p->pBlock == NULL) {
3,508✔
2089
    TSDB_CHECK_CODE(code, lino, _error);
×
2090
  }
2091

2092
  code = blockWrapperInit(&p->blockWrapper, 1024);
3,508✔
2093
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
2094

2095
  code = tableMetaOpenFile(p, 0, path);
3,508✔
2096
  TSDB_CHECK_CODE(code, lino, _error);
3,508✔
2097

2098
  *ppWriter = p;
3,508✔
2099
_error:
3,508✔
2100
  if (code != 0) {
3,508✔
2101
    bseError("failed to init table meta writer %s at line %d since %s", pMeta->name, lino, tstrerror(code));
×
2102
    tableMetaWriterClose(p);
×
2103
  }
2104
  return code;
3,508✔
2105
}
2106

2107
void tableMetaWriterClose(SBtableMetaWriter *p) {
7,016✔
2108
  if (p == NULL) return;
7,016✔
2109
  if (taosCloseFile(&p->pFile) != 0) {
3,508✔
2110
    bseError("failed to close table meta writer file since %s", tstrerror(terrno));
×
2111
  }
2112
  taosArrayDestroy(p->pBlkHandle);
3,508✔
2113
  taosArrayDestroy(p->pBlock);
3,508✔
2114
  blockWrapperCleanup(&p->blockWrapper);
3,508✔
2115
  taosMemoryFree(p);
3,508✔
2116
}
2117

2118
int32_t tableMetaReaderInit(SBTableMeta *pMeta, char *name, SBtableMetaReader **ppReader) {
10,436,308✔
2119
  int32_t code = 0;
10,436,308✔
2120
  int32_t lino = 0;
10,436,308✔
2121
  char    path[TSDB_FILENAME_LEN] = {0};
10,436,308✔
2122
  bseBuildFullName(pMeta->pBse, name, path);
10,436,308✔
2123

2124
  SBtableMetaReader *p = taosMemCalloc(1, sizeof(SBtableMetaReader));
10,436,308✔
2125
  if (p == NULL) {
10,436,308✔
2126
    return terrno;
×
2127
  }
2128
  memcpy(p->name, path, sizeof(path));
10,436,308✔
2129
  p->pTableMeta = pMeta;
10,436,308✔
2130

2131
  p->pBlkHandle = taosArrayInit(128, sizeof(SBlkHandle));
10,436,308✔
2132
  if (p->pBlkHandle == NULL) {
10,436,308✔
2133
    TSDB_CHECK_CODE(code, lino, _error);
×
2134
  }
2135

2136
  code = blockWrapperInit(&p->blockWrapper, 1024);
10,436,308✔
2137
  TSDB_CHECK_CODE(code, lino, _error);
10,436,308✔
2138

2139
  code = tableMetaReaderLoad(p);
10,436,308✔
2140
  TSDB_CHECK_CODE(code, lino, _error);
10,436,308✔
2141

2142
  *ppReader = p;
10,436,308✔
2143
_error:
10,436,308✔
2144
  if (code != 0) {
10,436,308✔
2145
    bseError("failed to init table meta reader %s at line %d since %s", pMeta->name, lino, tstrerror(code));
×
2146
    tableMetaReaderClose(p);
×
2147
  }
2148
  return code;
10,436,308✔
2149
}
2150

2151
void tableMetaReaderClose(SBtableMetaReader *p) {
10,439,816✔
2152
  if (p == NULL) return;
10,439,816✔
2153
  if (taosCloseFile(&p->pFile) != 0) {
10,436,308✔
2154
    bseError("failed to close table meta reader file since %s", tstrerror(terrno));
×
2155
  }
2156
  taosArrayDestroy(p->pBlkHandle);
10,436,308✔
2157
  blockWrapperCleanup(&p->blockWrapper);
10,436,308✔
2158
  taosMemoryFree(p);
10,436,308✔
2159
}
2160
int32_t tableMetaReaderLoadBlockMeta(SBtableMetaReader *p, int64_t seq, SMetaBlock *pMetaBlock) {
10,432,800✔
2161
  int32_t            code = 0;
10,432,800✔
2162
  int32_t            lino = 0;
10,432,800✔
2163
  SBtableMetaReader *pMeta = p;
10,432,800✔
2164
  SSeqRange          range = {.sseq = seq, .eseq = seq};
10,432,800✔
2165

2166
  SBlkHandle  handle = {.range = range};
10,432,800✔
2167
  int32_t     index = taosArraySearchIdx(p->pBlkHandle, &handle, compareFunc, TD_LE);
10,432,800✔
2168
  SBlkHandle *pHandle = taosArrayGet(p->pBlkHandle, index);
10,432,800✔
2169
  if (pHandle == NULL) {
10,432,800✔
2170
    return TSDB_CODE_NOT_FOUND;
×
2171
  }
2172

2173
  code = tableLoadBlock(p->pFile, pHandle, &p->blockWrapper);
10,432,800✔
2174
  TSDB_CHECK_CODE(code, lino, _error);
10,432,800✔
2175

2176
  code = blockSeekMeta(p->blockWrapper.data, seq, pMetaBlock);
10,432,800✔
2177
  TSDB_CHECK_CODE(code, lino, _error);
10,432,800✔
2178

2179
_error:
10,432,800✔
2180
  return code;
10,432,800✔
2181
}
2182
int32_t tableMetaReaderLoadAllDataHandle(SBtableMetaReader *p, SArray *dataHandle) {
×
2183
  int32_t lino = 0;
×
2184
  int32_t code = 0;
×
2185

2186
  SArray *pMeta = taosArrayInit(8, sizeof(SMetaBlock));
×
2187
  if (pMeta == NULL) {
×
2188
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
2189
  }
2190

2191
  for (int32_t i = 0; i < taosArrayGetSize(p->pBlkHandle); i++) {
×
2192
    SBlkHandle *pHandle = taosArrayGet(p->pBlkHandle, i);
×
2193
    if (pHandle == NULL) {
×
2194
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
2195
    }
2196

2197
    code = tableLoadBlock(p->pFile, pHandle, &p->blockWrapper);
×
2198
    TSDB_CHECK_CODE(code, lino, _exit);
×
2199

2200
    code = blockGetAllMeta(p->blockWrapper.data, pMeta);
×
2201
    TSDB_CHECK_CODE(code, lino, _exit);
×
2202

2203
    if (taosArrayGetSize(pMeta) == 0) {
×
2204
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
2205
    }
2206

2207
    for (int32_t j = 0; j < taosArrayGetSize(pMeta); j++) {
×
2208
      SMetaBlock *pBlk = taosArrayGet(pMeta, j);
×
2209
      SBlkHandle  handle = {.offset = pBlk->offset, .size = pBlk->size, .range = pBlk->range};
×
2210
      if (taosArrayPush(dataHandle, &handle) == NULL) {
×
2211
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
2212
      }
2213
    }
2214
  }
2215
_exit:
×
2216
  taosArrayDestroy(pMeta);
×
2217
  return code;
×
2218
}
2219

2220
int32_t tableMetaReaderLoadMetaHandle(SBtableMetaReader *p, SArray *pMetaHandle) {
×
2221
  int32_t code = 0;
×
2222
  int32_t lino = 0;
×
2223

2224
  if (taosArrayGetSize(p->pBlkHandle) == 0) {
×
2225
    return TSDB_CODE_NOT_FOUND;
×
2226
  }
2227

2228
  for (int32_t i = 0; i < taosArrayGetSize(p->pBlkHandle); i++) {
×
2229
    SBlkHandle *pHandle = taosArrayGet(p->pBlkHandle, i);
×
2230
    if (pHandle == NULL) {
×
2231
      return TSDB_CODE_FILE_CORRUPTED;
×
2232
    }
2233
    if (taosArrayPush(pMetaHandle, pHandle) == NULL) {
×
2234
      TSDB_CHECK_CODE(code = terrno, lino, _error);
×
2235
    }
2236
  }
2237
_error:
×
2238
  return code;
×
2239
}
2240

2241
int32_t tableMetaReaderLoadIndex(SBtableMetaReader *p) {
10,436,308✔
2242
  int32_t code = 0;
10,436,308✔
2243
  int32_t lino = 0;
10,436,308✔
2244
  int32_t offset = 0;
10,436,308✔
2245
  SBtableMetaReader *pMeta = p;
10,436,308✔
2246

2247
  if (pMeta->pFile == NULL) {
10,436,308✔
2248
    return 0;
2,404✔
2249
  }
2250

2251
  pMeta->blockWrapper.type = BSE_TABLE_META_TYPE;
10,433,904✔
2252

2253
  code = tableLoadBlock(pMeta->pFile, pMeta->footer.metaHandle, &pMeta->blockWrapper);
10,433,904✔
2254
  TSDB_CHECK_CODE(code, lino, _error);
10,433,904✔
2255

2256
  if (blockGetType(p->blockWrapper.data) != BSE_TABLE_META_INDEX_TYPE) {
10,433,904✔
2257
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
×
2258
  }
2259

2260
  SBlock  *pBlk = (SBlock *)pMeta->blockWrapper.data;
10,433,904✔
2261
  uint8_t *data = (uint8_t *)pBlk->data;
10,433,904✔
2262

2263
  do {
2264
    SBlkHandle handle = {0};
25,338,456✔
2265
    offset += blkHandleDecode(&handle, (char *)data + offset);
25,338,456✔
2266
    if (taosArrayPush(pMeta->pBlkHandle, &handle) == NULL) {
50,676,912✔
2267
      TSDB_CHECK_CODE(terrno, lino, _error);
×
2268
    }
2269
  } while (offset < pBlk->len);
25,338,456✔
2270

2271
_error:
10,433,904✔
2272
  if (code != 0) {
10,433,904✔
2273
    bseError("failed to load table meta blk handle %s at line %d since %s", pMeta->name, lino, tstrerror(code));
×
2274
  }
2275
  return code;
10,433,904✔
2276
}
2277

2278
int32_t tableMetaReaderOpenIter(SBtableMetaReader *pReader, SBtableMetaReaderIter **pIter) {
3,508✔
2279
  int32_t code = 0;
3,508✔
2280
  int32_t lino = 0;
3,508✔
2281

2282
  SBtableMetaReaderIter *p = taosMemCalloc(1, sizeof(SBtableMetaReaderIter));
3,508✔
2283
  if (p == NULL) {
3,508✔
2284
    return terrno;
×
2285
  }
2286
  p->pReader = pReader;
3,508✔
2287

2288
  code = blockWrapperInit(&p->pBlockWrapper, 1024);
3,508✔
2289
  if (code != 0) {
3,508✔
2290
    return code;
×
2291
  }
2292

2293
  *pIter = p;
3,508✔
2294
  if (taosArrayGetSize(pReader->pBlkHandle) == 0) {
3,508✔
2295
    p->isOver = 1;
2,404✔
2296
    return 0;
2,404✔
2297
  }
2298

2299
  return 0;
1,104✔
2300
}
2301

2302
int32_t tableMetaReaderIterNext(SBtableMetaReaderIter *pIter, SBlockWrapper *pDataWrapper, SBlkHandle *dstHandle) {
2,760✔
2303
  int32_t code = 0;
2,760✔
2304
  int32_t lino = 0;
2,760✔
2305

2306
  if (pIter->blkIdx >= taosArrayGetSize(pIter->pReader->pBlkHandle)) {
2,760✔
2307
    pIter->isOver = 1;
1,104✔
2308
    return 0;
1,104✔
2309
  }
2310

2311
  SBlkHandle *pHandle = taosArrayGet(pIter->pReader->pBlkHandle, pIter->blkIdx);
1,656✔
2312
  if (pHandle == NULL) {
1,656✔
2313
    return TSDB_CODE_FILE_CORRUPTED;
×
2314
  }
2315

2316
  SBlockWrapper *pWrapper = &pIter->pBlockWrapper;
1,656✔
2317
  code = blockWrapperResize(pWrapper, pHandle->size);
1,656✔
2318
  TSDB_CHECK_CODE(code, lino, _error);
1,656✔
2319

2320
  code = tableLoadBlock(pIter->pReader->pFile, pHandle, pWrapper);
1,656✔
2321
  TSDB_CHECK_CODE(code, lino, _error);
1,656✔
2322

2323
  pIter->blkIdx++;
1,656✔
2324

2325
  if (blockGetType(pWrapper->data) != BSE_TABLE_META_TYPE) {
1,656✔
2326
    pIter->isOver = 1;
×
2327
    return 0;
×
2328
  }
2329

2330
  *pDataWrapper = *pWrapper;
1,656✔
2331
  *dstHandle = *pHandle;
1,656✔
2332

2333
_error:
1,656✔
2334
  if (code != 0) {
1,656✔
2335
    bseError("failed to load table meta blk handle %s at line %d since %s", pIter->pReader->name, lino,
×
2336
             tstrerror(code));
2337
    pIter->pReader = NULL;
×
2338
  }
2339
  return code;
1,656✔
2340
}
2341

2342
void tableMetaReaderIterClose(SBtableMetaReaderIter *p) {
3,508✔
2343
  if (p == NULL) return;
3,508✔
2344
  blockWrapperCleanup(&p->pBlockWrapper);
3,508✔
2345
  taosMemoryFree(p);
3,508✔
2346
}
2347

2348
int32_t bseMemTableCreate(STableMemTable **pMemTable, int32_t cap) {
6,253✔
2349
  int32_t code = 0;
6,253✔
2350
  int32_t lino = 0;
6,253✔
2351

2352
  STableMemTable *p = taosMemoryCalloc(1, sizeof(STableMemTable));
6,253✔
2353
  if (p == NULL) {
6,253✔
2354
    return terrno;
×
2355
  }
2356

2357
  p->pMetaHandle = taosArrayInit(8, sizeof(SBlkHandle));
6,253✔
2358
  if (p->pMetaHandle == NULL) {
6,253✔
2359
    TAOS_CHECK_GOTO(terrno, &lino, _error);
×
2360
  }
2361

2362
  code = blockWrapperInit(&p->pBlockWrapper, cap);
6,253✔
2363
  TAOS_CHECK_GOTO(code, &lino, _error);
6,253✔
2364

2365
  taosInitRWLatch(&p->latch);
6,253✔
2366

2367
  seqRangeReset(&p->range);
6,253✔
2368
  seqRangeReset(&p->tableRange);
6,253✔
2369
  p->ref = 1;
6,253✔
2370
  bseTrace("create mem table %p", p);
6,253✔
2371

2372
_error:
6,253✔
2373
  if (code != 0) {
6,253✔
2374
    bseMemTableDestroy(p);
×
2375
  }
2376
  *pMemTable = p;
6,253✔
2377

2378
  return code;
6,253✔
2379
}
2380

2381
int32_t bseMemTableRef(STableMemTable *pMemTable) {
20,908,725✔
2382
  int32_t code = 0;
20,908,725✔
2383
  if (pMemTable == NULL) {
20,908,725✔
2384
    return TSDB_CODE_INVALID_CFG;
×
2385
  }
2386

2387
  SBse *pBse = (SBse *)pMemTable->pBse;
20,908,725✔
2388
  bseTrace("ref mem table %p", pMemTable);
20,908,725✔
2389

2390
  int32_t nRef = atomic_fetch_add_32(&pMemTable->ref, 1);
20,908,725✔
2391
  if (nRef <= 0) {
20,908,725✔
2392
    bseError("vgId:%d, memtable ref count is invalid, ref:%d", BSE_VGID(pBse), nRef);
×
2393
    return TSDB_CODE_INVALID_CFG;
×
2394
  }
2395
  return code;
20,908,725✔
2396
}
2397

2398
void bseMemTableUnRef(STableMemTable *pMemTable) {
20,918,503✔
2399
  int32_t code = 0;
20,918,503✔
2400

2401
  bseTrace("unref mem table %p", pMemTable);
20,918,503✔
2402
  if (pMemTable == NULL) {
20,918,503✔
2403
    return;
3,525✔
2404
  }
2405
  if (atomic_sub_fetch_32(&pMemTable->ref, 1) == 0) {
20,914,978✔
2406
    bseMemTableDestroy(pMemTable);
6,253✔
2407
    bseTrace("destroy mem table %p", pMemTable);
6,253✔
2408
  }
2409
}
2410
void bseMemTableDestroy(STableMemTable *pMemTable) {
6,253✔
2411
  if (pMemTable == NULL) return;
6,253✔
2412
  taosArrayDestroy(pMemTable->pMetaHandle);
6,253✔
2413
  blockWrapperCleanup(&pMemTable->pBlockWrapper);
6,253✔
2414
  taosMemoryFree(pMemTable);
6,253✔
2415
}
2416
int32_t bseMemTablePush(STableMemTable *pMemTable, void *pHandle) {
3,508✔
2417
  int32_t code = 0;
3,508✔
2418
  if (pMemTable == NULL || pHandle == NULL) {
3,508✔
2419
    code = TSDB_CODE_INVALID_PARA;
×
2420
    return code;
×
2421
  }
2422

2423
  if (taosArrayPush(pMemTable->pMetaHandle, pHandle) == NULL) {
7,016✔
2424
    code = terrno;
×
2425
    bseError("Failed to push handle to memtable since %s", tstrerror(code));
×
2426

2427
    return code;
×
2428
  }
2429
  return code;
3,508✔
2430
}
2431
int32_t bseMemTablGetMetaBlock(STableMemTable *p, SArray **pMetaBlock) {
3,508✔
2432
  int32_t inLock = 0;
3,508✔
2433
  int32_t lino = 0;
3,508✔
2434
  int32_t code = bseMemTableRef(p);
3,508✔
2435
  if (code != 0) {
3,508✔
2436
    bseError("Failed to ref memtable since %s", tstrerror(code));
×
2437
    return code;
×
2438
  }
2439

2440
  SArray *pBlock = taosArrayInit(8, sizeof(SMetaBlock));
3,508✔
2441
  if (pBlock == NULL) {
3,508✔
2442
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
2443
  }
2444
  taosRLockLatch(&p->latch);
3,508✔
2445
  inLock = 1;
3,508✔
2446

2447
  for (int32_t i = 0; i < taosArrayGetSize(p->pMetaHandle); i++) {
7,016✔
2448
    SBlkHandle *handle = taosArrayGet(p->pMetaHandle, i);
3,508✔
2449
    SMetaBlock  block = {.type = BSE_TABLE_META_TYPE,
7,016✔
2450
                         .version = BSE_DATA_VER,
2451
                         .range = handle->range,
2452
                         .offset = handle->offset,
3,508✔
2453
                         .size = handle->size};
3,508✔
2454
    if (taosArrayPush(pBlock, &block) == NULL) {
3,508✔
2455
      TSDB_CHECK_CODE(code = terrno, lino, _error);
×
2456
    }
2457
  }
2458
_error:
3,508✔
2459
  if (inLock) taosRUnLockLatch(&p->latch);
3,508✔
2460
  if (code != 0) {
3,508✔
2461
    bseError("failed to get meta block from memtable since %s", tstrerror(code));
×
2462
    taosArrayDestroy(pBlock);
×
2463
    pBlock = NULL;
×
2464
  }
2465
  bseMemTableUnRef(p);
3,508✔
2466

2467
  *pMetaBlock = pBlock;
3,508✔
2468

2469
  return code;
3,508✔
2470
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc