• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4933

20 Jan 2026 10:44AM UTC coverage: 66.671% (+0.03%) from 66.646%
#4933

push

travis-ci

web-flow
merge: from main to 3.0 #34340

73 of 178 new or added lines in 9 files covered. (41.01%)

1199 existing lines in 124 files now uncovered.

203121 of 304663 relevant lines covered (66.67%)

132228377.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.5
/source/dnode/vnode/src/bse/bseTable.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "bse.h"
17
#include "bseCache.h"
18
#include "bseSnapshot.h"
19
#include "bseTable.h"
20
#include "bseTableMgt.h"
21
#include "crypt.h"
22
#include "osMemPool.h"
23
#include "vnodeInt.h"
24

25
// table footer func
26
static int32_t footerEncode(STableFooter *pFooter, char *buf);
27
static int32_t footerDecode(STableFooter *pFooter, char *buf);
28

29
// block handle func
30
static int32_t blkHandleEncode(SBlkHandle *pHandle, char *buf);
31
static int32_t blkHandleDecode(SBlkHandle *pHandle, char *buf);
32

33
// table meta func
34
static int32_t metaBlockEncode(SMetaBlock *pMeta, char *buf);
35
static int32_t metaBlockDecode(SMetaBlock *pMeta, char *buf);
36

37
static int32_t metaBlockAdd(SBlock *p, SMetaBlock *pMeta);
38
static int32_t metaBlockGet(SBlock *p, SMetaBlock *pMeta);
39

40
// table footer func
41
static int32_t footerEncode(STableFooter *pFooter, char *buf);
42
static int32_t footerDecode(STableFooter *pFooter, char *buf);
43

44
// block func
45
static int32_t blockCreate(int32_t cap, SBlock **pBlock);
46
static void    blockDestroy(SBlock *pBlock);
47
static int32_t blockPut(SBlock *pBlock, int64_t seq, uint8_t *value, int32_t len);
48
static int32_t blockAppendBatch(SBlock *p, uint8_t *value, int32_t len);
49
static int32_t blockEsimateSize(SBlock *pBlock, int32_t extra);
50
static void    blockClear(SBlock *pBlock);
51
static int32_t blockSeek(SBlock *p, int64_t seq, uint8_t **pValue, int32_t *len);
52
static int8_t  blockGetType(SBlock *p);
53

54
static int32_t blockSeekMeta(SBlock *p, int64_t seq, SMetaBlock *pMeta);
55
static int32_t blockGetAllMeta(SBlock *p, SArray *pResult);
56
static int32_t metaBlockAddIndex(SBlock *p, SBlkHandle *pInfo);
57

58
static int32_t tableMetaWriterInit(SBTableMeta *pMeta, char *name, SBtableMetaWriter **ppWriter);
59
static int32_t tableMetaWriterCommit(SBtableMetaWriter *pMeta);
60
static void    tableMetaWriterClose(SBtableMetaWriter *p);
61
static int32_t tableMetaWriteAppendRawBlock(SBtableMetaWriter *pMeta, SBlockWrapper *pBlock, SBlkHandle *pBlkHandle);
62

63
static int32_t tableMetaReaderInit(SBTableMeta *pMeta, char *name, SBtableMetaReader **ppReader);
64
static void    tableMetaReaderClose(SBtableMetaReader *p);
65
static int32_t tableMetaReaderLoadIndex(SBtableMetaReader *p);
66

67
static int32_t tableMetaOpenFile(SBtableMetaWriter *pMeta, int8_t read, char *name);
68

69
static int32_t tableMetaReaderOpenIter(SBtableMetaReader *pReader, SBtableMetaReaderIter **pIter);
70
static int32_t tableMetaReaderIterNext(SBtableMetaReaderIter *pIter, SBlockWrapper *pDataWrapper,
71
                                       SBlkHandle *dstHandle);
72
static void    tableMetaReaderIterClose(SBtableMetaReaderIter *p);
73

74
// STable builder func
75
static int32_t tableBuilderGetBlockSize(STableBuilder *p);
76
static int32_t tableBuilderLoadBlock(STableBuilder *p, SBlkHandle *pHandle, SBlockWrapper *pBlkWrapper);
77
static int32_t tableBuilderSeek(STableBuilder *p, SBlkHandle *pHandle, int64_t seq, uint8_t **pValue, int32_t *len);
78
static void    tableBuilderUpdateBlockRange(STableBuilder *p, SBlockItemInfo *pInfo);
79
static void    tableBuildUpdateTableRange(STableBuilder *p, SBlockItemInfo *pInfo);
80

81
// STable pReaderMgt func
82

83
static int32_t tableReaderInitMeta(STableReader *p, SBlock *pBlock);
84

85
static int32_t tableReaderLoadRawBlock(STableReader *p, SBlkHandle *pHandle, SBlockWrapper *pBlkWrapper);
86
static int32_t tableReaderLoadRawMeta(STableReader *p, SBlkHandle *pHandle, SBlockWrapper *blkWrapper);
87
static int32_t tableReaderLoadRawMetaIndex(STableReader *p, SBlockWrapper *blkWrapper);
88
static int32_t tableReaderLoadRawFooter(STableReader *p, SBlockWrapper *blkWrapper);
89

90
static int32_t tableOpenFile(char *name, int8_t read, TdFilePtr *pFile, int64_t *size);
91
static int32_t tableFlushBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlk, int32_t *nWrite);
92
static int32_t tableLoadBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlk);
93
static int32_t tableLoadRawBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlk, int8_t checkSum);
94

95
/*---block formate----*/
96
//---datatype--|---len---|--data---|--rawdatasize---|--compressType---|---checksum---|
97
//- int8_t   |  int32_t | uint8_t[] |    int32_t     |      int8_t     |    TSCKSUM|
98
#define BLOCK_ROW_SIZE_OFFSET(p)      (sizeof(SBlock) + (p)->len)
99
#define BLOCK_ROW_SIZE(p)             BLOCK_ROW_SIZE_OFFSET(p)
100
#define BLOCK_COMPRESS_TYPE_OFFSET(p) (BLOCK_ROW_SIZE_OFFSET(p) + sizeof(int32_t))
101
#define BLOCK_CHECKSUM_OFFSET(p)      (BLOCK_COMPRESS_TYPE_OFFSET(p) + sizeof(int8_t))
102
#define BLOCK_TOTAL_SIZE(p)           (BLOCK_CHECKSUM_OFFSET(p) + sizeof(TSCKSUM))
103

104
#define BLOCK_SET_ROW_SIZE(p, size) *(int32_t *)((char *)(p) + BLOCK_ROW_SIZE_OFFSET(p)) = (size)
105
#define BLOCK_GET_ROW_SIZE(p)       *(int32_t *)((char *)(p) + BLOCK_ROW_SIZE_OFFSET(p))
106

107
#define BLOCK_SET_COMPRESS_TYPE(p, type) *(int8_t *)((char *)(p) + BLOCK_COMPRESS_TYPE_OFFSET(p)) = (type)
108
#define BLOCK_GET_COMPRESS_TYPE(p)       *(int8_t *)((char *)(p) + BLOCK_COMPRESS_TYPE_OFFSET(p))
109

110
#define BLOCK_TAIL_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(TSCKSUM))
111

112
#define COMREPSS_DATA_SET_TYPE_AND_RAWLEN(p, len, type, rawLen) \
113
  do {                                                          \
114
    *(int32_t *)((char *)(p) + len) = (rawLen);                 \
115
    *(int8_t *)((char *)(p) + len + sizeof(int32_t)) = (type);  \
116
  } while (0);
117
#define COMPRESS_DATA_GET_TYPE_AND_RAWLEN(p, len, type, rawLen)                 \
118
  do {                                                                          \
119
    (rawLen) = *(int32_t *)((char *)(p) + len - BLOCK_TAIL_LEN);                \
120
    (type) = *(int8_t *)((char *)(p) + len - BLOCK_TAIL_LEN + sizeof(int32_t)); \
121
  } while (0);
122

123
int32_t tableBuilderSeek(STableBuilder *p, SBlkHandle *pHandle, int64_t seq, uint8_t **pValue, int32_t *len) {
×
124
  int32_t code = 0;
×
125
  int32_t lino = 0;
×
126

127
  SBlockWrapper blockWrapper = {0};
×
128

129
  code = tableBuilderLoadBlock(p, pHandle, &blockWrapper);
×
130
  TSDB_CHECK_CODE(code, lino, _error);
×
131

132
  code = blockSeek(blockWrapper.data, seq, pValue, len);
×
133
  TSDB_CHECK_CODE(code, lino, _error);
×
134

135
_error:
×
136
  if (code != 0) {
×
137
    bseError("failed to seek data from table builder at lino %d ince %s", lino, tstrerror(code));
×
138
  }
139
  blockWrapperCleanup(&blockWrapper);
×
140
  return code;
×
141
}
142

143
int32_t tableBuilderLoadBlock(STableBuilder *p, SBlkHandle *pHandle, SBlockWrapper *pBlkWrapper) {
×
144
  int32_t code = 0;
×
145
  int32_t lino = 0;
×
146
  code = blockWrapperInit(pBlkWrapper, pHandle->size);
×
147
  TSDB_CHECK_CODE(code, lino, _error);
×
148

149
  // Set pBse pointer for encryption/decryption
NEW
150
  pBlkWrapper->pBse = p->pBse;
×
151

152
  code = tableLoadBlock(p->pDataFile, pHandle, pBlkWrapper);
×
153
_error:
×
154
  if (code != 0) {
×
155
    bseError("failed to load block from table builder at lino %d since %s", lino, tstrerror(code));
×
156
  }
157
  return code;
×
158
}
159

160
int32_t tableBuilderOpen(int64_t ts, STableBuilder **pBuilder, SBse *pBse) {
3,164✔
161
  int32_t code = 0;
3,164✔
162
  int32_t lino = 0;
3,164✔
163

164
  char name[TSDB_FILENAME_LEN] = {0};
3,164✔
165
  char path[TSDB_FILENAME_LEN] = {0};
3,164✔
166
  bseBuildDataName(ts, name);
3,164✔
167
  bseBuildFullName(pBse, name, path);
3,164✔
168

169
  STableBuilder *p = taosMemoryCalloc(1, sizeof(STableBuilder));
3,164✔
170
  if (p == NULL) {
3,164✔
171
    TSDB_CHECK_CODE(terrno, lino, _error);
×
172
  }
173
  p->timestamp = ts;
3,164✔
174
  memcpy(p->name, name, strlen(name));
3,164✔
175

176
  p->blockCap = BSE_BLOCK_SIZE(pBse);
3,164✔
177

178
  code = bseMemTableCreate(&p->pMemTable, BSE_BLOCK_SIZE(pBse));
3,164✔
179
  TSDB_CHECK_CODE(code, lino, _error);
3,164✔
180

181
  p->compressType = BSE_COMPRESS_TYPE(pBse);
3,164✔
182
  TSDB_CHECK_CODE(code, lino, _error);
3,164✔
183

184
  seqRangeReset(&p->tableRange);
3,164✔
185
  seqRangeReset(&p->blockRange);
3,164✔
186

187
  p->pBse = pBse;
3,164✔
188
  code = tableOpenFile(path, 0, &p->pDataFile, &p->offset);
3,164✔
189
  p->blockCap = BSE_BLOCK_SIZE(pBse);
3,164✔
190

191
  *pBuilder = p;
3,164✔
192
  p->pMemTable->pTableBuilder = p;
3,164✔
193

194
_error:
3,164✔
195
  if (code != 0) {
3,164✔
196
    (void)tableBuilderClose(p, 0);
×
197
    bseError("failed to open table builder at line %d since %s", lino, tstrerror(code));
×
198
  }
199
  return code;
3,164✔
200
}
201

202
int32_t tableBuilderGetMetaBlock(STableBuilder *p, SArray **pMetaBlock) {
3,537✔
203
  return bseMemTablGetMetaBlock(p->pImmuMemTable, pMetaBlock);
3,537✔
204
}
205

206
int32_t tableBuilderAddMeta(STableBuilder *p, SBlkHandle *pHandle, int8_t immu) {
3,537✔
207
  int32_t code = 0;
3,537✔
208
  int32_t lino = 0;
3,537✔
209

210
  STableMemTable *pMemTable = immu ? p->pImmuMemTable : p->pMemTable;
3,537✔
211

212
  code = bseMemTableRef(pMemTable);
3,537✔
213
  TAOS_CHECK_GOTO(code, &lino, _error);
3,537✔
214

215
  code = bseMemTablePush(pMemTable, pHandle);
3,537✔
216
  TAOS_CHECK_GOTO(code, &lino, _error);
3,537✔
217

218
  seqRangeReset(&pMemTable->range);
3,537✔
219
_error:
3,537✔
220
  bseMemTableUnRef(pMemTable);
3,537✔
221
  return code;
3,537✔
222
}
223
int32_t tableBuilderSetBlockInfo(STableMemTable *pMemTable) {
3,537✔
224
  int32_t        code = 0;
3,537✔
225
  int32_t        lino = 0;
3,537✔
226
  SBlockWrapper *pWp = &pMemTable->pBlockWrapper;
3,537✔
227

228
  code = blockWrapperResize(pWp, BLOCK_TOTAL_SIZE((SBlock *)(pWp->data)) + pWp->kvSize);
3,537✔
229
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
230

231
  SBlock *pBlock = (SBlock *)pWp->data;
3,537✔
232

233
  pBlock->offset = pBlock->len;
3,537✔
234
  memcpy(pBlock->data + pBlock->len, pWp->kvBuffer, pWp->kvSize);
3,537✔
235
  pBlock->len += pWp->kvSize;
3,537✔
236
  pBlock->version = BSE_DATA_VER;
3,537✔
237
_error:
3,537✔
238
  return code;
3,537✔
239
}
240
int32_t tableBuilderFlush(STableBuilder *p, int8_t type, int8_t immutable) {
3,537✔
241
  int32_t code = 0;
3,537✔
242
  int32_t lino = 0;
3,537✔
243
  int8_t  inLock = 0;
3,537✔
244

245
  STableMemTable *pMemTable = immutable ? p->pImmuMemTable : p->pMemTable;
3,537✔
246
  if (p == NULL) return code;
3,537✔
247

248
  SBlockWrapper wrapper = {0};
3,537✔
249
  code = bseMemTableRef(pMemTable);
3,537✔
250
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
251

252
  if (immutable) {
3,537✔
253
    taosWLockLatch(&pMemTable->latch);
3,537✔
254
    inLock = 1;
3,537✔
255
  }
256

257
  code = tableBuilderSetBlockInfo(pMemTable);
3,537✔
258
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
259

260
  SBlock *pBlk = pMemTable->pBlockWrapper.data;
3,537✔
261
  if (pBlk->len == 0) {
3,537✔
262
    goto _error;
×
263
  }
264

265
  int8_t compressType = BSE_COMPRESS_TYPE(p->pBse);
3,537✔
266

267
  uint8_t *pWrite = (uint8_t *)pBlk;
3,537✔
268
  int32_t  len = BLOCK_TOTAL_SIZE(pBlk);
3,537✔
269

270
  pBlk->type = type;
3,537✔
271

272
  BLOCK_SET_COMPRESS_TYPE(pBlk, compressType);
3,537✔
273
  BLOCK_SET_ROW_SIZE(pBlk, BLOCK_ROW_SIZE(pBlk));
3,537✔
274

275
  if (compressType != kNoCompres) {
3,537✔
276
    code = blockWrapperInit(&wrapper, len + 16);
3,537✔
277
    TSDB_CHECK_CODE(code, lino, _error);
3,537✔
278

279
    int32_t compressSize = wrapper.cap;
3,537✔
280
    code = bseCompressData(compressType, pWrite, BLOCK_ROW_SIZE(pBlk), wrapper.data, &compressSize);
3,537✔
281
    if (code != 0) {
3,537✔
282
      bseWarn("failed to compress data since %s, not set compress", tstrerror(TSDB_CODE_THIRDPARTY_ERROR));
×
283

284
      blockWrapperCleanup(&wrapper);
×
285
      BLOCK_SET_COMPRESS_TYPE(pBlk, kNoCompres);
×
286
      BLOCK_SET_ROW_SIZE(pBlk, BLOCK_ROW_SIZE(pBlk));
×
287
    } else {
288
      int32_t rawSize = BLOCK_ROW_SIZE(pBlk);
3,537✔
289
      COMREPSS_DATA_SET_TYPE_AND_RAWLEN(wrapper.data, compressSize, compressType, rawSize);
3,537✔
290
      len = compressSize + BLOCK_TAIL_LEN;
3,537✔
291

292
      pWrite = (uint8_t *)wrapper.data;
3,537✔
293
    }
294
  }
295

296
  code = taosCalcChecksumAppend(0, (uint8_t *)pWrite, len);
3,537✔
297
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
298

299
  SBlkHandle handle = {.size = len, .offset = p->offset, .range = pMemTable->range};
3,537✔
300

301
  bseDebug("bse flush at offset %" PRId64 " len: %d, block range sseq:%" PRId64 ", eseq:%" PRId64 "", p->offset, len,
3,537✔
302
           handle.range.sseq, handle.range.eseq);
303

304
  int64_t n = taosLSeekFile(p->pDataFile, handle.offset, SEEK_SET);
3,537✔
305
  if (n < 0) {
3,537✔
306
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
307
  }
308

309
  int64_t nwrite = taosWriteFile(p->pDataFile, (uint8_t *)pWrite, len);
3,537✔
310
  if (nwrite != len) {
3,537✔
311
    code = terrno;
×
312
    TSDB_CHECK_CODE(code, lino, _error);
×
313
  }
314
  p->offset += len;
3,537✔
315

316
  code = tableBuilderAddMeta(p, &handle, immutable);
3,537✔
317

318
_error:
3,537✔
319
  if (code != 0) {
3,537✔
320
    bseError("failed to flush table builder at line %d since %s", lino, tstrerror(code));
×
321
  }
322

323
  if (pMemTable != NULL) {
3,537✔
324
    if (!immutable) blockWrapperClear(&pMemTable->pBlockWrapper);
3,537✔
325
    if (inLock) {
3,537✔
326
      taosWUnLockLatch(&pMemTable->latch);
3,537✔
327
    }
328
    bseMemTableUnRef(pMemTable);
3,537✔
329
  }
330
  blockWrapperCleanup(&wrapper);
3,537✔
331
  return code;
3,537✔
332
}
333

334
void tableBuildUpdateTableRange(STableBuilder *p, SBlockItemInfo *pInfo) {
18,183✔
335
  SSeqRange range = {.sseq = pInfo->seq, .eseq = pInfo->seq};
18,183✔
336
  seqRangeUpdate(&p->tableRange, &range);
18,183✔
337
}
18,183✔
338

339
void tableBuilderUpdateBlockRange(STableBuilder *p, SBlockItemInfo *pInfo) {
21,066,099✔
340
  SSeqRange range = {.sseq = pInfo->seq, .eseq = pInfo->seq};
21,066,099✔
341
  seqRangeUpdate(&p->blockRange, &range);
21,066,099✔
342
}
21,066,099✔
343
void memtableUpdateBlockRange(STableMemTable *p, SBlockItemInfo *pInfo) {
21,084,282✔
344
  SSeqRange range = {.sseq = pInfo->seq, .eseq = pInfo->seq};
21,084,282✔
345
  seqRangeUpdate(&p->range, &range);
21,084,282✔
346
  seqRangeUpdate(&p->tableRange, &range);
21,084,282✔
347
}
21,084,282✔
348

349
// table block data
350
// data1 data2 data3 data4 k1v1 k2v2, k3,v3 compresss size raw_size
351
//|seq len value|seq len value| seq len value| seq len value|
352
int32_t tableBuilderPut(STableBuilder *p, SBseBatch *pBatch) {
21,510✔
353
  int32_t code = 0;
21,510✔
354
  int32_t lino = 0;
21,510✔
355
  int32_t len = 0, offset = 0;
21,510✔
356
  int8_t  inLock = 0;
21,510✔
357

358
  code = bseMemTableRef(p->pMemTable);
21,510✔
359
  if (code != 0) {
21,510✔
360
    return code;
×
361
  }
362

363
  taosWLockLatch(&p->pMemTable->latch);
21,510✔
364
  inLock = 1;
21,510✔
365

366
  SBlockWrapper *pBlockWrapper = &p->pMemTable->pBlockWrapper;
21,510✔
367

368
  for (int32_t i = 0; i < taosArrayGetSize(pBatch->pSeq);) {
21,087,609✔
369
    SBlockItemInfo *pInfo = taosArrayGet(pBatch->pSeq, i);
21,066,099✔
370
    if (i == 0 || i == taosArrayGetSize(pBatch->pSeq) - 1) {
21,066,099✔
371
      tableBuildUpdateTableRange(p, pInfo);
18,183✔
372
      memtableUpdateBlockRange(p->pMemTable, pInfo);
18,183✔
373
    }
374

375
    if (atomic_load_8(&p->hasImmuMemTable) ||
42,132,198✔
376
        (blockWrapperSize(pBlockWrapper, len + pInfo->size) < tableBuilderGetBlockSize(p))) {
21,066,099✔
377
      i++;
21,066,099✔
378
      len += pInfo->size;
21,066,099✔
379
      tableBuilderUpdateBlockRange(p, pInfo);
21,066,099✔
380
      memtableUpdateBlockRange(p->pMemTable, pInfo);
21,066,099✔
381

382
      code = blockWrapperPushMeta(pBlockWrapper, pInfo->seq, NULL, pInfo->size);
21,066,099✔
383
      TSDB_CHECK_CODE(code, lino, _error);
21,066,099✔
384

385
      bseTrace("start to insert  bse table builder mem %p, idx %d", p->pMemTable, i);
21,066,099✔
386
      continue;
21,066,099✔
387
    } else {
388
      if (len > 0) {
×
389
        offset += blockAppendBatch(pBlockWrapper->data, pBatch->buf + offset, len);
×
390
      }
391
      bseTrace("start to flush bse table builder mem %p", p->pMemTable);
×
392
      code = tableBuilderFlush(p, BSE_TABLE_DATA_TYPE, 0);
×
393
      TSDB_CHECK_CODE(code, lino, _error);
×
394
      len = 0;
×
395
    }
396
  }
397

398
  if (offset < pBatch->len) {
21,510✔
399
    int32_t size = pBatch->len - offset;
14,841✔
400
    if (size > 0) {
14,841✔
401
      code = blockWrapperResize(pBlockWrapper,
14,841✔
402
                                size + BLOCK_TOTAL_SIZE((SBlock *)(pBlockWrapper->data)) + pBlockWrapper->kvSize);
14,841✔
403
      TSDB_CHECK_CODE(code, lino, _error);
14,841✔
404
    }
405

406
    if (blockAppendBatch(pBlockWrapper->data, pBatch->buf + offset, size) != size) {
14,841✔
407
      code = TSDB_CODE_INVALID_PARA;
×
408
    }
409
  }
410
_error:
21,510✔
411
  if (code != 0) {
21,510✔
412
    bseError("failed to append batch since %s", tstrerror(code));
×
413
  }
414
  if (inLock) {
21,510✔
415
    taosWUnLockLatch(&p->pMemTable->latch);
21,510✔
416
  }
417

418
  bseMemTableUnRef(p->pMemTable);
21,510✔
419
  return code;
21,510✔
420
}
421

422
int32_t tableBuilderTruncFile(STableBuilder *p, int64_t size) {
×
423
  int32_t code = 0;
×
424
  int32_t lino = 0;
×
425

426
  if (p->pDataFile == NULL) {
×
427
    return TSDB_CODE_INVALID_PARA;
×
428
  }
429
  code = taosFtruncateFile(p->pDataFile, size);
×
430
  TSDB_CHECK_CODE(code, lino, _error);
×
431

432
_error:
×
433
  if (code != 0) {
×
434
    bseError("failed to truncate file since %s", tstrerror(code));
×
435
  }
436
  return code;
×
437
}
438

439
int32_t compareFunc(const void *pLeft, const void *pRight) {
20,553,300✔
440
  SBlkHandle *p1 = (SBlkHandle *)pLeft;
20,553,300✔
441
  SBlkHandle *p2 = (SBlkHandle *)pRight;
20,553,300✔
442
  if (p1->range.sseq > p2->range.sseq) {
20,553,300✔
443
    return 1;
20,552,743✔
444
  } else if (p1->range.sseq < p2->range.sseq) {
557✔
445
    return -1;
×
446
  }
447
  return 0;
557✔
448
}
449
int32_t findTargetBlock(SArray *pMetaHandle, int64_t seq) {
×
450
  SBlkHandle handle = {.range = {.sseq = seq, .eseq = seq}};
×
451
  return taosArraySearchIdx(pMetaHandle, &handle, compareFunc, TD_LE);
×
452
}
453

454
int32_t findInMemtable(STableMemTable *p, int64_t seq, uint8_t **value, int32_t *len) {
21,066,099✔
455
  int32_t code = 0;
21,066,099✔
456
  int8_t  inBuf = 1;
21,066,099✔
457
  int32_t lino = 0;
21,066,099✔
458
  int8_t  inLock = 0;
21,066,099✔
459
  if (p == NULL) {
21,066,099✔
460
    return TSDB_CODE_NOT_FOUND;
×
461
  }
462

463
  code = bseMemTableRef(p);
21,066,099✔
464
  if (code != 0) {
21,066,099✔
465
    return code;
×
466
  }
467

468
  taosRLockLatch(&p->latch);
21,066,099✔
469
  inLock = 1;
21,066,099✔
470

471
  if (!seqRangeContains(&p->tableRange, seq)) {
21,066,099✔
472
    TSDB_CHECK_CODE(TSDB_CODE_NOT_FOUND, lino, _error);
×
473
  }
474

475
  if (taosArrayGetSize(p->pMetaHandle) > 0) {
21,066,099✔
476
    SBlkHandle *pHandle = taosArrayGetLast(p->pMetaHandle);
×
477
    if (!seqRangeIsGreater(&pHandle->range, seq)) {
×
478
      inBuf = 0;
×
479
      int32_t idx = findTargetBlock(p->pMetaHandle, seq);
×
480
      if (idx < 0) {
×
481
        TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_RANGE, lino, _error);
×
482
      }
483
      pHandle = taosArrayGet(p->pMetaHandle, idx);
×
484
      code = tableBuilderSeek(p->pTableBuilder, pHandle, seq, value, len);
×
485
      TSDB_CHECK_CODE(code, lino, _error);
×
486
    }
487
  }
488

489
  if (inBuf == 1) {
21,066,099✔
490
    code = blockWrapperSeek(&p->pBlockWrapper, seq, value, len);
21,066,099✔
491
    if (code != 0) {
21,066,099✔
492
      bseInfo("mem table range [%" PRId64 ", %" PRId64 "]", p->tableRange.sseq, p->tableRange.eseq);
×
493
    }
494
    TSDB_CHECK_CODE(code, lino, _error);
21,066,099✔
495
  }
496
_error:
21,066,099✔
497
  if (inLock) {
21,066,099✔
498
    taosRUnLockLatch(&p->latch);
21,066,099✔
499
  }
500
  bseMemTableUnRef(p);
21,066,099✔
501
  if (code != 0) {
21,066,099✔
502
    bseInfo("failed to find seq %" PRId64 " in memtable %p at line %d since %s", seq, p, lino, tstrerror(code));
×
503
  }
504
  return code;
21,066,099✔
505
}
506
int32_t tableBuilderGet(STableBuilder *p, int64_t seq, uint8_t **value, int32_t *len) {
21,066,099✔
507
  int32_t code = 0;
21,066,099✔
508
  if (p == NULL) {
21,066,099✔
509
    return TSDB_CODE_NOT_FOUND;
×
510
  }
511

512
  code = findInMemtable(p->pMemTable, seq, value, len);
21,066,099✔
513
  if (code != 0) {
21,066,099✔
514
    code = findInMemtable(p->pImmuMemTable, seq, value, len);
×
515
  }
516
  return code;
21,066,099✔
517
}
518

519
static void updateTableRange(SBTableMeta *pTableMeta, SArray *pMetaBlock) {
3,537✔
520
  if (pMetaBlock == NULL) {
3,537✔
521
    return;
×
522
  }
523

524
  for (int32_t i = 0; i < taosArrayGetSize(pMetaBlock); i++) {
7,074✔
525
    SMetaBlock *pMeta = taosArrayGet(pMetaBlock, i);
3,537✔
526
    seqRangeUpdate(&pTableMeta->range, &pMeta->range);
3,537✔
527
  }
528
}
529
static int32_t tableBuilderClearImmuMemTable(STableBuilder *p) {
3,537✔
530
  int32_t           code = 0;
3,537✔
531
  STableBuilderMgt *pMgt = p->pBuilderMgt;
3,537✔
532
  (void)taosThreadRwlockWrlock(&pMgt->mutex);
3,537✔
533
  atomic_store_8(&p->hasImmuMemTable, 0);
3,537✔
534
  bseMemTableUnRef(p->pImmuMemTable);
3,537✔
535
  p->pImmuMemTable = NULL;
3,537✔
536

537
  (void)taosThreadRwlockUnlock(&pMgt->mutex);
3,537✔
538
  return code;
3,537✔
539
}
540
static int32_t tableBuildeSwapMemTable(STableBuilder *p) {
×
541
  int32_t code = 0;
×
542
  (void)taosThreadRwlockWrlock(&p->pBse->rwlock);
×
543
  p->pImmuMemTable = p->pMemTable;
×
544
  p->pMemTable = NULL;
×
545

546
  atomic_store_8(&p->hasImmuMemTable, 1);
×
547

548
  (void)taosThreadRwlockUnlock(&p->pBse->rwlock);
×
549
  return code;
×
550
}
551

552
int32_t tableBuilderCommit(STableBuilder *p, SBseLiveFileInfo *pInfo) {
3,537✔
553
  int32_t code = 0;
3,537✔
554
  int32_t lino = 0;
3,537✔
555

556
  STableCommitInfo commitInfo = {0};
3,537✔
557
  SArray          *pMetaBlock = NULL;
3,537✔
558
  if (p == NULL) {
3,537✔
559
    return TSDB_CODE_INVALID_PARA;
×
560
  }
561

562
  code = tableBuilderFlush(p, BSE_TABLE_DATA_TYPE, 1);
3,537✔
563
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
564

565
  code = taosFsyncFile(p->pDataFile);
3,537✔
566
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
567

568
  code = tableBuilderGetMetaBlock(p, &pMetaBlock);
3,537✔
569
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
570

571
  if (taosArrayGetSize(pMetaBlock) == 0) {
3,537✔
572
    bseDebug("no meta block to commit for table %s", p->name);
×
573
    return code;
×
574
  }
575

576
  code = tableMetaCommit(p->pTableMeta, pMetaBlock);
3,537✔
577
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
578

579
  updateTableRange(p->pTableMeta, pMetaBlock);
3,537✔
580

581
  pInfo->level = 0;
3,537✔
582
  pInfo->range = p->pTableMeta->range;
3,537✔
583
  pInfo->timestamp = p->timestamp;
3,537✔
584
  pInfo->size = p->offset;
3,537✔
585

586
  code = tableBuilderClearImmuMemTable(p);
3,537✔
587
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
588

589
_error:
3,537✔
590
  if (code != 0) {
3,537✔
591
    bseError("failed to commit table builder at line %d since %s ", lino, tstrerror(code));
×
592
  } else {
593
    bseInfo("succ to commit table %s", p->name);
3,537✔
594
  }
595
  taosArrayDestroy(pMetaBlock);
3,537✔
596
  return code;
3,537✔
597
}
598

599
int32_t tableBuilderGetBlockSize(STableBuilder *p) { return p->blockCap; }
21,066,099✔
600

601
void tableBuilderClose(STableBuilder *p, int8_t commited) {
3,721✔
602
  if (p == NULL) {
3,721✔
603
    return;
557✔
604
  }
605

606
  bseMemTableUnRef(p->pMemTable);
3,164✔
607
  bseMemTableUnRef(p->pImmuMemTable);
3,164✔
608

609
  if (taosCloseFile(&p->pDataFile) != 0) {
3,164✔
610
    bseError("failed to close table builder file %s since %s", p->name, tstrerror(terrno));
×
611
  }
612
  taosMemoryFree(p);
3,164✔
613
}
614

615
static void addSnapshotMetaToBlock(SBlockWrapper *pBlkWrapper, SSeqRange range, int8_t fileType, int8_t blockType,
×
616
                                   int64_t timestamp) {
617
  SBseSnapMeta *pSnapMeta = pBlkWrapper->data;
×
618

619
  pSnapMeta->range = range;
×
620
  pSnapMeta->fileType = fileType;
×
621
  pSnapMeta->blockType = blockType;
×
622
  pSnapMeta->timestamp = timestamp;
×
623
  return;
×
624
}
625

626
static void updateSnapshotMeta(SBlockWrapper *pBlkWrapper, SSeqRange range, int8_t fileType, int8_t blockType,
×
627
                               int64_t timestamp) {
628
  SBseSnapMeta *pSnapMeta = (SBseSnapMeta *)pBlkWrapper->data;
×
629
  pSnapMeta->timestamp = timestamp;
×
630
  return;
×
631
}
632
int32_t tableReaderLoadRawBlock(STableReader *p, SBlkHandle *pHandle, SBlockWrapper *blkWrapper) {
×
633
  int32_t code = 0;
×
634
  int32_t lino = 0;
×
635

636
  code = blockWrapperResize(blkWrapper, pHandle->size + sizeof(SBseSnapMeta));
×
637
  TSDB_CHECK_CODE(code, lino, _error);
×
638

639
  code = tableLoadRawBlock(p->pDataFile, pHandle, blkWrapper, 1);
×
640
  TSDB_CHECK_CODE(code, lino, _error);
×
641

642
  addSnapshotMetaToBlock(blkWrapper, p->range, BSE_TABLE_SNAP, BSE_TABLE_DATA_TYPE, p->timestamp);
×
643

644
_error:
×
645
  if (code != 0) {
×
646
    bseError("table reader failed to load block at line %d since %s", lino, tstrerror(code));
×
647
  }
648
  return code;
×
649
}
650

651
int32_t tableReaderLoadRawMeta(STableReader *p, SBlkHandle *pHandle, SBlockWrapper *blkWrapper) {
×
652
  int32_t code = 0;
×
653
  int32_t lino = 0;
×
654

655
  SBtableMetaReader *pReader = p->pMetaReader;
×
656

657
  code = blockWrapperResize(blkWrapper, pHandle->size + sizeof(SBseSnapMeta));
×
658
  TSDB_CHECK_CODE(code, lino, _error);
×
659

660
  code = tableLoadRawBlock(pReader->pFile, pHandle, blkWrapper, 1);
×
661
  TSDB_CHECK_CODE(code, lino, _error);
×
662

663
  addSnapshotMetaToBlock(blkWrapper, p->range, BSE_TABLE_META_SNAP, BSE_TABLE_META_TYPE, p->timestamp);
×
664
_error:
×
665
  if (code != 0) {
×
666
    bseError("failed to load raw meta from table pReaderMgt at line %d lino since %s", lino, tstrerror(code));
×
667
  }
668
  return code;
×
669
}
670
int32_t tableReaderLoadRawMetaIndex(STableReader *p, SBlockWrapper *blkWrapper) {
×
671
  int32_t code = 0;
×
672
  int32_t lino = 0;
×
673

674
  SBtableMetaReader *pReader = p->pMetaReader;
×
675
  SBlkHandle        *pHandle = p->pMetaReader->footer.metaHandle;
×
676

677
  code = blockWrapperResize(blkWrapper, pHandle->size + sizeof(SBseSnapMeta));
×
678
  TSDB_CHECK_CODE(code, lino, _error);
×
679

680
  code = tableLoadRawBlock(pReader->pFile, pHandle, blkWrapper, 1);
×
681
  TSDB_CHECK_CODE(code, lino, _error);
×
682

683
  addSnapshotMetaToBlock(blkWrapper, p->range, BSE_TABLE_META_SNAP, BSE_TABLE_META_INDEX_TYPE, p->timestamp);
×
684
_error:
×
685
  if (code != 0) {
×
686
    bseError("failed to load raw meta from table pReaderMgt at line %d lino since %s", lino, tstrerror(code));
×
687
  }
688
  return code;
×
689
}
690
int32_t tableReaderLoadRawFooter(STableReader *p, SBlockWrapper *blkWrapper) {
×
691
  int32_t code = 0;
×
692
  int32_t lino = 0;
×
693
  char    buf[kEncodeLen] = {0};
×
694

695
  SBtableMetaReader *pReader = p->pMetaReader;
×
696
  code = footerEncode(&pReader->footer, buf);
×
697
  int32_t len = sizeof(buf);
×
698

699
  int64_t n = taosLSeekFile(pReader->pFile, -kEncodeLen, SEEK_END);
×
700
  if (n < 0) {
×
701
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
702
  }
703

704
  if (taosReadFile(pReader->pFile, buf, sizeof(buf)) != len) {
×
705
    TSDB_CHECK_CODE(terrno, lino, _error);
×
706
  }
707

708
  code = blockWrapperResize(blkWrapper, len + sizeof(SBseSnapMeta));
×
709
  TSDB_CHECK_CODE(code, lino, _error);
×
710

711
  memcpy((uint8_t *)blkWrapper->data + sizeof(SBseSnapMeta), buf, sizeof(buf));
×
712
  blkWrapper->size = len + sizeof(SBseSnapMeta);
×
713

714
  addSnapshotMetaToBlock(blkWrapper, p->range, BSE_TABLE_META_SNAP, BSE_TABLE_FOOTER_TYPE, p->timestamp);
×
715
_error:
×
716
  if (code != 0) {
×
717
    bseError("failed to load raw footer from table pReaderMgt at lino %d since %s", lino, tstrerror(code));
×
718
  }
719
  return code;
×
720
}
721

722
int32_t tableReaderOpen(int64_t timestamp, STableReader **pReader, void *pReaderMgt) {
10,527,300✔
723
  char data[TSDB_FILENAME_LEN] = {0};
10,527,300✔
724
  char meta[TSDB_FILENAME_LEN] = {0};
10,527,300✔
725

726
  char dataPath[TSDB_FILENAME_LEN] = {0};
10,527,300✔
727

728
  int32_t code = 0;
10,527,300✔
729
  int32_t lino = 0;
10,527,300✔
730
  int64_t size = 0;
10,527,300✔
731

732
  STableReaderMgt *pMgt = (STableReaderMgt *)pReaderMgt;
10,527,300✔
733
  if (pMgt == NULL) {
10,527,300✔
734
    return TSDB_CODE_INVALID_CFG;
×
735
  }
736

737
  SSubTableMgt *pMeta = pMgt->pMgt;
10,527,300✔
738

739
  STableReader *p = taosMemCalloc(1, sizeof(STableReader));
10,527,300✔
740
  if (p == NULL) {
10,527,300✔
741
    TSDB_CHECK_CODE(terrno, lino, _error);
×
742
  }
743

744
  bseBuildDataName(timestamp, data);
10,527,300✔
745

746
  p->timestamp = timestamp;
10,527,300✔
747
  p->blockCap = 1024;
10,527,300✔
748
  p->pReaderMgt = pReaderMgt;
10,527,300✔
749
  memcpy(p->name, data, strlen(data));
10,527,300✔
750

751
  bseBuildFullName(pMgt->pBse, data, dataPath);
10,527,300✔
752
  code = tableOpenFile(dataPath, 1, &p->pDataFile, &p->fileSize);
10,527,300✔
753
  TSDB_CHECK_CODE(code, lino, _error);
10,527,300✔
754

755
  code = blockWrapperInit(&p->blockWrapper, 1024);
10,527,300✔
756
  TSDB_CHECK_CODE(code, lino, _error);
10,527,300✔
757

758
  // Set pBse pointer for encryption/decryption
759
  p->blockWrapper.pBse = pMgt->pBse;
10,527,300✔
760

761
  bseBuildMetaName(timestamp, meta);
10,527,300✔
762
  code = tableMetaReaderInit(pMeta->pTableMetaMgt->pTableMeta, meta, &p->pMetaReader);
10,527,300✔
763
  TSDB_CHECK_CODE(code, lino, _error);
10,527,300✔
764

765
  *pReader = p;
10,527,300✔
766

767
_error:
10,527,300✔
768
  if (code != 0) {
10,527,300✔
769
    tableReaderClose(p);
×
770
    bseError("failed to open table pReaderMgt at line %d since %s", lino, tstrerror(code));
×
771
  }
772
  return code;
10,527,300✔
773
}
774
void tableReaderShouldPutToCache(STableReader *p, int8_t cache) { p->putInCache = cache; }
×
775

776
int32_t tableReaderGet(STableReader *p, int64_t seq, uint8_t **pValue, int32_t *len) {
10,527,300✔
777
  int32_t    lino = 0;
10,527,300✔
778
  int32_t    code = 0;
10,527,300✔
779
  SMetaBlock block = {0};
10,527,300✔
780

781
  STableReaderMgt   *pMgt = (STableReaderMgt *)p->pReaderMgt;
10,527,300✔
782
  SBtableMetaReader *pMeta = p->pMetaReader;
10,527,300✔
783

784
  code = tableMetaReaderLoadBlockMeta(pMeta, seq, &block);
10,527,300✔
785
  TSDB_CHECK_CODE(code, lino, _error);
10,527,300✔
786

787
  SBlockWrapper wrapper = {0};
10,527,300✔
788
  SBlkHandle    blkhandle = {.offset = block.offset, .size = block.size, .range = block.range};
10,527,300✔
789

790
  SCacheItem *pItem = NULL;
10,527,300✔
791
  code = blockCacheGet(pMgt->pBlockCache, &blkhandle.range, (void **)&pItem);
10,527,300✔
792
  if (code != 0) {
10,527,300✔
793
    code = blockWrapperInit(&wrapper, block.size + 16);
1,671✔
794
    TSDB_CHECK_CODE(code, lino, _error);
1,671✔
795

796
    bseDebug("block size:%" PRId64 ", offset:%" PRId64 ", [sseq:%" PRId64 ", eseq:%" PRId64 "]", block.size,
1,671✔
797
             block.offset, block.range.sseq, block.range.eseq);
798

799
    code = tableLoadBlock(p->pDataFile, &blkhandle, &wrapper);
1,671✔
800
    if (code != 0) {
1,671✔
801
      blockWrapperCleanup(&wrapper);
×
802
      TSDB_CHECK_CODE(code, lino, _error);
×
803
    }
804

805
    code = blockCachePut(pMgt->pBlockCache, &block.range, wrapper.data);
1,671✔
806
    TSDB_CHECK_CODE(code, lino, _error);
1,671✔
807

808
  } else {
809
    wrapper.data = pItem->pItem;
10,525,629✔
810
    wrapper.pCachItem = pItem;
10,525,629✔
811
  }
812

813
  code = blockSeek(wrapper.data, seq, pValue, len);
10,527,300✔
814
  TSDB_CHECK_CODE(code, lino, _error);
10,527,300✔
815

816
  if (wrapper.pCachItem != NULL) {
10,527,300✔
817
    bseCacheUnrefItem(wrapper.pCachItem);
10,525,629✔
818
  }
819
  blockWrapperClearMeta(&wrapper);
10,527,300✔
820

821
_error:
10,527,300✔
822
  if (code != 0) {
10,527,300✔
823
    bseError("failed to get table reader data at line %d since %s", lino, tstrerror(code));
×
824
  }
825
  return code;
10,527,300✔
826
}
827
int32_t tableReaderGetMeta(STableReader *p, SArray **pMeta) {
×
828
  int32_t code = 0;
×
829
  int32_t lino = 0;
×
830

831
  SArray *pMetaHandle = taosArrayInit(128, sizeof(SBlkHandle));
×
832
  if (pMetaHandle == NULL) {
×
833
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
834
  }
835

836
  code = tableMetaReaderLoadAllDataHandle(p->pMetaReader, pMetaHandle);
×
837
  TSDB_CHECK_CODE(code, lino, _error);
×
838

839
  *pMeta = pMetaHandle;
×
840

841
_error:
×
842
  if (code != 0) {
×
843
    bseError("failed to get table reader meta at lino %d since %s", lino, tstrerror(code));
×
844
  }
845
  return code;
×
846
}
847

848
void tableReaderClose(STableReader *p) {
10,527,300✔
849
  if (p == NULL) return;
10,527,300✔
850
  int32_t code = 0;
10,527,300✔
851

852
  taosArrayDestroy(p->pMetaHandle);
10,527,300✔
853

854
  if (taosCloseFile(&p->pDataFile) != 0) {
10,527,300✔
855
    bseError("failed to close table reader file %s since %s", p->name, tstrerror(terrno));
×
856
  }
857
  tableMetaReaderClose(p->pMetaReader);
10,527,300✔
858
  blockWrapperCleanup(&p->blockWrapper);
10,527,300✔
859

860
  taosMemoryFree(p);
10,527,300✔
861
}
862

863
int32_t blockCreate(int32_t cap, SBlock **p) {
×
864
  int32_t code = 0;
×
865
  SBlock *t = taosMemCalloc(1, cap);
×
866
  if (t == NULL) {
×
867
    return terrno;
×
868
  }
869
  *p = t;
×
870
  return code;
×
871
}
872

873
int32_t blockEsimateSize(SBlock *p, int32_t extra) { return BLOCK_TOTAL_SIZE(p) + extra; }
21,069,636✔
874

875
int32_t blockWrapperSize(SBlockWrapper *p, int32_t extra) {
21,066,099✔
876
  if (p == NULL || p->data == NULL) {
21,066,099✔
877
    return 0;
×
878
  }
879

880
  return p->kvSize + blockEsimateSize(p->data, extra) + 12;
21,066,099✔
881
}
882
int32_t blockAppendBatch(SBlock *p, uint8_t *value, int32_t len) {
14,841✔
883
  int32_t  code = 0;
14,841✔
884
  int32_t  offset = 0;
14,841✔
885
  uint8_t *data = (uint8_t *)p->data + p->len;
14,841✔
886
  memcpy(data, value, len);
14,841✔
887
  p->len += len;
14,841✔
888
  return len;
14,841✔
889
}
890
int32_t blockPut(SBlock *p, int64_t seq, uint8_t *value, int32_t len) {
×
891
  int32_t  code = 0;
×
892
  uint8_t *data = (uint8_t *)p->data + p->len;
×
893

894
  int32_t offset = taosEncodeVariantI64((void **)&data, seq);
×
895
  offset += taosEncodeVariantI32((void **)&data, len);
×
896
  offset += taosEncodeBinary((void **)&data, value, len);
×
897
  p->len += len;
×
898
  return offset;
×
899
}
900
void blockClear(SBlock *p) {
7,074✔
901
  p->len = 0;
7,074✔
902
  p->type = 0;
7,074✔
903
  p->data[0] = 0;
7,074✔
904
}
7,074✔
905

906
int32_t blockSeek(SBlock *p, int64_t seq, uint8_t **pValue, int32_t *len) {
10,527,300✔
907
  int8_t  found = 0;
10,527,300✔
908
  int32_t code = 0;
10,527,300✔
909
  int32_t offset = 0;
10,527,300✔
910

911
  uint8_t *p1 = (uint8_t *)p->data;
10,527,300✔
912
  uint8_t *p2 = p1 + p->offset;
10,527,300✔
913
  while (p2 - p1 < p->len) {
2,147,483,647✔
914
    int64_t k;
2,147,483,647✔
915
    int32_t v;
2,147,483,647✔
916
    p2 = taosDecodeVariantI64(p2, &k);
2,147,483,647✔
917
    p2 = taosDecodeVariantI32(p2, &v);
2,147,483,647✔
918

919
    if (seq == k) {
2,147,483,647✔
920
      *len = v;
10,527,300✔
921
      found = 1;
10,527,300✔
922
      *pValue = taosMemoryCalloc(1, v);
10,527,300✔
923
      if (*pValue == NULL) {
10,527,300✔
924
        return terrno;
×
925
      }
926
      memcpy(*pValue, (uint8_t *)p->data + offset, v);
10,527,300✔
927
      break;
10,527,300✔
928
    }
929
    offset += v;
2,147,483,647✔
930
  }
931
  if (found == 0) {
10,527,300✔
932
    code = TSDB_CODE_NOT_FOUND;
×
933
  }
934
  return code;
10,527,300✔
935
}
936

937
int32_t blockWrapperSeek(SBlockWrapper *p, int64_t tgt, uint8_t **pValue, int32_t *len) {
21,066,099✔
938
  int32_t code = 0;
21,066,099✔
939
  if (p == NULL || p->data == NULL) {
21,066,099✔
940
    return TSDB_CODE_NOT_FOUND;
×
941
  }
942
  int32_t  offset = 0;
21,066,099✔
943
  uint8_t *p1 = p->kvBuffer;
21,066,099✔
944
  uint8_t *p2 = p1;
21,066,099✔
945
  SBlock  *pBlk = (SBlock *)p->data;
21,066,099✔
946
  while ((p2 - p1) < p->kvSize) {
2,147,483,647✔
947
    int64_t seq = 0;
2,147,483,647✔
948
    int32_t vlen = 0;
2,147,483,647✔
949
    p2 = taosDecodeVariantI64(p2, &seq);
2,147,483,647✔
950
    p2 = taosDecodeVariantI32(p2, &vlen);
2,147,483,647✔
951

952
    if (seq == tgt) {
2,147,483,647✔
953
      *len = vlen;
21,066,099✔
954
      *pValue = taosMemoryCalloc(1, vlen);
21,066,099✔
955
      if (*pValue == NULL) {
21,066,099✔
956
        return terrno;
×
957
      }
958
      uint8_t *pdata = (uint8_t *)pBlk->data + offset;
21,066,099✔
959
      memcpy(*pValue, pdata, vlen);
21,066,099✔
960
      return 0;
21,066,099✔
961
    }
962
    offset += vlen;
2,147,483,647✔
963
  }
964
  bseInfo("blockWrapperSeek not found seq:%" PRId64 ", tgt:%" PRId64 "", tgt, tgt);
×
965
  return TSDB_CODE_BLOB_SEQ_NOT_FOUND;
×
966
}
967

968
int8_t blockGetType(SBlock *p) { return p->type; }
10,530,085✔
969
void   blockDestroy(SBlock *pBlock) { taosMemoryFree(pBlock); }
×
970

971
int32_t metaBlockAddIndex(SBlock *p, SBlkHandle *pInfo) {
5,208✔
972
  int32_t  code = 0;
5,208✔
973
  uint8_t *data = (uint8_t *)p->data + p->len;
5,208✔
974
  int32_t  offset = blkHandleEncode(pInfo, (char *)data);
5,208✔
975
  p->len += offset;
5,208✔
976
  return offset;
5,208✔
977
}
978

979
int32_t blkHandleEncode(SBlkHandle *pHandle, char *buf) {
12,282✔
980
  char   *p = buf;
12,282✔
981
  int32_t tlen = 0;
12,282✔
982
  tlen += taosEncodeVariantU64((void **)&p, pHandle->offset);
12,282✔
983
  tlen += taosEncodeVariantU64((void **)&p, pHandle->size);
12,282✔
984
  tlen += taosEncodeVariantI64((void **)&p, pHandle->range.sseq);
12,282✔
985
  tlen += taosEncodeVariantI64((void **)&p, pHandle->range.eseq);
12,282✔
986
  return tlen;
12,282✔
987
}
988
int32_t blkHandleDecode(SBlkHandle *pHandle, char *buf) {
46,624,799✔
989
  char *p = buf;
46,624,799✔
990
  p = taosDecodeVariantU64(p, &pHandle->offset);
46,624,799✔
991
  p = taosDecodeVariantU64(p, &pHandle->size);
46,624,799✔
992
  p = taosDecodeVariantI64(p, &pHandle->range.sseq);
46,624,799✔
993
  p = taosDecodeVariantI64(p, &pHandle->range.eseq);
46,624,799✔
994
  return p - buf;
46,624,799✔
995
}
996

997
// | meta handle | index handle | padding | magic number high | magic number low |
998
int32_t footerEncode(STableFooter *pFooter, char *buf) {
3,537✔
999
  char   *p = buf;
3,537✔
1000
  int32_t len = 0;
3,537✔
1001
  len += blkHandleEncode(pFooter->metaHandle, p + len);
3,537✔
1002
  len += blkHandleEncode(pFooter->indexHandle, p + len);
3,537✔
1003

1004
  p = buf + kEncodeLen - 8;
3,537✔
1005
  len += taosEncodeFixedU32((void **)&p, kMagicNum);
3,537✔
1006
  len += taosEncodeFixedU32((void **)&p, kMagicNum);
3,537✔
1007
  return 0;
3,537✔
1008
}
1009
int32_t footerDecode(STableFooter *pFooter, char *buf) {
10,528,414✔
1010
  int32_t  code = 0;
10,528,414✔
1011
  char    *p = buf;
10,528,414✔
1012
  char    *mp = buf + kEncodeLen - 8;
10,528,414✔
1013
  uint32_t ml, mh;
10,528,414✔
1014

1015
  if (taosDecodeFixedU32(mp, &ml) == NULL) {
10,528,414✔
1016
    return TSDB_CODE_FILE_CORRUPTED;
×
1017
  }
1018
  if (taosDecodeFixedU32(mp + 4, &mh) == NULL) {
21,056,828✔
1019
    return TSDB_CODE_FILE_CORRUPTED;
×
1020
  }
1021

1022
  if (ml != kMagicNum || mh != kMagicNum) {
10,528,414✔
1023
    return TSDB_CODE_FILE_CORRUPTED;
×
1024
  }
1025

1026
  int32_t len = blkHandleDecode(pFooter->metaHandle, buf);
10,528,414✔
1027
  if (len < 0) {
10,528,414✔
1028
    return TSDB_CODE_FILE_CORRUPTED;
×
1029
  }
1030

1031
  len = blkHandleDecode(pFooter->indexHandle, buf + len);
10,528,414✔
1032
  if (len < 0) {
10,528,414✔
1033
    return TSDB_CODE_FILE_CORRUPTED;
×
1034
  }
1035
  return code;
10,528,414✔
1036
}
1037

1038
int32_t blockSeekMeta(SBlock *pBlock, int64_t seq, SMetaBlock *pMeta) {
10,527,300✔
1039
  int32_t  code = 0;
10,527,300✔
1040
  int32_t  len = 0;
10,527,300✔
1041
  uint8_t *p = (uint8_t *)pBlock->data;
10,527,300✔
1042

1043
  while (len < pBlock->len) {
10,527,300✔
1044
    SMetaBlock meta = {0};
10,527,300✔
1045
    int32_t    offset = metaBlockDecode(&meta, (char *)p);
10,527,300✔
1046
    if (seqRangeContains(&meta.range, seq)) {
10,527,300✔
1047
      memcpy(pMeta, &meta, sizeof(SMetaBlock));
10,527,300✔
1048
      return 0;
10,527,300✔
1049
    }
1050
    len += offset;
×
1051
    p += offset;
×
1052
  }
1053
  return TSDB_CODE_NOT_FOUND;
×
1054
}
1055
int32_t blockGetAllMeta(SBlock *pBlock, SArray *pMeta) {
×
1056
  int32_t  code = 0;
×
1057
  int32_t  len = 0;
×
1058
  uint8_t *p = (uint8_t *)pBlock->data;
×
1059

1060
  while (len < pBlock->len) {
×
1061
    SMetaBlock meta = {0};
×
1062
    int32_t    offset = metaBlockDecode(&meta, (char *)p);
×
1063
    if (taosArrayPush(pMeta, &meta) == NULL) {
×
1064
      return terrno;
×
1065
    }
1066
    len += offset;
×
1067
    p += offset;
×
1068
  }
1069

1070
  return code;
×
1071
}
1072

1073
int32_t metaBlockEncode(SMetaBlock *pMeta, char *buf) {
3,537✔
1074
  char   *p = buf;
3,537✔
1075
  int32_t len = 0;
3,537✔
1076
  len += taosEncodeFixedI8((void **)&p, pMeta->type);
3,537✔
1077
  len += taosEncodeFixedI8((void **)&p, pMeta->version);
3,537✔
1078
  len += taosEncodeFixedI16((void **)&p, pMeta->reserve);
3,537✔
1079
  len += taosEncodeVariantI64((void **)&p, pMeta->offset);
3,537✔
1080
  len += taosEncodeVariantI64((void **)&p, pMeta->size);
3,537✔
1081
  len += taosEncodeVariantI64((void **)&p, pMeta->range.sseq);
3,537✔
1082
  len += taosEncodeVariantI64((void **)&p, pMeta->range.eseq);
3,537✔
1083
  return len;
3,537✔
1084
}
1085
int32_t metaBlockDecode(SMetaBlock *pMeta, char *buf) {
10,527,300✔
1086
  char   *p = buf;
10,527,300✔
1087
  int32_t len = 0;
10,527,300✔
1088
  p = taosDecodeFixedI8(p, &pMeta->type);
10,527,300✔
1089
  p = taosDecodeFixedI8(p, &pMeta->version);
10,527,300✔
1090
  p = taosDecodeFixedI16(p, &pMeta->reserve);
10,527,300✔
1091
  p = taosDecodeVariantI64(p, &pMeta->offset);
10,527,300✔
1092
  p = taosDecodeVariantI64(p, &pMeta->size);
10,527,300✔
1093
  p = taosDecodeVariantI64(p, &pMeta->range.sseq);
10,527,300✔
1094
  p = taosDecodeVariantI64(p, &pMeta->range.eseq);
10,527,300✔
1095
  return p - buf;
10,527,300✔
1096
}
1097
int32_t metaBlockAdd(SBlock *p, SMetaBlock *pBlk) {
3,537✔
1098
  int32_t  code = 0;
3,537✔
1099
  uint8_t *data = (uint8_t *)p->data + p->len;
3,537✔
1100
  int32_t  offset = metaBlockEncode(pBlk, (char *)data);
3,537✔
1101
  p->len += offset;
3,537✔
1102
  return offset;
3,537✔
1103
}
1104
int32_t metaBlockGet(SBlock *p, SMetaBlock *pBlk) {
×
1105
  int32_t  code = 0;
×
1106
  uint8_t *data = (uint8_t *)p->data + p->len;
×
1107
  int32_t  offset = metaBlockDecode(pBlk, (char *)data);
×
1108
  p->len += offset;
×
1109
  return offset;
×
1110
}
1111

1112
int32_t tableFlushBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlkW, int32_t *nWrite) {
8,745✔
1113
  int32_t code = 0;
8,745✔
1114
  int32_t lino = 0;
8,745✔
1115

1116
  SBlock *pBlk = pBlkW->data;
8,745✔
1117
  if (pBlk->len == 0) {
8,745✔
1118
    return 0;
×
1119
  }
1120
  pBlk->version = BSE_META_VER;
8,745✔
1121
  int8_t compressType = kNoCompres;
8,745✔
1122

1123
  SBlockWrapper wrapper = {0};
8,745✔
1124
  uint8_t      *encryptBuf = NULL;
8,745✔
1125

1126
  uint8_t *pWrite = (uint8_t *)pBlk;
8,745✔
1127
  int32_t  len = BLOCK_TOTAL_SIZE(pBlk);
8,745✔
1128
  int32_t  plainLen = len;
8,745✔
1129

1130
  BLOCK_SET_COMPRESS_TYPE(pBlk, compressType);
8,745✔
1131
  BLOCK_SET_ROW_SIZE(pBlk, BLOCK_ROW_SIZE(pBlk));
8,745✔
1132

1133
  if (compressType != kNoCompres) {
8,745✔
1134
    code = blockWrapperInit(&wrapper, len + 4);
×
1135
    TSDB_CHECK_CODE(code, lino, _error);
×
1136

1137
    int32_t compressSize = wrapper.cap;
×
1138
    code = bseCompressData(compressType, pWrite, BLOCK_ROW_SIZE(pBlk), wrapper.data, &compressSize);
×
1139
    if (code != 0) {
×
1140
      bseWarn("failed to compress data since %s, not set compress", tstrerror(TSDB_CODE_THIRDPARTY_ERROR));
×
1141

1142
      blockWrapperCleanup(&wrapper);
×
1143
      BLOCK_SET_COMPRESS_TYPE(pBlk, kNoCompres);
×
1144
      BLOCK_SET_ROW_SIZE(pBlk, BLOCK_ROW_SIZE(pBlk));
×
1145
    } else {
1146
      int32_t rawSize = BLOCK_ROW_SIZE(pBlk);
×
1147
      COMREPSS_DATA_SET_TYPE_AND_RAWLEN(wrapper.data, compressSize, compressType, rawSize);
×
1148
      len = compressSize + BLOCK_TAIL_LEN;
×
1149

1150
      pWrite = (uint8_t *)wrapper.data;
×
NEW
1151
      plainLen = len;
×
1152
    }
1153
  }
1154

1155
  // Encrypt data if encryption is enabled (before checksum)
1156
  SBse *pBse = (SBse *)pBlkW->pBse;
8,745✔
1157
  if (pBse != NULL && pBse->cfg.encryptKey[0] != '\0') {
8,745✔
1158
    // Encrypt data excluding BLOCK_TAIL_LEN (to keep compression info readable)
1159
    // plainLen includes checksum, subtract BLOCK_TAIL_LEN
NEW
1160
    int32_t plainDataLen = plainLen - BLOCK_TAIL_LEN;
×
NEW
1161
    int32_t cryptedLen = ENCRYPTED_LEN(plainDataLen);
×
1162

1163
    // Allocate buffer for encrypted data + BLOCK_TAIL_LEN
NEW
1164
    encryptBuf = taosMemoryMalloc(cryptedLen + BLOCK_TAIL_LEN);
×
NEW
1165
    if (encryptBuf == NULL) {
×
NEW
1166
      TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1167
    }
1168

1169
    // Pad and encrypt data part only
NEW
1170
    (void)memset(encryptBuf, 0, cryptedLen);
×
NEW
1171
    (void)memcpy(encryptBuf, pWrite, plainDataLen);
×
1172

1173
    // Encrypt using CBC
NEW
1174
    SCryptOpts opts = {0};
×
NEW
1175
    opts.len = cryptedLen;
×
NEW
1176
    opts.source = (char *)encryptBuf;
×
NEW
1177
    opts.result = (char *)encryptBuf;  // Encrypt in place
×
NEW
1178
    opts.unitLen = 16;
×
NEW
1179
    opts.pOsslAlgrName = pBse->cfg.encryptAlgrName;
×
NEW
1180
    tstrncpy((char *)opts.key, pBse->cfg.encryptKey, ENCRYPT_KEY_LEN + 1);
×
1181

NEW
1182
    int32_t count = CBC_Encrypt(&opts);
×
NEW
1183
    if (count != opts.len) {
×
NEW
1184
      TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1185
    }
1186

1187
    // Copy BLOCK_TAIL_LEN (compression info) unencrypted
NEW
1188
    (void)memcpy(encryptBuf + cryptedLen, pWrite + plainDataLen, BLOCK_TAIL_LEN);
×
1189

NEW
1190
    pWrite = encryptBuf;
×
NEW
1191
    len = cryptedLen + BLOCK_TAIL_LEN;
×
1192
  }
1193

1194
  code = taosCalcChecksumAppend(0, (uint8_t *)pWrite, len);
8,745✔
1195
  TSDB_CHECK_CODE(code, lino, _error);
8,745✔
1196

1197
  int64_t n = taosLSeekFile(pFile, pHandle->offset, SEEK_SET);
8,745✔
1198
  if (n < 0) {
8,745✔
1199
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1200
  }
1201

1202
  int32_t nwrite = taosWriteFile(pFile, (uint8_t *)pWrite, len);
8,745✔
1203
  if (nwrite != len) {
8,745✔
1204
    code = terrno;
×
1205
    TSDB_CHECK_CODE(code, lino, _error);
×
1206
  }
1207
  *nWrite = nwrite;
8,745✔
1208
  blockWrapperCleanup(&wrapper);
8,745✔
1209
  taosMemoryFree(encryptBuf);
8,745✔
1210
_error:
8,745✔
1211
  if (code != 0) {
8,745✔
1212
    bseError("failed to flush table builder at line %d since %s", lino, tstrerror(code));
×
1213
  } else {
1214
    bseDebug("flush at offset %" PRId64 ", size %d", pHandle->offset, len);
8,745✔
1215
  }
1216
  return code;
8,745✔
1217
}
1218
int32_t tableLoadBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlkW) {
21,059,056✔
1219
  int32_t code = 0;
21,059,056✔
1220
  int32_t lino = 0;
21,059,056✔
1221

1222
  code = blockWrapperResize(pBlkW, pHandle->size + 16);
21,059,056✔
1223
  TSDB_CHECK_CODE(code, lino, _error);
21,059,056✔
1224

1225
  SBlock  *pBlk = pBlkW->data;
21,059,056✔
1226
  uint8_t *pRead = (uint8_t *)pBlk;
21,059,056✔
1227

1228
  SBlockWrapper pHelp = {0};
21,059,056✔
1229
  uint8_t      *decryptBuf = NULL;
21,059,056✔
1230

1231
  int64_t n = taosLSeekFile(pFile, pHandle->offset, SEEK_SET);
21,059,056✔
1232
  if (n < 0) {
21,059,056✔
1233
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1234
  }
1235

1236
  int32_t nr = taosReadFile(pFile, pRead, pHandle->size);
21,059,056✔
1237
  if (nr != pHandle->size) {
21,059,056✔
1238
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
×
1239
  }
1240

1241
  if (taosCheckChecksumWhole((uint8_t *)pRead, pHandle->size) != 1) {
42,118,112✔
1242
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
×
1243
  }
1244

1245
  // Decrypt data if encryption is enabled
1246
  SBse   *pBse = (SBse *)pBlkW->pBse;
21,059,056✔
1247
  int32_t dataLen = pHandle->size;
21,059,056✔
1248
  if (pBse != NULL && pBse->cfg.encryptKey[0] != '\0') {
21,059,056✔
1249
    // Data is encrypted (excluding BLOCK_TAIL_LEN), decrypt it
NEW
1250
    int32_t cryptedLen = pHandle->size - BLOCK_TAIL_LEN;
×
NEW
1251
    decryptBuf = taosMemoryMalloc(cryptedLen);
×
NEW
1252
    if (decryptBuf == NULL) {
×
NEW
1253
      TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1254
    }
1255

1256
    // Decrypt using CBC
NEW
1257
    SCryptOpts opts = {0};
×
NEW
1258
    opts.len = cryptedLen;
×
NEW
1259
    opts.source = (char *)pRead;
×
NEW
1260
    opts.result = (char *)decryptBuf;
×
NEW
1261
    opts.unitLen = 16;
×
NEW
1262
    opts.pOsslAlgrName = pBse->cfg.encryptAlgrName;
×
NEW
1263
    tstrncpy(opts.key, pBse->cfg.encryptKey, ENCRYPT_KEY_LEN + 1);
×
1264

NEW
1265
    int32_t count = CBC_Decrypt(&opts);
×
NEW
1266
    if (count != cryptedLen) {
×
NEW
1267
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
×
1268
    }
1269

1270
    // Copy decrypted data back, BLOCK_TAIL_LEN remains unencrypted at the end
NEW
1271
    (void)memcpy(pRead, decryptBuf, cryptedLen);
×
1272
    // BLOCK_TAIL_LEN at pRead + cryptedLen is already in place (unencrypted)
1273
  }
1274

1275
  uint8_t compressType = 0;
21,059,056✔
1276
  int32_t rawSize = 0;
21,059,056✔
1277

1278
  // Get compression info from decrypted/plain data
1279
  COMPRESS_DATA_GET_TYPE_AND_RAWLEN(pRead, dataLen, compressType, rawSize);
21,059,056✔
1280

1281
  if (compressType != kNoCompres) {
21,059,056✔
1282
    code = blockWrapperInit(&pHelp, rawSize);
1,671✔
1283
    TSDB_CHECK_CODE(code, lino, _error);
1,671✔
1284

1285
    int32_t unCompressSize = pHelp.cap;
1,671✔
1286
    code = bseDecompressData(compressType, pRead, dataLen - BLOCK_TAIL_LEN, pHelp.data, &unCompressSize);
1,671✔
1287
    if (code != 0) {
1,671✔
1288
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
×
1289
    }
1290

1291
    SBlock *p = pHelp.data;
1,671✔
1292
    if (BLOCK_ROW_SIZE_OFFSET(p) != unCompressSize) {
1,671✔
1293
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
×
1294
    }
1295
    blockWrapperCleanup(pBlkW);
1,671✔
1296

1297
    blockWrapperTransfer(pBlkW, &pHelp);
1,671✔
1298

1299
  } else {
1300
    // For uncompressed data, use rawSize (which equals actual data size for no compression)
1301
    // rawSize is stored in BLOCK_TAIL_LEN and is always correct even after encryption
1302
    pBlk = pBlkW->data;
21,057,385✔
1303
    int32_t expectedLen = rawSize - sizeof(SBlock);
21,057,385✔
1304
    if (pBlk->len != expectedLen) {
21,057,385✔
UNCOV
1305
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
×
1306
    }
1307
  }
1308
_error:
21,059,056✔
1309
  if (code != 0) {
21,059,056✔
1310
    bseError("failed to load block at lino %d since %s, read at offset %" PRId64 ", size:%" PRId64 "", lino,
×
1311
             tstrerror(code), pHandle->offset, pHandle->size);
1312
  } else {
1313
    bseDebug("read at offset %" PRId64 ", size %" PRId64 "", pHandle->offset, pHandle->size);
21,059,056✔
1314
  }
1315

1316
  blockWrapperCleanup(&pHelp);
21,059,056✔
1317
  taosMemoryFree(decryptBuf);
21,059,056✔
1318
  return code;
21,059,056✔
1319
}
1320
int32_t tableLoadRawBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlkW, int8_t checkSum) {
×
1321
  int32_t code = 0;
×
1322
  int32_t lino = 0;
×
1323

1324
  SBlock  *pBlk = pBlkW->data;
×
1325
  uint8_t *pRead = (uint8_t *)pBlk + sizeof(SBseSnapMeta);
×
1326

1327
  int64_t n = taosLSeekFile(pFile, pHandle->offset, SEEK_SET);
×
1328
  if (n < 0) {
×
1329
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1330
  }
1331

1332
  int32_t nr = taosReadFile(pFile, pRead, pHandle->size);
×
1333
  if (nr != pHandle->size) {
×
1334
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
×
1335
  }
1336

1337
  if (checkSum) {
×
1338
    if (taosCheckChecksumWhole((uint8_t *)pRead, pHandle->size) != 1) {
×
1339
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
×
1340
    }
1341
  }
1342

1343
  pBlkW->size = pHandle->size + sizeof(SBseSnapMeta);
×
1344
_error:
×
1345
  if (code != 0) {
×
1346
    bseError("failed to load block at lino %d since %s", lino, tstrerror(code));
×
1347
  }
1348
  return code;
×
1349
}
1350

1351
int8_t seqRangeContains(SSeqRange *p, int64_t seq) { return seq >= p->sseq && seq <= p->eseq; }
62,681,409✔
1352

1353
void seqRangeReset(SSeqRange *p) {
22,499✔
1354
  p->sseq = -1;
22,499✔
1355
  p->eseq = -1;
22,499✔
1356
}
22,499✔
1357

1358
int8_t seqRangeIsGreater(SSeqRange *p, int64_t seq) { return seq > p->eseq; }
×
1359

1360
void seqRangeUpdate(SSeqRange *dst, SSeqRange *src) {
63,263,262✔
1361
  if (dst->sseq == -1) {
63,263,262✔
1362
    dst->sseq = src->sseq;
19,535✔
1363
  }
1364
  dst->eseq = src->eseq;
63,263,262✔
1365
}
63,263,262✔
1366

1367
int32_t blockWrapperInit(SBlockWrapper *p, int32_t cap) {
21,078,407✔
1368
  int32_t code = 0;
21,078,407✔
1369
  int32_t lino = 0;
21,078,407✔
1370
  p->data = taosMemoryCalloc(1, cap);
21,078,407✔
1371
  if (p->data == NULL) {
21,078,407✔
1372
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1373
  }
1374

1375
  p->kvSize = 0;
21,078,407✔
1376
  p->kvCap = 128;
21,078,407✔
1377
  p->kvBuffer = taosMemoryCalloc(1, p->kvCap);
21,078,407✔
1378
  if (p->kvBuffer == NULL) {
21,078,407✔
1379
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1380
  }
1381

1382
  SBlock *block = (SBlock *)p->data;
21,078,407✔
1383
  block->offset = 0;
21,078,407✔
1384
  block->version = 0;
21,078,407✔
1385
  p->cap = cap;
21,078,407✔
1386
_error:
21,078,407✔
1387
  if (code != 0) {
21,078,407✔
1388
    blockWrapperCleanup(p);
×
1389
  }
1390
  return code;
21,078,407✔
1391
}
1392
int32_t blockWrapperPushMeta(SBlockWrapper *p, int64_t seq, uint8_t *value, int32_t len) {
21,066,099✔
1393
  int32_t code = 0;
21,066,099✔
1394
  if ((p->kvSize + 12) > p->kvCap) {
21,066,099✔
1395
    if (p->kvCap == 0) {
17,824✔
1396
      p->kvCap = 128;
×
1397
    } else {
1398
      p->kvCap *= 2;
17,824✔
1399
    }
1400

1401
    void *data = taosMemoryRealloc(p->kvBuffer, p->kvCap);
17,824✔
1402
    if (data == NULL) {
17,824✔
1403
      return terrno;
×
1404
    }
1405
    p->kvBuffer = data;
17,824✔
1406
  }
1407
  uint8_t *data = (uint8_t *)p->kvBuffer + p->kvSize;
21,066,099✔
1408
  p->kvSize += taosEncodeVariantI64((void **)&data, seq);
21,066,099✔
1409
  p->kvSize += taosEncodeVariantI32((void **)&data, len);
21,066,099✔
1410
  return code;
21,066,099✔
1411
}
1412

1413
void blockWrapperClearMeta(SBlockWrapper *p) {
10,527,300✔
1414
  if (p->kvBuffer != NULL) {
10,527,300✔
1415
    taosMemoryFree(p->kvBuffer);
1,671✔
1416
  }
1417
  p->kvSize = 0;
10,527,300✔
1418
  p->kvCap = 0;
10,527,300✔
1419
}
10,527,300✔
1420

1421
void blockWrapperCleanup(SBlockWrapper *p) {
42,144,537✔
1422
  if (p->data != NULL) {
42,144,537✔
1423
    taosMemoryFree(p->data);
21,076,736✔
1424
    p->data = NULL;
21,076,736✔
1425
  }
1426
  p->kvSize = 0;
42,144,537✔
1427
  taosMemoryFreeClear(p->kvBuffer);
42,144,537✔
1428
  p->cap = 0;
42,144,537✔
1429
}
42,144,537✔
1430

1431
void blockWrapperTransfer(SBlockWrapper *dst, SBlockWrapper *src) {
1,671✔
1432
  if (dst == NULL || src == NULL) {
1,671✔
1433
    return;
×
1434
  }
1435
  dst->data = src->data;
1,671✔
1436
  dst->cap = src->cap;
1,671✔
1437

1438
  dst->kvBuffer = src->kvBuffer;
1,671✔
1439
  dst->kvSize = src->kvSize;
1,671✔
1440
  dst->kvCap = src->kvCap;
1,671✔
1441

1442
  src->kvBuffer = NULL;
1,671✔
1443
  src->kvSize = 0;
1,671✔
1444
  src->kvCap = 0;
1,671✔
1445

1446
  src->data = NULL;
1,671✔
1447
  src->cap = 0;
1,671✔
1448
}
1449

1450
int32_t blockWrapperResize(SBlockWrapper *p, int32_t newCap) {
21,086,179✔
1451
  if (p->cap < newCap) {
21,086,179✔
1452
    int32_t cap = p->cap;
3,537✔
1453
    if (cap == 0) cap = 1024;
3,537✔
1454
    while (cap < newCap) {
53,055✔
1455
      cap = cap * 2;
49,518✔
1456
    }
1457
    void *data = taosMemoryRealloc(p->data, cap);
3,537✔
1458
    if (data == NULL) {
3,537✔
1459
      return terrno;
×
1460
    }
1461
    p->data = data;
3,537✔
1462
    p->cap = cap;
3,537✔
1463
  }
1464
  return 0;
21,086,179✔
1465
}
1466

1467
void blockWrapperClear(SBlockWrapper *p) {
7,074✔
1468
  if (p->data == NULL) {
7,074✔
1469
    return;
×
1470
  }
1471
  SBlock *block = (SBlock *)p->data;
7,074✔
1472
  p->kvSize = 0;
7,074✔
1473
  p->size = 0;
7,074✔
1474
  blockClear(block);
7,074✔
1475
}
1476

1477
void blockWrapperSetType(SBlockWrapper *p, int8_t type) {
8,745✔
1478
  SBlock *block = (SBlock *)p->data;
8,745✔
1479
  block->type = type;
8,745✔
1480
}
8,745✔
1481

1482
int32_t tableReaderIterInit(int64_t timestamp, int8_t type, STableReaderIter **ppIter, SBse *pBse) {
×
1483
  int32_t    code = 0;
×
1484
  int32_t    lino = 0;
×
1485
  STableMgt *pTableMgt = pBse->pTableMgt;
×
1486

1487
  STableReaderIter *p = taosMemCalloc(1, sizeof(STableReaderIter));
×
1488
  if (p == NULL) {
×
1489
    return terrno;
×
1490
  }
1491

1492
  p->timestamp = timestamp;
×
1493
  SSubTableMgt *retentionMgt = NULL;
×
1494

1495
  code = createSubTableMgt(timestamp, 1, pBse->pTableMgt, &retentionMgt);
×
1496
  TSDB_CHECK_CODE(code, lino, _error);
×
1497

1498
  p->pSubMgt = retentionMgt;
×
1499

1500
  code = tableReaderOpen(timestamp, &p->pTableReader, retentionMgt->pReaderMgt);
×
1501
  TSDB_CHECK_CODE(code, lino, _error);
×
1502

1503
  tableReaderShouldPutToCache(p->pTableReader, 0);
×
1504

1505
  p->blockIndex = 0;
×
1506
  p->blockType = type;
×
1507

1508
  if (p->blockType == BSE_TABLE_DATA_TYPE) {
×
1509
    code = tableReaderGetMeta(p->pTableReader, &p->pMetaHandle);
×
1510
    TSDB_CHECK_CODE(code, lino, _error);
×
1511

1512
  } else if (p->blockType == BSE_TABLE_META_TYPE) {
×
1513
    p->pMetaHandle = taosArrayInit(8, sizeof(SBlkHandle));
×
1514
    if (p->pMetaHandle == NULL) {
×
1515
      TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1516
    }
1517
    code = tableMetaReaderLoadMetaHandle(p->pTableReader->pMetaReader, p->pMetaHandle);
×
1518
  } else {
1519
    p->isOver = 1;
×
1520
  }
1521
  *ppIter = p;
×
1522

1523
_error:
×
1524
  if (code != 0) {
×
1525
    bseError("failed to init table reader iter since %s", tstrerror(code));
×
1526
    tableReaderIterDestroy(p);
×
1527
  }
1528
  return code;
×
1529
}
1530

1531
int32_t tableReaderIterNext(STableReaderIter *pIter, uint8_t **pValue, int32_t *len) {
×
1532
  int32_t      code = 0;
×
1533
  int32_t      lino = 0;
×
1534
  SBseSnapMeta snapMeta = {0};
×
1535
  snapMeta.range.sseq = -1;
×
1536
  snapMeta.range.eseq = -1;
×
1537
  snapMeta.timestamp = pIter->timestamp;
×
1538
  snapMeta.fileType = pIter->fileType;
×
1539
  snapMeta.blockType = pIter->blockType;
×
1540

1541
  if (pIter->blockType == BSE_TABLE_DATA_TYPE) {
×
1542
    SBlkHandle *pHandle = NULL;
×
1543
    if (pIter->blockIndex >= taosArrayGetSize(pIter->pMetaHandle)) {
×
1544
      taosArrayDestroy(pIter->pMetaHandle);
×
1545
      pIter->pMetaHandle = NULL;
×
1546
      pIter->blockIndex = 0;
×
1547
      pIter->isOver = 1;
×
1548
      return 0;
×
1549
    } else {
1550
      pHandle = taosArrayGet(pIter->pMetaHandle, pIter->blockIndex);
×
1551
      bseDebug("file type %d, block type: %d,block index %d, offset %" PRId64 ", size %" PRId64 ", range [%" PRId64
×
1552
               ", %" PRId64 "]",
1553
               pIter->fileType, pIter->blockType, pIter->blockIndex, pHandle->offset, pHandle->size,
1554
               pHandle->range.sseq, pHandle->range.eseq);
1555
      code = tableReaderLoadRawBlock(pIter->pTableReader, pHandle, &pIter->blockWrapper);
×
1556
      TSDB_CHECK_CODE(code, lino, _error);
×
1557

1558
      pIter->blockIndex++;
×
1559
    }
1560

1561
  } else if (pIter->blockType == BSE_TABLE_META_TYPE) {
×
1562
    SBlkHandle *pHandle = NULL;
×
1563
    if (pIter->blockIndex >= taosArrayGetSize(pIter->pMetaHandle)) {
×
1564
      taosArrayDestroy(pIter->pMetaHandle);
×
1565
      pIter->pMetaHandle = NULL;
×
1566
      pIter->blockIndex = 0;
×
1567
      pIter->blockType = BSE_TABLE_META_INDEX_TYPE;
×
1568
    } else {
1569
      pHandle = taosArrayGet(pIter->pMetaHandle, pIter->blockIndex);
×
1570

1571
      bseDebug("file type %d, block type: %d,block index %d, offset %" PRId64 ", size %" PRId64 ", range [%" PRId64
×
1572
               ", %" PRId64 "]",
1573
               pIter->fileType, pIter->blockType, pIter->blockIndex, pHandle->offset, pHandle->size,
1574
               pHandle->range.sseq, pHandle->range.eseq);
1575
      code = tableReaderLoadRawMeta(pIter->pTableReader, pHandle, &pIter->blockWrapper);
×
1576
      TSDB_CHECK_CODE(code, lino, _error);
×
1577
      pIter->blockIndex++;
×
1578
    }
1579
  }
1580

1581
  if (pIter->blockType == BSE_TABLE_META_INDEX_TYPE) {
×
1582
    code = tableReaderLoadRawMetaIndex(pIter->pTableReader, &pIter->blockWrapper);
×
1583
    TSDB_CHECK_CODE(code, lino, _error);
×
1584

1585
    pIter->blockType = BSE_TABLE_FOOTER_TYPE;
×
1586
  } else if (pIter->blockType == BSE_TABLE_FOOTER_TYPE) {
×
1587
    code = tableReaderLoadRawFooter(pIter->pTableReader, &pIter->blockWrapper);
×
1588
    TSDB_CHECK_CODE(code, lino, _error);
×
1589

1590
    pIter->blockType = BSE_TABLE_END_TYPE;
×
1591
  } else if (pIter->blockType == BSE_TABLE_END_TYPE) {
×
1592
    pIter->isOver = 1;
×
1593
  }
1594

1595
_error:
×
1596
  if (code != 0) {
×
1597
    bseError("failed to load block since %s", tstrerror(code));
×
1598
    pIter->isOver = 1;
×
1599
  }
1600
  SSeqRange range = {0};
×
1601
  if (pIter->blockWrapper.data != NULL) {
×
1602
    updateSnapshotMeta(&pIter->blockWrapper, range, pIter->fileType, pIter->blockType, snapMeta.timestamp);
×
1603
    *pValue = pIter->blockWrapper.data;
×
1604
    *len = pIter->blockWrapper.size;
×
1605
  }
1606
  return code;
×
1607
}
1608

1609
int8_t tableReaderIterValid(STableReaderIter *pIter) { return pIter->isOver == 0; }
×
1610

1611
int32_t bseReadCurrentSnap(SBse *pBse, uint8_t **pValue, int32_t *len) {
×
1612
  int32_t   code = 0;
×
1613
  char      path[128] = {0};
×
1614
  int32_t   lino = 0;
×
1615
  TdFilePtr fd = NULL;
×
1616
  int64_t   sz = 0;
×
1617
  char      name[TSDB_FILENAME_LEN] = {0};
×
1618

1619
  uint8_t *pCurrent = NULL;
×
1620

1621
  bseBuildCurrentFullName(pBse, name);
×
1622
  if (taosCheckExistFile(name) == 0) {
×
1623
    bseInfo("vgId:%d, no current meta file found, skip recover", BSE_VGID(pBse));
×
1624
    return 0;
×
1625
  }
1626
  code = taosStatFile(name, &sz, NULL, NULL);
×
1627
  TSDB_CHECK_CODE(code, lino, _error);
×
1628

1629
  fd = taosOpenFile(name, TD_FILE_READ);
×
1630
  if (fd == NULL) {
×
1631
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1632
  }
1633
  pCurrent = (uint8_t *)taosMemoryCalloc(1, sizeof(SBseSnapMeta) + sz);
×
1634
  if (pCurrent == NULL) {
×
1635
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1636
  }
1637

1638
  int64_t nread = taosReadFile(fd, pCurrent + sizeof(SBseSnapMeta), sz);
×
1639
  if (nread != sz) {
×
1640
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1641
  }
1642
  if (taosCloseFile(&fd) != 0) {
×
1643
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1644
  }
1645

1646
  SBseSnapMeta *pMeta = (SBseSnapMeta *)(pCurrent);
×
1647
  pMeta->fileType = BSE_CURRENT_SNAP;
×
1648

1649
  *pValue = pCurrent;
×
1650

1651
  *len = sz + sizeof(SBseSnapMeta);
×
1652
_error:
×
1653
  if (code != 0) {
×
1654
    bseError("vgId:%d, failed to read current at line %d since %s", BSE_VGID(pBse), lino, tstrerror(code));
×
1655
    if (taosCloseFile(&fd) != 0) {
×
1656
      bseError("failed to close file %s since %s", name, tstrerror(terrno));
×
1657
    }
1658
    taosMemoryFree(pCurrent);
×
1659
  }
1660
  return code;
×
1661
}
1662

1663
void tableReaderIterDestroy(STableReaderIter *pIter) {
×
1664
  if (pIter == NULL) return;
×
1665

1666
  taosArrayDestroy(pIter->pMetaHandle);
×
1667
  tableReaderClose(pIter->pTableReader);
×
1668
  blockWrapperCleanup(&pIter->blockWrapper);
×
1669
  destroySubTableMgt(pIter->pSubMgt);
×
1670
  taosMemoryFree(pIter);
×
1671
}
1672

1673
int32_t blockWithMetaInit(SBlock *pBlock, SBlockWithMeta **pMeta) {
×
1674
  int32_t code = 0;
×
1675
  int32_t lino = 0;
×
1676

1677
  SBlockWithMeta *p = taosMemCalloc(1, sizeof(SBlockWithMeta));
×
1678
  if (p == NULL) {
×
1679
    return terrno;
×
1680
  }
1681
  p->pBlock = pBlock;
×
1682
  p->pMeta = taosArrayInit(8, sizeof(SBlockIndexMeta));
×
1683
  if (p->pMeta == NULL) {
×
1684
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1685
  }
1686

1687
  uint8_t *p1 = (uint8_t *)pBlock->data;
×
1688
  uint8_t *p2 = (uint8_t *)p1;
×
1689
  while (p2 - p1 < pBlock->len) {
×
1690
    int64_t         k;
×
1691
    int32_t         vlen = 0;
×
1692
    SBlockIndexMeta meta = {0};
×
1693
    int32_t         offset = 0;
×
1694
    p2 = taosDecodeVariantI64((void **)p2, &k);
×
1695
    offset = p2 - p1;
×
1696
    p2 = taosDecodeVariantI32((void **)p2, &vlen);
×
1697

1698
    meta.seq = k;
×
1699
    meta.offset = offset;
×
1700
    if (taosArrayPush(p->pMeta, &meta) == NULL) {
×
1701
      TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1702
    }
1703
    p2 += vlen;
×
1704
  }
1705

1706
  *pMeta = p;
×
1707
_error:
×
1708
  if (code != 0) {
×
1709
    bseError("failed to init block with meta since %s", tstrerror(code));
×
1710
    blockWithMetaCleanup(p);
×
1711
  }
1712
  return code;
×
1713
}
1714

1715
void blockWithMetaCleanup(SBlockWithMeta *p) {
×
1716
  if (p == NULL) return;
×
1717
  taosArrayDestroy(p->pMeta);
×
1718
  taosMemoryFree(p);
×
1719
  return;
×
1720
}
1721

1722
int comprareFunc(const void *pLeft, const void *pRight) {
×
1723
  SBlockIndexMeta *p1 = (SBlockIndexMeta *)pLeft;
×
1724
  SBlockIndexMeta *p2 = (SBlockIndexMeta *)pRight;
×
1725
  if (p1->seq > p2->seq) {
×
1726
    return 1;
×
1727
  } else if (p1->seq < p2->seq) {
×
1728
    return -1;
×
1729
  }
1730
  return 0;
×
1731
}
1732

1733
int32_t blockWithMetaSeek(SBlockWithMeta *p, int64_t seq, uint8_t **pValue, int32_t *len) {
×
1734
  int32_t         code = 0;
×
1735
  SBlockIndexMeta key = {.seq = seq, .offset = 0};
×
1736
  int32_t         idx = taosArraySearchIdx(p->pMeta, &seq, comprareFunc, TD_EQ);
×
1737
  if (idx < 0) {
×
1738
    return TSDB_CODE_NOT_FOUND;
×
1739
  }
1740
  SBlockIndexMeta *pMeta = taosArrayGet(p->pMeta, idx);
×
1741
  if (pMeta == NULL) {
×
1742
    return TSDB_CODE_NOT_FOUND;
×
1743
  }
1744

1745
  uint8_t *data = (uint8_t *)p->pBlock->data + pMeta->offset;
×
1746

1747
  data = taosDecodeVariantI32((void *)data, len);
×
1748
  if (*len <= 0) {
×
1749
    return TSDB_CODE_NOT_FOUND;
×
1750
  }
1751
  *pValue = taosMemCalloc(1, *len);
×
1752
  if (*pValue == NULL) {
×
1753
    return terrno;
×
1754
  }
1755
  memcpy(*pValue, data, *len);
×
1756

1757
  return code;
×
1758
}
1759

1760
int32_t tableMetaOpen(char *name, SBTableMeta **pMeta, void *pMetaMgt) {
3,721✔
1761
  int32_t code = 0;
3,721✔
1762
  int32_t lino = 0;
3,721✔
1763

1764
  SBTableMeta *p = taosMemCalloc(1, sizeof(SBTableMeta));
3,721✔
1765
  if (p == NULL) {
3,721✔
1766
    TSDB_CHECK_CODE(code, lino, _error);
×
1767
  }
1768

1769
  if (name != NULL) {
3,721✔
1770
    memcpy(p->name, name, strlen(name) + 1);
×
1771
  }
1772
  p->pBse = ((STableMetaMgt *)pMetaMgt)->pBse;
3,721✔
1773

1774
  p->blockCap = BSE_BLOCK_SIZE((SBse *)p->pBse);
3,721✔
1775

1776
  *pMeta = p;
3,721✔
1777
_error:
3,721✔
1778
  if (code != 0) {
3,721✔
1779
    bseError("failed to open table meta %s at line %d since %s", name, lino, tstrerror(code));
×
1780
    tableMetaClose(p);
×
1781
  }
1782

1783
  return code;
3,721✔
1784
}
1785

1786
int32_t tableMetaCommit(SBTableMeta *pMeta, SArray *pBlock) {
3,537✔
1787
  int32_t                code = 0;
3,537✔
1788
  int32_t                lino = 0;
3,537✔
1789
  SBtableMetaWriter     *pWriter = NULL;
3,537✔
1790
  SBtableMetaReader     *pReader = NULL;
3,537✔
1791
  SBtableMetaReaderIter *pIter = NULL;
3,537✔
1792

1793
  char tempMetaName[TSDB_FILENAME_LEN] = {0};
3,537✔
1794
  char metaName[TSDB_FILENAME_LEN] = {0};
3,537✔
1795

1796
  char tempMetaPath[TSDB_FILENAME_LEN] = {0};
3,537✔
1797
  char metaPath[TSDB_FILENAME_LEN] = {0};
3,537✔
1798

1799
  bseBuildTempMetaName(pMeta->timestamp, tempMetaName);
3,537✔
1800
  bseBuildMetaName(pMeta->timestamp, metaName);
3,537✔
1801

1802
  code = tableMetaWriterInit(pMeta, tempMetaName, &pWriter);
3,537✔
1803
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
1804

1805
  code = tableMetaReaderInit(pMeta, metaName, &pReader);
3,537✔
1806
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
1807

1808
  code = tableMetaReaderOpenIter(pReader, &pIter);
3,537✔
1809
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
1810

1811
  while (!pIter->isOver) {
5,208✔
1812
    SBlkHandle    blkHandle = {0};
2,785✔
1813
    SBlockWrapper wrapper;
2,785✔
1814

1815
    code = tableMetaReaderIterNext(pIter, &wrapper, &blkHandle);
2,785✔
1816
    TSDB_CHECK_CODE(code, lino, _error);
2,785✔
1817

1818
    if (pIter->isOver) {
2,785✔
1819
      break;
1,114✔
1820
    }
1821

1822
    blockWrapperSetType(&wrapper, BSE_TABLE_META_TYPE);
1,671✔
1823

1824
    code = tableMetaWriteAppendRawBlock(pWriter, &wrapper, &blkHandle);
1,671✔
1825
    TSDB_CHECK_CODE(code, lino, _error);
1,671✔
1826

1827
    seqRangeUpdate(&pMeta->range, &blkHandle.range);
1,671✔
1828
  }
1829

1830
  code = tableMetaWriterAppendBlock(pWriter, pBlock);
3,537✔
1831
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
1832

1833
  code = tableMetaWriterCommit(pWriter);
3,537✔
1834
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
1835

1836
  tableMetaWriterClose(pWriter);
3,537✔
1837
  tableMetaReaderClose(pReader);
3,537✔
1838

1839
  pWriter = NULL;
3,537✔
1840
  pReader = NULL;
3,537✔
1841

1842
  bseBuildFullName(pMeta->pBse, tempMetaName, tempMetaPath);
3,537✔
1843
  bseBuildFullName(pMeta->pBse, metaName, metaPath);
3,537✔
1844

1845
  code = taosRenameFile(tempMetaPath, metaPath);
3,537✔
1846
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
1847

1848
_error:
3,537✔
1849
  if (code != 0) {
3,537✔
1850
    bseError("failed to commit table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
×
1851
  }
1852
  tableMetaReaderIterClose(pIter);
3,537✔
1853
  tableMetaWriterClose(pWriter);
3,537✔
1854
  tableMetaReaderClose(pReader);
3,537✔
1855

1856
  return code;
3,537✔
1857
}
1858
int32_t tableMetaWriterAppendBlock(SBtableMetaWriter *pMeta, SArray *pBlock) {
3,537✔
1859
  int32_t code = 0;
3,537✔
1860
  if (taosArrayAddAll(pMeta->pBlock, pBlock) == NULL) {
3,537✔
1861
    return terrno;
×
1862
  }
1863
  return code;
3,537✔
1864
}
1865

1866
int32_t tableMetaWriterFlushBlock(SBtableMetaWriter *pMeta) {
3,537✔
1867
  int32_t   code = 0;
3,537✔
1868
  int32_t   lino = 0;
3,537✔
1869
  SSeqRange range = {.sseq = -1, .eseq = -1};
3,537✔
1870

1871
  int64_t offset = 0;
3,537✔
1872
  int32_t nWrite = 0;
3,537✔
1873
  int32_t size = pMeta->blockCap;
3,537✔
1874

1875
  blockWrapperClear(&pMeta->blockWrapper);
3,537✔
1876
  code = blockWrapperResize(&pMeta->blockWrapper, size);
3,537✔
1877
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
1878

1879
  for (int32_t i = 0; i < taosArrayGetSize(pMeta->pBlock); i++) {
7,074✔
1880
    SMetaBlock *pBlk = taosArrayGet(pMeta->pBlock, i);
3,537✔
1881
    if (blockEsimateSize(pMeta->blockWrapper.data, sizeof(SMetaBlock)) >= pMeta->blockCap) {
3,537✔
1882
      SBlkHandle handle = {.offset = pMeta->offset, .size = offset, .range = range};
×
1883

1884
      blockWrapperSetType(&pMeta->blockWrapper, BSE_TABLE_META_TYPE);
×
1885

1886
      code = tableFlushBlock(pMeta->pFile, &handle, &pMeta->blockWrapper, &nWrite);
×
1887
      TSDB_CHECK_CODE(code, lino, _error);
×
1888

1889
      pMeta->offset += nWrite;
×
1890
      handle.size = nWrite;
×
1891

1892
      blockWrapperClear(&pMeta->blockWrapper);
×
1893
      code = blockWrapperResize(&pMeta->blockWrapper, size);
×
1894
      TSDB_CHECK_CODE(code, lino, _error);
×
1895

1896
      if (taosArrayPush(pMeta->pBlkHandle, &handle) == NULL) {
×
1897
        TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1898
      }
1899
      range.sseq = -1;
×
1900
      offset = 0;
×
1901
    }
1902

1903
    offset += metaBlockAdd(pMeta->blockWrapper.data, pBlk);
3,537✔
1904

1905
    if (range.sseq == -1) {
3,537✔
1906
      range.sseq = pBlk->range.sseq;
3,537✔
1907
    }
1908
    range.eseq = pBlk->range.eseq;
3,537✔
1909
  }
1910
  if (offset == 0) {
3,537✔
1911
    return 0;
×
1912
  }
1913

1914
  blockWrapperSetType(&pMeta->blockWrapper, BSE_TABLE_META_TYPE);
3,537✔
1915

1916
  SBlkHandle handle = {.offset = pMeta->offset, .size = offset, .range = range};
3,537✔
1917
  code = tableFlushBlock(pMeta->pFile, &handle, &pMeta->blockWrapper, &nWrite);
3,537✔
1918
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
1919

1920
  pMeta->offset += nWrite;
3,537✔
1921
  handle.size = nWrite;
3,537✔
1922

1923
  if (taosArrayPush(pMeta->pBlkHandle, &handle) == NULL) {
7,074✔
1924
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
1925
  }
1926
_error:
3,537✔
1927
  if (code != 0) {
3,537✔
1928
    bseError("failed to flush table meta %s at line %d since %s", pMeta->name, 0, tstrerror(code));
×
1929
    tableMetaWriterClose(pMeta);
×
1930
  }
1931
  return code;
3,537✔
1932
}
1933

1934
int32_t tableMetaWriterFlushIndex(SBtableMetaWriter *pMeta) {
3,537✔
1935
  int32_t code = 0;
3,537✔
1936
  int32_t lino = 0;
3,537✔
1937

1938
  int32_t nWrite = 0;
3,537✔
1939
  int64_t lastOffset = pMeta->offset;
3,537✔
1940
  int32_t blkHandleSize = 0;
3,537✔
1941

1942
  int32_t extra = 8;
3,537✔
1943
  int32_t size = taosArrayGetSize(pMeta->pBlkHandle) * sizeof(SBlkHandle);
3,537✔
1944

1945
  SSeqRange range = {-1, -1};
3,537✔
1946

1947
  blockWrapperClear(&pMeta->blockWrapper);
3,537✔
1948
  code = blockWrapperResize(&pMeta->blockWrapper, size + extra);
3,537✔
1949
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
1950

1951
  for (int32_t i = 0; i < taosArrayGetSize(pMeta->pBlkHandle); i++) {
8,745✔
1952
    SBlkHandle *pHandle = taosArrayGet(pMeta->pBlkHandle, i);
5,208✔
1953
    if (pHandle == NULL) {
5,208✔
1954
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
×
1955
    }
1956
    blkHandleSize += metaBlockAddIndex(pMeta->blockWrapper.data, pHandle);
5,208✔
1957

1958
    seqRangeUpdate(&range, &pHandle->range);
5,208✔
1959
  }
1960

1961
  blockWrapperSetType(&pMeta->blockWrapper, BSE_TABLE_META_INDEX_TYPE);
3,537✔
1962

1963
  SBlkHandle handle = {.offset = lastOffset, .size = blkHandleSize, .range = range};
3,537✔
1964
  code = tableFlushBlock(pMeta->pFile, &handle, &pMeta->blockWrapper, &nWrite);
3,537✔
1965
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
1966

1967
  SBlkHandle metaHandle = {.offset = pMeta->offset, .size = nWrite, .range = range};
3,537✔
1968
  SBlkHandle indexHandle = {.offset = pMeta->offset + nWrite, .size = 0, .range = range};
3,537✔
1969
  pMeta->offset += nWrite;
3,537✔
1970

1971
  memcpy(pMeta->footer.metaHandle, &metaHandle, sizeof(SBlkHandle));
3,537✔
1972
  memcpy(pMeta->footer.indexHandle, &metaHandle, sizeof(SBlkHandle));
3,537✔
1973
_error:
3,537✔
1974
  if (code != 0) {
3,537✔
1975
    bseError("failed to build table meta index at line %d since %s", lino, tstrerror(code));
×
1976
  }
1977
  return code;
3,537✔
1978
}
1979

1980
int32_t tableMetaWriterFlushFooter(SBtableMetaWriter *p) {
3,537✔
1981
  char buf[kEncodeLen] = {0};
3,537✔
1982

1983
  int32_t code = 0;
3,537✔
1984
  int32_t lino = 0;
3,537✔
1985

1986
  code = footerEncode(&p->footer, buf);
3,537✔
1987
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
1988

1989
  p->offset += sizeof(buf);
3,537✔
1990

1991
  int32_t nwrite = taosWriteFile(p->pFile, buf, sizeof(buf));
3,537✔
1992
  if (nwrite != sizeof(buf)) {
3,537✔
1993
    code = terrno;
×
1994
    TSDB_CHECK_CODE(code, lino, _error);
×
1995
  }
1996

1997
_error:
3,537✔
1998
  if (code != 0) {
3,537✔
1999
    bseError("failed to add footer to table builder at line %d since %s", lino, tstrerror(code));
×
2000
  }
2001
  return code;
3,537✔
2002
}
2003
int32_t tableMetaWriterCommit(SBtableMetaWriter *pMeta) {
3,537✔
2004
  int32_t code = 0;
3,537✔
2005
  int32_t lino = 0;
3,537✔
2006

2007
  code = tableMetaWriterFlushBlock(pMeta);
3,537✔
2008
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
2009

2010
  code = tableMetaWriterFlushIndex(pMeta);
3,537✔
2011
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
2012

2013
  code = tableMetaWriterFlushFooter(pMeta);
3,537✔
2014
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
2015
_error:
3,537✔
2016
  if (code != 0) {
3,537✔
2017
    bseError("failed to commit table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
×
2018
    tableMetaWriterClose(pMeta);
×
2019
  }
2020
  return code;
3,537✔
2021
}
2022
int32_t tableMetaWriteAppendRawBlock(SBtableMetaWriter *pMeta, SBlockWrapper *pBlock, SBlkHandle *pBlkHandle) {
1,671✔
2023
  int32_t code = 0;
1,671✔
2024
  int32_t lino = 0;
1,671✔
2025

2026
  int32_t nwrite = 0;
1,671✔
2027
  code = tableFlushBlock(pMeta->pFile, pBlkHandle, pBlock, &nwrite);
1,671✔
2028
  TSDB_CHECK_CODE(code, lino, _error);
1,671✔
2029

2030
  SBlkHandle handle = {.offset = pMeta->offset, .size = nwrite, .range = pBlkHandle->range};
1,671✔
2031
  if (taosArrayPush(pMeta->pBlkHandle, &handle) == NULL) {
3,342✔
2032
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
2033
  }
2034
  pMeta->offset += nwrite;
1,671✔
2035
_error:
1,671✔
2036
  if (code != 0) {
1,671✔
2037
    bseError("failed to append block to table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
×
2038
    tableMetaWriterClose(pMeta);
×
2039
  }
2040
  return code;
1,671✔
2041
}
2042

2043
int32_t tableMetaReaderLoadFooter(SBtableMetaReader *pMeta) {
10,530,837✔
2044
  int32_t code = 0;
10,530,837✔
2045
  int32_t lino = 0;
10,530,837✔
2046
  char    footer[kEncodeLen] = {0};
10,530,837✔
2047

2048
  if (pMeta->pFile == NULL) {
10,530,837✔
2049
    return 0;
2,423✔
2050
  }
2051
  int64_t n = taosLSeekFile(pMeta->pFile, -kEncodeLen, SEEK_END);
10,528,414✔
2052
  if (n < 0) {
10,528,414✔
2053
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
2054
  }
2055

2056
  if (taosReadFile(pMeta->pFile, footer, kEncodeLen) != kEncodeLen) {
10,528,414✔
2057
    code = terrno;
×
2058
    TSDB_CHECK_CODE(code, lino, _error);
×
2059
  }
2060

2061
  code = footerDecode(&pMeta->footer, footer);
10,528,414✔
2062
  TSDB_CHECK_CODE(code, lino, _error);
10,528,414✔
2063
_error:
10,528,414✔
2064
  if (code != 0) {
10,528,414✔
2065
    bseError("failed to load table meta footer %s at line %d since %s", pMeta->name, lino, tstrerror(code));
×
2066
  }
2067
  return code;
10,528,414✔
2068
}
2069

2070
int32_t tableOpenFile(char *name, int8_t read, TdFilePtr *pFile, int64_t *size) {
21,064,838✔
2071
  int32_t lino = 0;
21,064,838✔
2072
  int32_t code = 0;
21,064,838✔
2073
  int32_t opt = 0;
21,064,838✔
2074

2075
  TdFilePtr p = NULL;
21,064,838✔
2076
  if (read) {
21,064,838✔
2077
    opt = TD_FILE_READ;
21,058,137✔
2078
  } else {
2079
    opt = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_APPEND;
6,701✔
2080
  }
2081

2082
  if (!taosCheckExistFile(name)) {
21,064,838✔
2083
    if (read) {
9,124✔
2084
      return 0;
2,423✔
2085
    }
2086

2087
    p = taosOpenFile(name, opt);
6,701✔
2088
    if (p == NULL) {
6,701✔
2089
      TSDB_CHECK_CODE(code = terrno, lino, _error);
×
2090
    }
2091

2092
    *pFile = p;
6,701✔
2093
    return code;
6,701✔
2094
  }
2095

2096
  code = taosStatFile(name, size, NULL, NULL);
21,055,714✔
2097
  TSDB_CHECK_CODE(code, lino, _error);
21,055,714✔
2098
  if (*size <= 0) {
21,055,714✔
2099
    TSDB_CHECK_CODE(code = TSDB_CODE_NOT_FOUND, lino, _error);
×
2100
  }
2101

2102
  p = taosOpenFile(name, opt);
21,055,714✔
2103
  if (p == NULL) {
21,055,714✔
2104
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
2105
  }
2106
  *pFile = p;
21,055,714✔
2107

2108
_error:
21,055,714✔
2109
  if (code != 0) {
21,055,714✔
2110
    bseError("failed to open table meta %s at line %d since %s", name, lino, tstrerror(code));
×
2111
  }
2112
  return code;
21,055,714✔
2113
}
2114
int32_t tableMetaOpenFile(SBtableMetaWriter *pMeta, int8_t read, char *name) {
10,534,374✔
2115
  int32_t code = 0;
10,534,374✔
2116
  int64_t size = 0;
10,534,374✔
2117
  int32_t lino = 0;
10,534,374✔
2118

2119
  code = tableOpenFile(name, read, &pMeta->pFile, &size);
10,534,374✔
2120
  TSDB_CHECK_CODE(code, lino, _error);
10,534,374✔
2121

2122
_error:
10,534,374✔
2123
  if (code != 0) {
10,534,374✔
2124
    bseError("failed to open table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
×
2125
  }
2126

2127
  return code;
10,534,374✔
2128
}
2129

2130
int32_t tableMetaReaderLoad(SBtableMetaReader *pMeta) {
10,530,837✔
2131
  int32_t code = 0;
10,530,837✔
2132
  int32_t lino = 0;
10,530,837✔
2133

2134
  code = tableMetaOpenFile(pMeta, 1, pMeta->name);
10,530,837✔
2135
  TSDB_CHECK_CODE(code, lino, _error);
10,530,837✔
2136

2137
  code = tableMetaReaderLoadFooter(pMeta);
10,530,837✔
2138
  TSDB_CHECK_CODE(code, lino, _error);
10,530,837✔
2139

2140
  code = tableMetaReaderLoadIndex(pMeta);
10,530,837✔
2141
  TSDB_CHECK_CODE(code, lino, _error);
10,530,837✔
2142

2143
_error:
10,530,837✔
2144
  if (code != 0) {
10,530,837✔
2145
    bseError("failed to load table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
×
2146
  }
2147
  return code;
10,530,837✔
2148
}
2149

2150
void tableMetaClose(SBTableMeta *p) {
3,721✔
2151
  if (p == NULL) return;
3,721✔
2152
  taosMemoryFree(p);
3,721✔
2153
}
2154

2155
int32_t tableMetaWriterInit(SBTableMeta *pMeta, char *name, SBtableMetaWriter **ppWriter) {
3,537✔
2156
  int32_t code = 0;
3,537✔
2157
  int32_t lino = 0;
3,537✔
2158

2159
  char path[TSDB_FILENAME_LEN] = {0};
3,537✔
2160
  bseBuildFullName(pMeta->pBse, name, path);
3,537✔
2161

2162
  SBtableMetaWriter *p = taosMemCalloc(1, sizeof(SBtableMetaWriter));
3,537✔
2163
  if (p == NULL) {
3,537✔
2164
    return terrno;
×
2165
  }
2166
  p->pTableMeta = pMeta;
3,537✔
2167

2168
  p->blockCap = pMeta->blockCap;
3,537✔
2169

2170
  p->pBlkHandle = taosArrayInit(128, sizeof(SBlkHandle));
3,537✔
2171
  if (p->pBlkHandle == NULL) {
3,537✔
2172
    TSDB_CHECK_CODE(code, lino, _error);
×
2173
  }
2174

2175
  p->pBlock = taosArrayInit(128, sizeof(SMetaBlock));
3,537✔
2176
  if (p->pBlock == NULL) {
3,537✔
2177
    TSDB_CHECK_CODE(code, lino, _error);
×
2178
  }
2179

2180
  code = blockWrapperInit(&p->blockWrapper, 1024);
3,537✔
2181
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
2182

2183
  // Set pBse pointer for encryption/decryption
2184
  p->blockWrapper.pBse = pMeta->pBse;
3,537✔
2185

2186
  code = tableMetaOpenFile(p, 0, path);
3,537✔
2187
  TSDB_CHECK_CODE(code, lino, _error);
3,537✔
2188

2189
  *ppWriter = p;
3,537✔
2190
_error:
3,537✔
2191
  if (code != 0) {
3,537✔
2192
    bseError("failed to init table meta writer %s at line %d since %s", pMeta->name, lino, tstrerror(code));
×
2193
    tableMetaWriterClose(p);
×
2194
  }
2195
  return code;
3,537✔
2196
}
2197

2198
void tableMetaWriterClose(SBtableMetaWriter *p) {
7,074✔
2199
  if (p == NULL) return;
7,074✔
2200
  if (taosCloseFile(&p->pFile) != 0) {
3,537✔
2201
    bseError("failed to close table meta writer file since %s", tstrerror(terrno));
×
2202
  }
2203
  taosArrayDestroy(p->pBlkHandle);
3,537✔
2204
  taosArrayDestroy(p->pBlock);
3,537✔
2205
  blockWrapperCleanup(&p->blockWrapper);
3,537✔
2206
  taosMemoryFree(p);
3,537✔
2207
}
2208

2209
int32_t tableMetaReaderInit(SBTableMeta *pMeta, char *name, SBtableMetaReader **ppReader) {
10,530,837✔
2210
  int32_t code = 0;
10,530,837✔
2211
  int32_t lino = 0;
10,530,837✔
2212
  char    path[TSDB_FILENAME_LEN] = {0};
10,530,837✔
2213
  bseBuildFullName(pMeta->pBse, name, path);
10,530,837✔
2214

2215
  SBtableMetaReader *p = taosMemCalloc(1, sizeof(SBtableMetaReader));
10,530,837✔
2216
  if (p == NULL) {
10,530,837✔
2217
    return terrno;
×
2218
  }
2219
  memcpy(p->name, path, sizeof(path));
10,530,837✔
2220
  p->pTableMeta = pMeta;
10,530,837✔
2221

2222
  p->pBlkHandle = taosArrayInit(128, sizeof(SBlkHandle));
10,530,837✔
2223
  if (p->pBlkHandle == NULL) {
10,530,837✔
2224
    TSDB_CHECK_CODE(code, lino, _error);
×
2225
  }
2226

2227
  code = blockWrapperInit(&p->blockWrapper, 1024);
10,530,837✔
2228
  TSDB_CHECK_CODE(code, lino, _error);
10,530,837✔
2229

2230
  // Set pBse pointer for encryption/decryption
2231
  p->blockWrapper.pBse = pMeta->pBse;
10,530,837✔
2232

2233
  code = tableMetaReaderLoad(p);
10,530,837✔
2234
  TSDB_CHECK_CODE(code, lino, _error);
10,530,837✔
2235

2236
  *ppReader = p;
10,530,837✔
2237
_error:
10,530,837✔
2238
  if (code != 0) {
10,530,837✔
2239
    bseError("failed to init table meta reader %s at line %d since %s", pMeta->name, lino, tstrerror(code));
×
2240
    tableMetaReaderClose(p);
×
2241
  }
2242
  return code;
10,530,837✔
2243
}
2244

2245
void tableMetaReaderClose(SBtableMetaReader *p) {
10,534,374✔
2246
  if (p == NULL) return;
10,534,374✔
2247
  if (taosCloseFile(&p->pFile) != 0) {
10,530,837✔
2248
    bseError("failed to close table meta reader file since %s", tstrerror(terrno));
×
2249
  }
2250
  taosArrayDestroy(p->pBlkHandle);
10,530,837✔
2251
  blockWrapperCleanup(&p->blockWrapper);
10,530,837✔
2252
  taosMemoryFree(p);
10,530,837✔
2253
}
2254
int32_t tableMetaReaderLoadBlockMeta(SBtableMetaReader *p, int64_t seq, SMetaBlock *pMetaBlock) {
10,527,300✔
2255
  int32_t            code = 0;
10,527,300✔
2256
  int32_t            lino = 0;
10,527,300✔
2257
  SBtableMetaReader *pMeta = p;
10,527,300✔
2258
  SSeqRange          range = {.sseq = seq, .eseq = seq};
10,527,300✔
2259

2260
  SBlkHandle  handle = {.range = range};
10,527,300✔
2261
  int32_t     index = taosArraySearchIdx(p->pBlkHandle, &handle, compareFunc, TD_LE);
10,527,300✔
2262
  SBlkHandle *pHandle = taosArrayGet(p->pBlkHandle, index);
10,527,300✔
2263
  if (pHandle == NULL) {
10,527,300✔
2264
    return TSDB_CODE_NOT_FOUND;
×
2265
  }
2266

2267
  code = tableLoadBlock(p->pFile, pHandle, &p->blockWrapper);
10,527,300✔
2268
  TSDB_CHECK_CODE(code, lino, _error);
10,527,300✔
2269

2270
  code = blockSeekMeta(p->blockWrapper.data, seq, pMetaBlock);
10,527,300✔
2271
  TSDB_CHECK_CODE(code, lino, _error);
10,527,300✔
2272

2273
_error:
10,527,300✔
2274
  return code;
10,527,300✔
2275
}
2276
int32_t tableMetaReaderLoadAllDataHandle(SBtableMetaReader *p, SArray *dataHandle) {
×
2277
  int32_t lino = 0;
×
2278
  int32_t code = 0;
×
2279

2280
  SArray *pMeta = taosArrayInit(8, sizeof(SMetaBlock));
×
2281
  if (pMeta == NULL) {
×
2282
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
2283
  }
2284

2285
  for (int32_t i = 0; i < taosArrayGetSize(p->pBlkHandle); i++) {
×
2286
    SBlkHandle *pHandle = taosArrayGet(p->pBlkHandle, i);
×
2287
    if (pHandle == NULL) {
×
2288
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
2289
    }
2290

2291
    code = tableLoadBlock(p->pFile, pHandle, &p->blockWrapper);
×
2292
    TSDB_CHECK_CODE(code, lino, _exit);
×
2293

2294
    code = blockGetAllMeta(p->blockWrapper.data, pMeta);
×
2295
    TSDB_CHECK_CODE(code, lino, _exit);
×
2296

2297
    if (taosArrayGetSize(pMeta) == 0) {
×
2298
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
2299
    }
2300

2301
    for (int32_t j = 0; j < taosArrayGetSize(pMeta); j++) {
×
2302
      SMetaBlock *pBlk = taosArrayGet(pMeta, j);
×
2303
      SBlkHandle  handle = {.offset = pBlk->offset, .size = pBlk->size, .range = pBlk->range};
×
2304
      if (taosArrayPush(dataHandle, &handle) == NULL) {
×
2305
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
2306
      }
2307
    }
2308
  }
2309
_exit:
×
2310
  taosArrayDestroy(pMeta);
×
2311
  return code;
×
2312
}
2313

2314
int32_t tableMetaReaderLoadMetaHandle(SBtableMetaReader *p, SArray *pMetaHandle) {
×
2315
  int32_t code = 0;
×
2316
  int32_t lino = 0;
×
2317

2318
  if (taosArrayGetSize(p->pBlkHandle) == 0) {
×
2319
    return TSDB_CODE_NOT_FOUND;
×
2320
  }
2321

2322
  for (int32_t i = 0; i < taosArrayGetSize(p->pBlkHandle); i++) {
×
2323
    SBlkHandle *pHandle = taosArrayGet(p->pBlkHandle, i);
×
2324
    if (pHandle == NULL) {
×
2325
      return TSDB_CODE_FILE_CORRUPTED;
×
2326
    }
2327
    if (taosArrayPush(pMetaHandle, pHandle) == NULL) {
×
2328
      TSDB_CHECK_CODE(code = terrno, lino, _error);
×
2329
    }
2330
  }
2331
_error:
×
2332
  return code;
×
2333
}
2334

2335
int32_t tableMetaReaderLoadIndex(SBtableMetaReader *p) {
10,530,837✔
2336
  int32_t code = 0;
10,530,837✔
2337
  int32_t lino = 0;
10,530,837✔
2338
  int32_t offset = 0;
10,530,837✔
2339
  SBtableMetaReader *pMeta = p;
10,530,837✔
2340

2341
  if (pMeta->pFile == NULL) {
10,530,837✔
2342
    return 0;
2,423✔
2343
  }
2344

2345
  pMeta->blockWrapper.type = BSE_TABLE_META_TYPE;
10,528,414✔
2346

2347
  code = tableLoadBlock(pMeta->pFile, pMeta->footer.metaHandle, &pMeta->blockWrapper);
10,528,414✔
2348
  TSDB_CHECK_CODE(code, lino, _error);
10,528,414✔
2349

2350
  if (blockGetType(p->blockWrapper.data) != BSE_TABLE_META_INDEX_TYPE) {
10,528,414✔
2351
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
×
2352
  }
2353

2354
  SBlock  *pBlk = (SBlock *)pMeta->blockWrapper.data;
10,528,414✔
2355
  uint8_t *data = (uint8_t *)pBlk->data;
10,528,414✔
2356

2357
  do {
2358
    SBlkHandle handle = {0};
25,567,971✔
2359
    offset += blkHandleDecode(&handle, (char *)data + offset);
25,567,971✔
2360
    if (taosArrayPush(pMeta->pBlkHandle, &handle) == NULL) {
51,135,942✔
2361
      TSDB_CHECK_CODE(terrno, lino, _error);
×
2362
    }
2363
  } while (offset < pBlk->len);
25,567,971✔
2364

2365
_error:
10,528,414✔
2366
  if (code != 0) {
10,528,414✔
2367
    bseError("failed to load table meta blk handle %s at line %d since %s", pMeta->name, lino, tstrerror(code));
×
2368
  }
2369
  return code;
10,528,414✔
2370
}
2371

2372
int32_t tableMetaReaderOpenIter(SBtableMetaReader *pReader, SBtableMetaReaderIter **pIter) {
3,537✔
2373
  int32_t code = 0;
3,537✔
2374
  int32_t lino = 0;
3,537✔
2375

2376
  SBtableMetaReaderIter *p = taosMemCalloc(1, sizeof(SBtableMetaReaderIter));
3,537✔
2377
  if (p == NULL) {
3,537✔
2378
    return terrno;
×
2379
  }
2380
  p->pReader = pReader;
3,537✔
2381

2382
  code = blockWrapperInit(&p->pBlockWrapper, 1024);
3,537✔
2383
  if (code != 0) {
3,537✔
2384
    return code;
×
2385
  }
2386

2387
  // Set pBse pointer for encryption/decryption
2388
  p->pBlockWrapper.pBse = ((SBTableMeta *)pReader->pTableMeta)->pBse;
3,537✔
2389

2390
  *pIter = p;
3,537✔
2391
  if (taosArrayGetSize(pReader->pBlkHandle) == 0) {
3,537✔
2392
    p->isOver = 1;
2,423✔
2393
    return 0;
2,423✔
2394
  }
2395

2396
  return 0;
1,114✔
2397
}
2398

2399
int32_t tableMetaReaderIterNext(SBtableMetaReaderIter *pIter, SBlockWrapper *pDataWrapper, SBlkHandle *dstHandle) {
2,785✔
2400
  int32_t code = 0;
2,785✔
2401
  int32_t lino = 0;
2,785✔
2402

2403
  if (pIter->blkIdx >= taosArrayGetSize(pIter->pReader->pBlkHandle)) {
2,785✔
2404
    pIter->isOver = 1;
1,114✔
2405
    return 0;
1,114✔
2406
  }
2407

2408
  SBlkHandle *pHandle = taosArrayGet(pIter->pReader->pBlkHandle, pIter->blkIdx);
1,671✔
2409
  if (pHandle == NULL) {
1,671✔
2410
    return TSDB_CODE_FILE_CORRUPTED;
×
2411
  }
2412

2413
  SBlockWrapper *pWrapper = &pIter->pBlockWrapper;
1,671✔
2414
  code = blockWrapperResize(pWrapper, pHandle->size);
1,671✔
2415
  TSDB_CHECK_CODE(code, lino, _error);
1,671✔
2416

2417
  code = tableLoadBlock(pIter->pReader->pFile, pHandle, pWrapper);
1,671✔
2418
  TSDB_CHECK_CODE(code, lino, _error);
1,671✔
2419

2420
  pIter->blkIdx++;
1,671✔
2421

2422
  if (blockGetType(pWrapper->data) != BSE_TABLE_META_TYPE) {
1,671✔
2423
    pIter->isOver = 1;
×
2424
    return 0;
×
2425
  }
2426

2427
  *pDataWrapper = *pWrapper;
1,671✔
2428
  *dstHandle = *pHandle;
1,671✔
2429

2430
_error:
1,671✔
2431
  if (code != 0) {
1,671✔
2432
    bseError("failed to load table meta blk handle %s at line %d since %s", pIter->pReader->name, lino,
×
2433
             tstrerror(code));
2434
    pIter->pReader = NULL;
×
2435
  }
2436
  return code;
1,671✔
2437
}
2438

2439
void tableMetaReaderIterClose(SBtableMetaReaderIter *p) {
3,537✔
2440
  if (p == NULL) return;
3,537✔
2441
  blockWrapperCleanup(&p->pBlockWrapper);
3,537✔
2442
  taosMemoryFree(p);
3,537✔
2443
}
2444

2445
int32_t bseMemTableCreate(STableMemTable **pMemTable, int32_t cap) {
6,317✔
2446
  int32_t code = 0;
6,317✔
2447
  int32_t lino = 0;
6,317✔
2448

2449
  STableMemTable *p = taosMemoryCalloc(1, sizeof(STableMemTable));
6,317✔
2450
  if (p == NULL) {
6,317✔
2451
    return terrno;
×
2452
  }
2453

2454
  p->pMetaHandle = taosArrayInit(8, sizeof(SBlkHandle));
6,317✔
2455
  if (p->pMetaHandle == NULL) {
6,317✔
2456
    TAOS_CHECK_GOTO(terrno, &lino, _error);
×
2457
  }
2458

2459
  code = blockWrapperInit(&p->pBlockWrapper, cap);
6,317✔
2460
  TAOS_CHECK_GOTO(code, &lino, _error);
6,317✔
2461

2462
  taosInitRWLatch(&p->latch);
6,317✔
2463

2464
  seqRangeReset(&p->range);
6,317✔
2465
  seqRangeReset(&p->tableRange);
6,317✔
2466
  p->ref = 1;
6,317✔
2467
  bseTrace("create mem table %p", p);
6,317✔
2468

2469
_error:
6,317✔
2470
  if (code != 0) {
6,317✔
2471
    bseMemTableDestroy(p);
×
2472
  }
2473
  *pMemTable = p;
6,317✔
2474

2475
  return code;
6,317✔
2476
}
2477

2478
int32_t bseMemTableRef(STableMemTable *pMemTable) {
21,098,220✔
2479
  int32_t code = 0;
21,098,220✔
2480
  if (pMemTable == NULL) {
21,098,220✔
2481
    return TSDB_CODE_INVALID_CFG;
×
2482
  }
2483

2484
  SBse *pBse = (SBse *)pMemTable->pBse;
21,098,220✔
2485
  bseTrace("ref mem table %p", pMemTable);
21,098,220✔
2486

2487
  int32_t nRef = atomic_fetch_add_32(&pMemTable->ref, 1);
21,098,220✔
2488
  if (nRef <= 0) {
21,098,220✔
2489
    bseError("vgId:%d, memtable ref count is invalid, ref:%d", BSE_VGID(pBse), nRef);
×
2490
    return TSDB_CODE_INVALID_CFG;
×
2491
  }
2492
  return code;
21,098,220✔
2493
}
2494

2495
void bseMemTableUnRef(STableMemTable *pMemTable) {
21,108,085✔
2496
  int32_t code = 0;
21,108,085✔
2497

2498
  bseTrace("unref mem table %p", pMemTable);
21,108,085✔
2499
  if (pMemTable == NULL) {
21,108,085✔
2500
    return;
3,548✔
2501
  }
2502
  if (atomic_sub_fetch_32(&pMemTable->ref, 1) == 0) {
21,104,537✔
2503
    bseMemTableDestroy(pMemTable);
6,317✔
2504
    bseTrace("destroy mem table %p", pMemTable);
6,317✔
2505
  }
2506
}
2507
void bseMemTableDestroy(STableMemTable *pMemTable) {
6,317✔
2508
  if (pMemTable == NULL) return;
6,317✔
2509
  taosArrayDestroy(pMemTable->pMetaHandle);
6,317✔
2510
  blockWrapperCleanup(&pMemTable->pBlockWrapper);
6,317✔
2511
  taosMemoryFree(pMemTable);
6,317✔
2512
}
2513
int32_t bseMemTablePush(STableMemTable *pMemTable, void *pHandle) {
3,537✔
2514
  int32_t code = 0;
3,537✔
2515
  if (pMemTable == NULL || pHandle == NULL) {
3,537✔
2516
    code = TSDB_CODE_INVALID_PARA;
×
2517
    return code;
×
2518
  }
2519

2520
  if (taosArrayPush(pMemTable->pMetaHandle, pHandle) == NULL) {
7,074✔
2521
    code = terrno;
×
2522
    bseError("Failed to push handle to memtable since %s", tstrerror(code));
×
2523

2524
    return code;
×
2525
  }
2526
  return code;
3,537✔
2527
}
2528
int32_t bseMemTablGetMetaBlock(STableMemTable *p, SArray **pMetaBlock) {
3,537✔
2529
  int32_t inLock = 0;
3,537✔
2530
  int32_t lino = 0;
3,537✔
2531
  int32_t code = bseMemTableRef(p);
3,537✔
2532
  if (code != 0) {
3,537✔
2533
    bseError("Failed to ref memtable since %s", tstrerror(code));
×
2534
    return code;
×
2535
  }
2536

2537
  SArray *pBlock = taosArrayInit(8, sizeof(SMetaBlock));
3,537✔
2538
  if (pBlock == NULL) {
3,537✔
2539
    TSDB_CHECK_CODE(code = terrno, lino, _error);
×
2540
  }
2541
  taosRLockLatch(&p->latch);
3,537✔
2542
  inLock = 1;
3,537✔
2543

2544
  for (int32_t i = 0; i < taosArrayGetSize(p->pMetaHandle); i++) {
7,074✔
2545
    SBlkHandle *handle = taosArrayGet(p->pMetaHandle, i);
3,537✔
2546
    SMetaBlock  block = {.type = BSE_TABLE_META_TYPE,
7,074✔
2547
                         .version = BSE_DATA_VER,
2548
                         .range = handle->range,
2549
                         .offset = handle->offset,
3,537✔
2550
                         .size = handle->size};
3,537✔
2551
    if (taosArrayPush(pBlock, &block) == NULL) {
3,537✔
2552
      TSDB_CHECK_CODE(code = terrno, lino, _error);
×
2553
    }
2554
  }
2555
_error:
3,537✔
2556
  if (inLock) taosRUnLockLatch(&p->latch);
3,537✔
2557
  if (code != 0) {
3,537✔
2558
    bseError("failed to get meta block from memtable since %s", tstrerror(code));
×
2559
    taosArrayDestroy(pBlock);
×
2560
    pBlock = NULL;
×
2561
  }
2562
  bseMemTableUnRef(p);
3,537✔
2563

2564
  *pMetaBlock = pBlock;
3,537✔
2565

2566
  return code;
3,537✔
2567
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc