• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4991

17 Mar 2026 07:57AM UTC coverage: 69.756% (+0.4%) from 69.348%
#4991

push

travis-ci

web-flow
merge: from main to 3.0 branch #34807

14 of 16 new or added lines in 5 files covered. (87.5%)

3928 existing lines in 138 files now uncovered.

192146 of 275455 relevant lines covered (69.76%)

137208686.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.73
/source/dnode/vnode/src/bse/bseTable.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "bse.h"
17
#include "bseCache.h"
18
#include "bseSnapshot.h"
19
#include "bseTable.h"
20
#include "bseTableMgt.h"
21
#include "crypt.h"
22
#include "osMemPool.h"
23
#include "vnodeInt.h"
24

25
// table footer func
26
static int32_t footerEncode(STableFooter *pFooter, char *buf);
27
static int32_t footerDecode(STableFooter *pFooter, char *buf);
28

29
// block handle func
30
static int32_t blkHandleEncode(SBlkHandle *pHandle, char *buf);
31
static int32_t blkHandleDecode(SBlkHandle *pHandle, char *buf);
32

33
// table meta func
34
static int32_t metaBlockEncode(SMetaBlock *pMeta, char *buf);
35
static int32_t metaBlockDecode(SMetaBlock *pMeta, char *buf);
36

37
static int32_t metaBlockAdd(SBlock *p, SMetaBlock *pMeta);
38
static int32_t metaBlockGet(SBlock *p, SMetaBlock *pMeta);
39

40
// table footer func
41
static int32_t footerEncode(STableFooter *pFooter, char *buf);
42
static int32_t footerDecode(STableFooter *pFooter, char *buf);
43

44
// block func
45
static int32_t blockCreate(int32_t cap, SBlock **pBlock);
46
static void    blockDestroy(SBlock *pBlock);
47
static int32_t blockPut(SBlock *pBlock, int64_t seq, uint8_t *value, int32_t len);
48
static int32_t blockAppendBatch(SBlock *p, uint8_t *value, int32_t len);
49
static int32_t blockEsimateSize(SBlock *pBlock, int32_t extra);
50
static void    blockClear(SBlock *pBlock);
51
static int32_t blockSeek(SBlock *p, int64_t seq, uint8_t **pValue, int32_t *len);
52
static int8_t  blockGetType(SBlock *p);
53

54
static int32_t blockSeekMeta(SBlock *p, int64_t seq, SMetaBlock *pMeta);
55
static int32_t blockGetAllMeta(SBlock *p, SArray *pResult);
56
static int32_t metaBlockAddIndex(SBlock *p, SBlkHandle *pInfo);
57

58
static int32_t tableMetaWriterInit(SBTableMeta *pMeta, char *name, SBtableMetaWriter **ppWriter);
59
static int32_t tableMetaWriterCommit(SBtableMetaWriter *pMeta);
60
static void    tableMetaWriterClose(SBtableMetaWriter *p);
61
static int32_t tableMetaWriteAppendRawBlock(SBtableMetaWriter *pMeta, SBlockWrapper *pBlock, SBlkHandle *pBlkHandle);
62

63
static int32_t tableMetaReaderInit(SBTableMeta *pMeta, char *name, SBtableMetaReader **ppReader);
64
static void    tableMetaReaderClose(SBtableMetaReader *p);
65
static int32_t tableMetaReaderLoadIndex(SBtableMetaReader *p);
66

67
static int32_t tableMetaOpenFile(SBtableMetaWriter *pMeta, int8_t read, char *name);
68

69
static int32_t tableMetaReaderOpenIter(SBtableMetaReader *pReader, SBtableMetaReaderIter **pIter);
70
static int32_t tableMetaReaderIterNext(SBtableMetaReaderIter *pIter, SBlockWrapper *pDataWrapper,
71
                                       SBlkHandle *dstHandle);
72
static void    tableMetaReaderIterClose(SBtableMetaReaderIter *p);
73

74
// STable builder func
75
static int32_t tableBuilderGetBlockSize(STableBuilder *p);
76
static int32_t tableBuilderLoadBlock(STableBuilder *p, SBlkHandle *pHandle, SBlockWrapper *pBlkWrapper);
77
static int32_t tableBuilderSeek(STableBuilder *p, SBlkHandle *pHandle, int64_t seq, uint8_t **pValue, int32_t *len);
78
static void    tableBuilderUpdateBlockRange(STableBuilder *p, SBlockItemInfo *pInfo);
79
static void    tableBuildUpdateTableRange(STableBuilder *p, SBlockItemInfo *pInfo);
80

81
// STable pReaderMgt func
82

83
static int32_t tableReaderInitMeta(STableReader *p, SBlock *pBlock);
84

85
static int32_t tableReaderLoadRawBlock(STableReader *p, SBlkHandle *pHandle, SBlockWrapper *pBlkWrapper);
86
static int32_t tableReaderLoadRawMeta(STableReader *p, SBlkHandle *pHandle, SBlockWrapper *blkWrapper);
87
static int32_t tableReaderLoadRawMetaIndex(STableReader *p, SBlockWrapper *blkWrapper);
88
static int32_t tableReaderLoadRawFooter(STableReader *p, SBlockWrapper *blkWrapper);
89

90
static int32_t tableOpenFile(char *name, int8_t read, TdFilePtr *pFile, int64_t *size);
91
static int32_t tableFlushBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlk, int32_t *nWrite);
92
static int32_t tableLoadBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlk);
93
static int32_t tableLoadRawBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlk, int8_t checkSum);
94

95
/*---block format----*/
96
//---datatype--|---len---|--data---|--rawdatasize---|--compressType---|---checksum---|
97
//- int8_t   |  int32_t | uint8_t[] |    int32_t     |      int8_t     |    TSCKSUM|
98
/*
 * Block tail accessors. All offsets are measured from the start of the SBlock
 * `p`: the tail begins sizeof(SBlock) + p->len bytes in and holds, in order,
 * the raw size (int32_t), the compress type (int8_t) and the checksum (TSCKSUM).
 */
#define BLOCK_ROW_SIZE_OFFSET(p)      (sizeof(SBlock) + (p)->len)
#define BLOCK_ROW_SIZE(p)             BLOCK_ROW_SIZE_OFFSET(p)
#define BLOCK_COMPRESS_TYPE_OFFSET(p) (BLOCK_ROW_SIZE_OFFSET(p) + sizeof(int32_t))
#define BLOCK_CHECKSUM_OFFSET(p)      (BLOCK_COMPRESS_TYPE_OFFSET(p) + sizeof(int8_t))
#define BLOCK_TOTAL_SIZE(p)           (BLOCK_CHECKSUM_OFFSET(p) + sizeof(TSCKSUM))

/* Store / load the raw-size field of the tail. Fully parenthesized so the expansion is one expression. */
#define BLOCK_SET_ROW_SIZE(p, size) (*(int32_t *)((char *)(p) + BLOCK_ROW_SIZE_OFFSET(p)) = (size))
#define BLOCK_GET_ROW_SIZE(p)       (*(int32_t *)((char *)(p) + BLOCK_ROW_SIZE_OFFSET(p)))

/* Store / load the compress-type field of the tail. */
#define BLOCK_SET_COMPRESS_TYPE(p, type) (*(int8_t *)((char *)(p) + BLOCK_COMPRESS_TYPE_OFFSET(p)) = (type))
#define BLOCK_GET_COMPRESS_TYPE(p)       (*(int8_t *)((char *)(p) + BLOCK_COMPRESS_TYPE_OFFSET(p)))

/* Size of the tail: raw size + compress type + checksum. */
#define BLOCK_TAIL_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(TSCKSUM))

/*
 * Write rawLen and type directly after `len` bytes of (compressed) payload at p.
 * `len` is parenthesized so that expression arguments keep their meaning.
 * The trailing ';' after while (0) is kept so existing call sites compile unchanged.
 */
#define COMREPSS_DATA_SET_TYPE_AND_RAWLEN(p, len, type, rawLen)  \
  do {                                                           \
    *(int32_t *)((char *)(p) + (len)) = (rawLen);                \
    *(int8_t *)((char *)(p) + (len) + sizeof(int32_t)) = (type); \
  } while (0);
/*
 * Read rawLen and type back from a buffer of total length `len` (payload + tail);
 * the two fields sit BLOCK_TAIL_LEN bytes before the end.
 */
#define COMPRESS_DATA_GET_TYPE_AND_RAWLEN(p, len, type, rawLen)                   \
  do {                                                                            \
    (rawLen) = *(int32_t *)((char *)(p) + (len) - BLOCK_TAIL_LEN);                \
    (type) = *(int8_t *)((char *)(p) + (len) - BLOCK_TAIL_LEN + sizeof(int32_t)); \
  } while (0);
122

UNCOV
123
/*
 * Load the block described by pHandle through the builder's data file and
 * look up `seq` inside it via blockSeek.
 *
 * The temporary block wrapper is cleaned up on every path before returning.
 * Returns 0 on success or the error code of the failing step.
 */
int32_t tableBuilderSeek(STableBuilder *p, SBlkHandle *pHandle, int64_t seq, uint8_t **pValue, int32_t *len) {
  int32_t code = 0;
  int32_t lino = 0;

  SBlockWrapper blockWrapper = {0};

  code = tableBuilderLoadBlock(p, pHandle, &blockWrapper);
  TSDB_CHECK_CODE(code, lino, _error);

  code = blockSeek(blockWrapper.data, seq, pValue, len);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  if (code != 0) {
    // message previously read "ince"; fixed to "since"
    bseError("failed to seek data from table builder at lino %d since %s", lino, tstrerror(code));
  }
  blockWrapperCleanup(&blockWrapper);
  return code;
}
142

UNCOV
143
/*
 * Initialise pBlkWrapper with pHandle->size bytes of capacity and fill it
 * from the builder's data file with tableLoadBlock.
 *
 * The wrapper is left for the caller to clean up, on success and on failure.
 */
int32_t tableBuilderLoadBlock(STableBuilder *p, SBlkHandle *pHandle, SBlockWrapper *pBlkWrapper) {
  int32_t lino = 0;
  int32_t code = blockWrapperInit(pBlkWrapper, pHandle->size);
  TSDB_CHECK_CODE(code, lino, _error);

  // The wrapper carries the pBse pointer for encryption/decryption.
  pBlkWrapper->pBse = p->pBse;

  code = tableLoadBlock(p->pDataFile, pHandle, pBlkWrapper);

_error:
  if (code != 0) {
    bseError("failed to load block from table builder at lino %d since %s", lino, tstrerror(code));
  }
  return code;
}
159

160
/*
 * Create a table builder for the data file derived from timestamp `ts`.
 *
 * Allocates the builder, creates its mem table, opens the data file and, only
 * once every step has succeeded, stores the builder in *pBuilder.
 *
 * On failure the partially built object is released with tableBuilderClose,
 * *pBuilder is left untouched and the error code is returned.
 */
int32_t tableBuilderOpen(int64_t ts, STableBuilder **pBuilder, SBse *pBse) {
  int32_t code = 0;
  int32_t lino = 0;

  char name[TSDB_FILENAME_LEN] = {0};
  char path[TSDB_FILENAME_LEN] = {0};
  bseBuildDataName(ts, name);
  bseBuildFullName(pBse, name, path);

  STableBuilder *p = taosMemoryCalloc(1, sizeof(STableBuilder));
  if (p == NULL) {
    // code must be assigned explicitly; checking terrno alone left code == 0
    code = terrno;
    lino = __LINE__;
    goto _error;
  }

  p->timestamp = ts;
  memcpy(p->name, name, strlen(name));  // p is zero-filled by calloc
  p->blockCap = BSE_BLOCK_SIZE(pBse);
  p->compressType = BSE_COMPRESS_TYPE(pBse);
  p->pBse = pBse;

  seqRangeReset(&p->tableRange);
  seqRangeReset(&p->blockRange);

  code = bseMemTableCreate(&p->pMemTable, BSE_BLOCK_SIZE(pBse));
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableOpenFile(path, 0, &p->pDataFile, &p->offset);
  // check before publishing: on failure p is freed below and must not escape
  TSDB_CHECK_CODE(code, lino, _error);

  p->pMemTable->pTableBuilder = p;
  *pBuilder = p;

_error:
  if (code != 0) {
    tableBuilderClose(p, 0);  // handles p == NULL
    bseError("failed to open table builder at line %d since %s", lino, tstrerror(code));
  }
  return code;
}
201

202
/* Fetch the meta blocks of the builder's immutable mem table into *pMetaBlock. */
int32_t tableBuilderGetMetaBlock(STableBuilder *p, SArray **pMetaBlock) {
  STableMemTable *pImmu = p->pImmuMemTable;
  return bseMemTablGetMetaBlock(pImmu, pMetaBlock);
}
205

206
/*
 * Push a flushed block handle onto the selected mem table (immutable when
 * `immu` is non-zero, otherwise the active one) and, on success, reset that
 * mem table's block range.
 *
 * The mem table is referenced for the duration of the call. If taking the
 * reference fails, the function returns immediately without releasing one.
 */
int32_t tableBuilderAddMeta(STableBuilder *p, SBlkHandle *pHandle, int8_t immu) {
  STableMemTable *pMemTable = immu ? p->pImmuMemTable : p->pMemTable;

  int32_t code = bseMemTableRef(pMemTable);
  if (code != 0) {
    // no reference was taken, so there is nothing to unref
    return code;
  }

  code = bseMemTablePush(pMemTable, pHandle);
  if (code == 0) {
    seqRangeReset(&pMemTable->range);
  }

  bseMemTableUnRef(pMemTable);
  return code;
}
223
/*
 * Append the wrapper's kv buffer to the end of the block payload.
 *
 * The wrapper is first resized to hold the current block plus kvSize bytes;
 * the block then records the old payload length in `offset`, grows `len` by
 * kvSize and is stamped with BSE_DATA_VER.
 */
int32_t tableBuilderSetBlockInfo(STableMemTable *pMemTable) {
  SBlockWrapper *pWrapper = &pMemTable->pBlockWrapper;

  int32_t code = blockWrapperResize(pWrapper, BLOCK_TOTAL_SIZE((SBlock *)(pWrapper->data)) + pWrapper->kvSize);
  if (code != 0) {
    return code;
  }

  // the data pointer is read again after the resize call
  SBlock *pBlk = (SBlock *)pWrapper->data;

  pBlk->offset = pBlk->len;
  memcpy(pBlk->data + pBlk->len, pWrapper->kvBuffer, pWrapper->kvSize);
  pBlk->len += pWrapper->kvSize;
  pBlk->version = BSE_DATA_VER;
  return code;
}
240
/*
 * Write the current block of the selected mem table to the data file.
 *
 * Steps: reference the mem table (write-locking it when `immutable` is set),
 * append the kv buffer with tableBuilderSetBlockInfo, and return early with
 * success when the block is empty. Otherwise the block is tagged with `type`,
 * compressed when a compress type is configured (falling back to the
 * uncompressed block if compression fails), checksummed and written at
 * p->offset. The resulting handle is recorded with tableBuilderAddMeta.
 *
 * Returns 0 when p is NULL. The mem table is only unlocked, cleared and
 * unreferenced if its reference was actually acquired.
 */
int32_t tableBuilderFlush(STableBuilder *p, int8_t type, int8_t immutable) {
  int32_t code = 0;
  int32_t lino = 0;
  int8_t  inLock = 0;
  int8_t  hasRef = 0;

  // check p before reading any of its fields
  if (p == NULL) return code;

  STableMemTable *pMemTable = immutable ? p->pImmuMemTable : p->pMemTable;

  SBlockWrapper wrapper = {0};
  code = bseMemTableRef(pMemTable);
  TSDB_CHECK_CODE(code, lino, _error);
  hasRef = 1;

  if (immutable) {
    taosWLockLatch(&pMemTable->latch);
    inLock = 1;
  }

  code = tableBuilderSetBlockInfo(pMemTable);
  TSDB_CHECK_CODE(code, lino, _error);

  SBlock *pBlk = pMemTable->pBlockWrapper.data;
  if (pBlk->len == 0) {
    // nothing to write
    goto _error;
  }

  int8_t compressType = BSE_COMPRESS_TYPE(p->pBse);

  uint8_t *pWrite = (uint8_t *)pBlk;
  int32_t  len = BLOCK_TOTAL_SIZE(pBlk);

  pBlk->type = type;

  BLOCK_SET_COMPRESS_TYPE(pBlk, compressType);
  BLOCK_SET_ROW_SIZE(pBlk, BLOCK_ROW_SIZE(pBlk));

  if (compressType != kNoCompres) {
    code = blockWrapperInit(&wrapper, len + 16);
    TSDB_CHECK_CODE(code, lino, _error);

    int32_t compressSize = wrapper.cap;
    code = bseCompressData(compressType, pWrite, BLOCK_ROW_SIZE(pBlk), wrapper.data, &compressSize);
    if (code != 0) {
      // compression failed: write the block uncompressed instead
      bseWarn("failed to compress data since %s, not set compress", tstrerror(TSDB_CODE_THIRDPARTY_ERROR));

      blockWrapperCleanup(&wrapper);
      BLOCK_SET_COMPRESS_TYPE(pBlk, kNoCompres);
      BLOCK_SET_ROW_SIZE(pBlk, BLOCK_ROW_SIZE(pBlk));
    } else {
      int32_t rawSize = BLOCK_ROW_SIZE(pBlk);
      COMREPSS_DATA_SET_TYPE_AND_RAWLEN(wrapper.data, compressSize, compressType, rawSize);
      len = compressSize + BLOCK_TAIL_LEN;

      pWrite = (uint8_t *)wrapper.data;
    }
  }

  code = taosCalcChecksumAppend(0, (uint8_t *)pWrite, len);
  TSDB_CHECK_CODE(code, lino, _error);

  SBlkHandle handle = {.size = len, .offset = p->offset, .range = pMemTable->range};

  bseDebug("bse flush at offset %" PRId64 " len: %d, block range sseq:%" PRId64 ", eseq:%" PRId64 "", p->offset, len,
           handle.range.sseq, handle.range.eseq);

  int64_t n = taosLSeekFile(p->pDataFile, handle.offset, SEEK_SET);
  if (n < 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  int64_t nwrite = taosWriteFile(p->pDataFile, (uint8_t *)pWrite, len);
  if (nwrite != len) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }
  p->offset += len;

  code = tableBuilderAddMeta(p, &handle, immutable);

_error:
  if (code != 0) {
    bseError("failed to flush table builder at line %d since %s", lino, tstrerror(code));
  }

  // only touch the mem table if its reference was acquired above
  if (hasRef && pMemTable != NULL) {
    if (!immutable) blockWrapperClear(&pMemTable->pBlockWrapper);
    if (inLock) {
      taosWUnLockLatch(&pMemTable->latch);
    }
    bseMemTableUnRef(pMemTable);
  }
  blockWrapperCleanup(&wrapper);
  return code;
}
333

334
/* Widen the builder's table range with the single sequence number of pInfo. */
void tableBuildUpdateTableRange(STableBuilder *p, SBlockItemInfo *pInfo) {
  seqRangeUpdate(&p->tableRange, &(SSeqRange){.sseq = pInfo->seq, .eseq = pInfo->seq});
}
27,684✔
338

339
/* Widen the builder's block range with the single sequence number of pInfo. */
void tableBuilderUpdateBlockRange(STableBuilder *p, SBlockItemInfo *pInfo) {
  seqRangeUpdate(&p->blockRange, &(SSeqRange){.sseq = pInfo->seq, .eseq = pInfo->seq});
}
22,361,688✔
343
/* Widen both the block range and the table range of a mem table with pInfo's sequence number. */
void memtableUpdateBlockRange(STableMemTable *p, SBlockItemInfo *pInfo) {
  SSeqRange point = {.sseq = pInfo->seq, .eseq = pInfo->seq};

  SSeqRange *targets[] = {&p->range, &p->tableRange};
  for (int32_t k = 0; k < 2; k++) {
    seqRangeUpdate(targets[k], &point);
  }
}
22,389,372✔
348

349
// table block data
350
// data1 data2 data3 data4 k1v1 k2v2, k3,v3 compress size raw_size
351
//|seq len value|seq len value| seq len value| seq len value|
352
/*
 * Append a batch to the builder's active mem table.
 *
 * Items are accepted one by one: while an immutable mem table is pending, or
 * while the block wrapper reports room below the builder's block size, the
 * item's meta is pushed and its size added to the pending byte count.
 * Otherwise the pending bytes are copied into the block, the block is flushed
 * and the same item is retried. Whatever part of the batch buffer has not
 * been copied by the end of the loop is appended in one go.
 *
 * The mem table is referenced and write-locked for the whole call.
 */
int32_t tableBuilderPut(STableBuilder *p, SBseBatch *pBatch) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t pending = 0;   // bytes accepted but not yet copied into the block
  int32_t consumed = 0;  // bytes of pBatch->buf already copied

  code = bseMemTableRef(p->pMemTable);
  if (code != 0) {
    return code;
  }

  taosWLockLatch(&p->pMemTable->latch);

  SBlockWrapper *pWrapper = &p->pMemTable->pBlockWrapper;

  int32_t idx = 0;
  while (idx < taosArrayGetSize(pBatch->pSeq)) {
    SBlockItemInfo *pItem = taosArrayGet(pBatch->pSeq, idx);
    if (idx == 0 || idx == taosArrayGetSize(pBatch->pSeq) - 1) {
      // first and last items bound the table range
      tableBuildUpdateTableRange(p, pItem);
      memtableUpdateBlockRange(p->pMemTable, pItem);
    }

    if (!atomic_load_8(&p->hasImmuMemTable) &&
        !(blockWrapperSize(pWrapper, pending + pItem->size) < tableBuilderGetBlockSize(p))) {
      // block is full: copy what has been accepted, flush, then retry this item
      if (pending > 0) {
        consumed += blockAppendBatch(pWrapper->data, pBatch->buf + consumed, pending);
      }
      bseTrace("start to flush bse table builder mem %p", p->pMemTable);
      code = tableBuilderFlush(p, BSE_TABLE_DATA_TYPE, 0);
      TSDB_CHECK_CODE(code, lino, _error);
      pending = 0;
      continue;
    }

    idx++;
    pending += pItem->size;
    tableBuilderUpdateBlockRange(p, pItem);
    memtableUpdateBlockRange(p->pMemTable, pItem);

    code = blockWrapperPushMeta(pWrapper, pItem->seq, NULL, pItem->size);
    TSDB_CHECK_CODE(code, lino, _error);

    bseTrace("start to insert  bse table builder mem %p, idx %d", p->pMemTable, idx);
  }

  if (consumed < pBatch->len) {
    int32_t remain = pBatch->len - consumed;
    if (remain > 0) {
      code = blockWrapperResize(pWrapper, remain + BLOCK_TOTAL_SIZE((SBlock *)(pWrapper->data)) + pWrapper->kvSize);
      TSDB_CHECK_CODE(code, lino, _error);
    }

    if (blockAppendBatch(pWrapper->data, pBatch->buf + consumed, remain) != remain) {
      code = TSDB_CODE_INVALID_PARA;
    }
  }

_error:
  if (code != 0) {
    bseError("failed to append batch since %s", tstrerror(code));
  }
  // the latch is always held here: the only earlier exit is before locking
  taosWUnLockLatch(&p->pMemTable->latch);

  bseMemTableUnRef(p->pMemTable);
  return code;
}
421

422
/*
 * Truncate the builder's data file to `size` bytes.
 * Returns TSDB_CODE_INVALID_PARA when no data file is open, otherwise the
 * result of taosFtruncateFile.
 */
int32_t tableBuilderTruncFile(STableBuilder *p, int64_t size) {
  if (p->pDataFile == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = taosFtruncateFile(p->pDataFile, size);
  if (code != 0) {
    bseError("failed to truncate file since %s", tstrerror(code));
  }
  return code;
}
438

439
int32_t compareFunc(const void *pLeft, const void *pRight) {
21,514,020✔
440
  SBlkHandle *p1 = (SBlkHandle *)pLeft;
21,514,020✔
441
  SBlkHandle *p2 = (SBlkHandle *)pRight;
21,514,020✔
442
  if (p1->range.sseq > p2->range.sseq) {
21,514,020✔
443
    return 1;
21,513,437✔
444
  } else if (p1->range.sseq < p2->range.sseq) {
583✔
445
    return -1;
×
446
  }
447
  return 0;
583✔
448
}
UNCOV
449
/*
 * Search pMetaHandle for `seq` using compareFunc with the TD_LE flag and
 * return the index reported by taosArraySearchIdx.
 */
int32_t findTargetBlock(SArray *pMetaHandle, int64_t seq) {
  SBlkHandle key = {0};
  key.range.sseq = seq;
  key.range.eseq = seq;
  return taosArraySearchIdx(pMetaHandle, &key, compareFunc, TD_LE);
}
453

454
/*
 * Look up `seq` in a mem table.
 *
 * Returns TSDB_CODE_NOT_FOUND when p is NULL or when seq lies outside the mem
 * table's table range. If the mem table already has flushed block handles and
 * the last handle's range is not greater than seq, the value is read back
 * from the data file through the owning table builder; otherwise it is looked
 * up in the in-memory block wrapper.
 *
 * The mem table is referenced and read-locked for the duration of the lookup.
 */
int32_t findInMemtable(STableMemTable *p, int64_t seq, uint8_t **value, int32_t *len) {
  int32_t code = 0;
  int8_t  inBuf = 1;
  int32_t lino = 0;
  int8_t  inLock = 0;
  if (p == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  code = bseMemTableRef(p);
  if (code != 0) {
    return code;
  }

  taosRLockLatch(&p->latch);
  inLock = 1;

  if (!seqRangeContains(&p->tableRange, seq)) {
    // assign code explicitly, as the TSDB_CODE_OUT_OF_RANGE check below does;
    // without the assignment a miss was reported as success
    TSDB_CHECK_CODE(code = TSDB_CODE_NOT_FOUND, lino, _error);
  }

  if (taosArrayGetSize(p->pMetaHandle) > 0) {
    SBlkHandle *pHandle = taosArrayGetLast(p->pMetaHandle);
    if (!seqRangeIsGreater(&pHandle->range, seq)) {
      // seq belongs to a block that has already been flushed
      inBuf = 0;
      int32_t idx = findTargetBlock(p->pMetaHandle, seq);
      if (idx < 0) {
        TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_RANGE, lino, _error);
      }
      pHandle = taosArrayGet(p->pMetaHandle, idx);
      code = tableBuilderSeek(p->pTableBuilder, pHandle, seq, value, len);
      TSDB_CHECK_CODE(code, lino, _error);
    }
  }

  if (inBuf == 1) {
    code = blockWrapperSeek(&p->pBlockWrapper, seq, value, len);
    if (code != 0) {
      bseInfo("mem table range [%" PRId64 ", %" PRId64 "]", p->tableRange.sseq, p->tableRange.eseq);
    }
    TSDB_CHECK_CODE(code, lino, _error);
  }
_error:
  if (inLock) {
    taosRUnLockLatch(&p->latch);
  }
  bseMemTableUnRef(p);
  if (code != 0) {
    bseInfo("failed to find seq %" PRId64 " in memtable %p at line %d since %s", seq, p, lino, tstrerror(code));
  }
  return code;
}
506
/*
 * Look up `seq` in the builder: the active mem table is tried first and the
 * immutable mem table only if that lookup fails.
 * Returns TSDB_CODE_NOT_FOUND when p is NULL.
 */
int32_t tableBuilderGet(STableBuilder *p, int64_t seq, uint8_t **value, int32_t *len) {
  if (p == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  int32_t code = findInMemtable(p->pMemTable, seq, value, len);
  if (code == 0) {
    return code;
  }
  return findInMemtable(p->pImmuMemTable, seq, value, len);
}
518

519
/* Widen pTableMeta->range with the range of every meta block in pMetaBlock; a NULL array is ignored. */
static void updateTableRange(SBTableMeta *pTableMeta, SArray *pMetaBlock) {
  if (pMetaBlock != NULL) {
    int32_t idx = 0;
    while (idx < taosArrayGetSize(pMetaBlock)) {
      SMetaBlock *pEntry = taosArrayGet(pMetaBlock, idx);
      seqRangeUpdate(&pTableMeta->range, &pEntry->range);
      idx++;
    }
  }
}
529
/*
 * Drop the builder's immutable mem table: clear the hasImmuMemTable flag,
 * release the builder's reference and reset the pointer, all while holding
 * the builder manager's rwlock in write mode. Always returns 0.
 */
static int32_t tableBuilderClearImmuMemTable(STableBuilder *p) {
  STableBuilderMgt *pOwner = p->pBuilderMgt;

  (void)taosThreadRwlockWrlock(&pOwner->mutex);

  atomic_store_8(&p->hasImmuMemTable, 0);
  bseMemTableUnRef(p->pImmuMemTable);
  p->pImmuMemTable = NULL;

  (void)taosThreadRwlockUnlock(&pOwner->mutex);
  return 0;
}
540
/*
 * Turn the active mem table into the immutable one: the pointer is moved,
 * the active slot is cleared and hasImmuMemTable is set, under the write
 * side of p->pBse->rwlock. Always returns 0.
 */
static int32_t tableBuildeSwapMemTable(STableBuilder *p) {
  (void)taosThreadRwlockWrlock(&p->pBse->rwlock);

  STableMemTable *pActive = p->pMemTable;
  p->pImmuMemTable = pActive;
  p->pMemTable = NULL;
  atomic_store_8(&p->hasImmuMemTable, 1);

  (void)taosThreadRwlockUnlock(&p->pBse->rwlock);
  return 0;
}
551

552
/*
 * Commit the builder's immutable mem table.
 *
 * Flushes the immutable block, fsyncs the data file and fetches the meta
 * blocks. With no meta blocks the function returns straight away; otherwise
 * the meta is committed, the table range is updated, pInfo is filled in
 * (level, range, timestamp, size) and the immutable mem table is dropped.
 *
 * Returns TSDB_CODE_INVALID_PARA when p is NULL.
 */
int32_t tableBuilderCommit(STableBuilder *p, SBseLiveFileInfo *pInfo) {
  int32_t lino = 0;
  int32_t code = 0;
  SArray *pMetaBlock = NULL;

  if (p == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  code = tableBuilderFlush(p, BSE_TABLE_DATA_TYPE, 1);
  TSDB_CHECK_CODE(code, lino, _error);

  code = taosFsyncFile(p->pDataFile);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableBuilderGetMetaBlock(p, &pMetaBlock);
  TSDB_CHECK_CODE(code, lino, _error);

  if (taosArrayGetSize(pMetaBlock) == 0) {
    // nothing was flushed, so there is no meta to persist
    bseDebug("no meta block to commit for table %s", p->name);
    taosArrayDestroy(pMetaBlock);
    return code;
  }

  code = tableMetaCommit(p->pTableMeta, pMetaBlock);
  TSDB_CHECK_CODE(code, lino, _error);

  updateTableRange(p->pTableMeta, pMetaBlock);

  pInfo->level = 0;
  pInfo->range = p->pTableMeta->range;
  pInfo->timestamp = p->timestamp;
  pInfo->size = p->offset;

  code = tableBuilderClearImmuMemTable(p);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  if (code == 0) {
    bseInfo("succ to commit table %s", p->name);
  } else {
    bseError("failed to commit table builder at line %d since %s ", lino, tstrerror(code));
  }
  taosArrayDestroy(pMetaBlock);
  return code;
}
600

601
/* Return the builder's block capacity (blockCap). */
int32_t tableBuilderGetBlockSize(STableBuilder *p) {
  return p->blockCap;
}
22,361,688✔
602

603
/*
 * Release a table builder: drop the references to both mem tables, close the
 * data file (a close failure is only logged) and free the builder.
 * A NULL builder is ignored. `commited` is not used by this function.
 */
void tableBuilderClose(STableBuilder *p, int8_t commited) {
  if (p != NULL) {
    bseMemTableUnRef(p->pMemTable);
    bseMemTableUnRef(p->pImmuMemTable);

    int32_t rc = taosCloseFile(&p->pDataFile);
    if (rc != 0) {
      bseError("failed to close table builder file %s since %s", p->name, tstrerror(terrno));
    }
    taosMemoryFree(p);
  }
}
616

617
/*
 * Fill the SBseSnapMeta at the start of the wrapper's buffer with the given
 * range, file type, block type and timestamp.
 */
static void addSnapshotMetaToBlock(SBlockWrapper *pBlkWrapper, SSeqRange range, int8_t fileType, int8_t blockType,
                                   int64_t timestamp) {
  SBseSnapMeta *pHead = pBlkWrapper->data;

  pHead->timestamp = timestamp;
  pHead->blockType = blockType;
  pHead->fileType = fileType;
  pHead->range = range;
}
627

628
/*
 * Overwrite only the timestamp of the SBseSnapMeta at the start of the
 * wrapper's buffer; range, fileType and blockType are accepted but not used.
 */
static void updateSnapshotMeta(SBlockWrapper *pBlkWrapper, SSeqRange range, int8_t fileType, int8_t blockType,
                               int64_t timestamp) {
  ((SBseSnapMeta *)pBlkWrapper->data)->timestamp = timestamp;
}
634
/*
 * Load a raw data block for a snapshot: resize the wrapper to the block size
 * plus an SBseSnapMeta header, read the block from the data file and stamp
 * the header as a BSE_TABLE_SNAP / BSE_TABLE_DATA_TYPE block.
 */
int32_t tableReaderLoadRawBlock(STableReader *p, SBlkHandle *pHandle, SBlockWrapper *blkWrapper) {
  int32_t lino = 0;
  int32_t code = blockWrapperResize(blkWrapper, pHandle->size + sizeof(SBseSnapMeta));
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableLoadRawBlock(p->pDataFile, pHandle, blkWrapper, 1);
  TSDB_CHECK_CODE(code, lino, _error);

  addSnapshotMetaToBlock(blkWrapper, p->range, BSE_TABLE_SNAP, BSE_TABLE_DATA_TYPE, p->timestamp);

_error:
  if (code != 0) {
    bseError("table reader failed to load block at line %d since %s", lino, tstrerror(code));
  }
  return code;
}
652

653
/*
 * Load a raw meta block for a snapshot: resize the wrapper to the block size
 * plus an SBseSnapMeta header, read the block from the meta reader's file and
 * stamp the header as a BSE_TABLE_META_SNAP / BSE_TABLE_META_TYPE block.
 */
int32_t tableReaderLoadRawMeta(STableReader *p, SBlkHandle *pHandle, SBlockWrapper *blkWrapper) {
  int32_t            lino = 0;
  SBtableMetaReader *pMetaReader = p->pMetaReader;

  int32_t code = blockWrapperResize(blkWrapper, pHandle->size + sizeof(SBseSnapMeta));
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableLoadRawBlock(pMetaReader->pFile, pHandle, blkWrapper, 1);
  TSDB_CHECK_CODE(code, lino, _error);

  addSnapshotMetaToBlock(blkWrapper, p->range, BSE_TABLE_META_SNAP, BSE_TABLE_META_TYPE, p->timestamp);

_error:
  if (code != 0) {
    bseError("failed to load raw meta from table pReaderMgt at line %d lino since %s", lino, tstrerror(code));
  }
  return code;
}
672
/*
 * Load the raw meta-index block for a snapshot. The handle is taken from the
 * meta reader's footer (footer.metaHandle); the header is stamped as a
 * BSE_TABLE_META_SNAP / BSE_TABLE_META_INDEX_TYPE block.
 */
int32_t tableReaderLoadRawMetaIndex(STableReader *p, SBlockWrapper *blkWrapper) {
  int32_t            lino = 0;
  SBtableMetaReader *pMetaReader = p->pMetaReader;
  SBlkHandle        *pIndexHandle = p->pMetaReader->footer.metaHandle;

  int32_t code = blockWrapperResize(blkWrapper, pIndexHandle->size + sizeof(SBseSnapMeta));
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableLoadRawBlock(pMetaReader->pFile, pIndexHandle, blkWrapper, 1);
  TSDB_CHECK_CODE(code, lino, _error);

  addSnapshotMetaToBlock(blkWrapper, p->range, BSE_TABLE_META_SNAP, BSE_TABLE_META_INDEX_TYPE, p->timestamp);

_error:
  if (code != 0) {
    bseError("failed to load raw meta from table pReaderMgt at line %d lino since %s", lino, tstrerror(code));
  }
  return code;
}
692
/*
 * Load the raw footer for a snapshot: read the last kEncodeLen bytes of the
 * meta reader's file and place them after an SBseSnapMeta header stamped as a
 * BSE_TABLE_META_SNAP / BSE_TABLE_FOOTER_TYPE block. blkWrapper->size is set
 * to header + footer length.
 *
 * A failed seek or short read now returns terrno instead of falling through
 * with whatever value `code` held before.
 */
int32_t tableReaderLoadRawFooter(STableReader *p, SBlockWrapper *blkWrapper) {
  int32_t code = 0;
  int32_t lino = 0;
  char    buf[kEncodeLen] = {0};

  SBtableMetaReader *pReader = p->pMetaReader;
  // NOTE(review): buf is overwritten by the read below and this result is
  // replaced on every later path — confirm whether this call is still needed.
  code = footerEncode(&pReader->footer, buf);
  int32_t len = sizeof(buf);

  int64_t n = taosLSeekFile(pReader->pFile, -kEncodeLen, SEEK_END);
  if (n < 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  if (taosReadFile(pReader->pFile, buf, sizeof(buf)) != len) {
    // assign code explicitly, like the seek check above
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  code = blockWrapperResize(blkWrapper, len + sizeof(SBseSnapMeta));
  TSDB_CHECK_CODE(code, lino, _error);

  memcpy((uint8_t *)blkWrapper->data + sizeof(SBseSnapMeta), buf, sizeof(buf));
  blkWrapper->size = len + sizeof(SBseSnapMeta);

  addSnapshotMetaToBlock(blkWrapper, p->range, BSE_TABLE_META_SNAP, BSE_TABLE_FOOTER_TYPE, p->timestamp);
_error:
  if (code != 0) {
    bseError("failed to load raw footer from table pReaderMgt at lino %d since %s", lino, tstrerror(code));
  }
  return code;
}
723

724
/*
 * Open a table reader for the data/meta files derived from `timestamp`.
 *
 * Allocates the reader, opens the data file read-only, initialises a 1024-byte
 * block wrapper and the meta reader, and stores the reader in *pReader only
 * when every step succeeded.
 *
 * Returns TSDB_CODE_INVALID_CFG when pReaderMgt is NULL. On any later failure
 * the partially built reader is closed, *pReader is left untouched and the
 * error code is returned (allocation failure now reports terrno).
 */
int32_t tableReaderOpen(int64_t timestamp, STableReader **pReader, void *pReaderMgt) {
  char data[TSDB_FILENAME_LEN] = {0};
  char meta[TSDB_FILENAME_LEN] = {0};

  char dataPath[TSDB_FILENAME_LEN] = {0};

  int32_t code = 0;
  int32_t lino = 0;

  STableReaderMgt *pMgt = (STableReaderMgt *)pReaderMgt;
  if (pMgt == NULL) {
    return TSDB_CODE_INVALID_CFG;
  }

  SSubTableMgt *pMeta = pMgt->pMgt;

  STableReader *p = taosMemCalloc(1, sizeof(STableReader));
  if (p == NULL) {
    // code must be assigned explicitly; checking terrno alone left code == 0
    code = terrno;
    lino = __LINE__;
    goto _error;
  }

  bseBuildDataName(timestamp, data);

  p->timestamp = timestamp;
  p->blockCap = 1024;
  p->pReaderMgt = pReaderMgt;
  memcpy(p->name, data, strlen(data));  // p is zero-filled by calloc

  bseBuildFullName(pMgt->pBse, data, dataPath);
  code = tableOpenFile(dataPath, 1, &p->pDataFile, &p->fileSize);
  TSDB_CHECK_CODE(code, lino, _error);

  code = blockWrapperInit(&p->blockWrapper, 1024);
  TSDB_CHECK_CODE(code, lino, _error);

  // Set pBse pointer for encryption/decryption
  p->blockWrapper.pBse = pMgt->pBse;

  bseBuildMetaName(timestamp, meta);
  code = tableMetaReaderInit(pMeta->pTableMetaMgt->pTableMeta, meta, &p->pMetaReader);
  TSDB_CHECK_CODE(code, lino, _error);

  *pReader = p;

_error:
  if (code != 0) {
    // p is NULL when the allocation itself failed
    if (p != NULL) {
      tableReaderClose(p);
    }
    bseError("failed to open table pReaderMgt at line %d since %s", lino, tstrerror(code));
  }
  return code;
}
776
// Store the caller's cache preference in the reader's putInCache flag.
void tableReaderShouldPutToCache(STableReader *p, int8_t cache) {
  p->putInCache = cache;
}
×
777

778
// Look up the value stored under `seq` in the table behind reader `p`.
//
// The block meta covering `seq` is loaded first; the block itself is taken
// from the block cache when present, otherwise read from the data file and
// inserted into the cache. On success *pValue receives a freshly allocated
// copy of the value (allocated by blockSeek) and *len its length.
//
// Returns 0 on success or the error code of the failing step.
int32_t tableReaderGet(STableReader *p, int64_t seq, uint8_t **pValue, int32_t *len) {
  int32_t    lino = 0;
  int32_t    code = 0;
  SMetaBlock block = {0};

  // Declared before the first goto so the shared cleanup below never sees an
  // indeterminate wrapper.
  SBlockWrapper wrapper = {0};
  SCacheItem   *pItem = NULL;

  STableReaderMgt   *pMgt = (STableReaderMgt *)p->pReaderMgt;
  SBtableMetaReader *pMeta = p->pMetaReader;

  code = tableMetaReaderLoadBlockMeta(pMeta, seq, &block);
  TSDB_CHECK_CODE(code, lino, _error);

  SBlkHandle blkhandle = {.offset = block.offset, .size = block.size, .range = block.range};

  code = blockCacheGet(pMgt->pBlockCache, &blkhandle.range, (void **)&pItem);
  if (code != 0) {
    // Cache miss: read the block from the data file.
    // NOTE(review): this local wrapper has pBse == NULL, so tableLoadBlock
    // skips decryption here — confirm this is intended when encryption is on.
    code = blockWrapperInit(&wrapper, block.size + 16);
    TSDB_CHECK_CODE(code, lino, _error);

    bseDebug("block size:%" PRId64 ", offset:%" PRId64 ", [sseq:%" PRId64 ", eseq:%" PRId64 "]", block.size,
             block.offset, block.range.sseq, block.range.eseq);

    code = tableLoadBlock(p->pDataFile, &blkhandle, &wrapper);
    if (code != 0) {
      blockWrapperCleanup(&wrapper);
      TSDB_CHECK_CODE(code, lino, _error);
    }

    // NOTE(review): presumably the cache takes ownership of wrapper.data only
    // on success; if so the block buffer still leaks when the put fails —
    // verify against blockCachePut.
    code = blockCachePut(pMgt->pBlockCache, &block.range, wrapper.data);
    TSDB_CHECK_CODE(code, lino, _error);

  } else {
    wrapper.data = pItem->pItem;
    wrapper.pCachItem = pItem;
  }

  code = blockSeek(wrapper.data, seq, pValue, len);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  // Runs on success and on failure, so a failed seek no longer leaks the
  // cache reference or the wrapper's kv buffer.
  if (wrapper.pCachItem != NULL) {
    bseCacheUnrefItem(wrapper.pCachItem);
  }
  blockWrapperClearMeta(&wrapper);

  if (code != 0) {
    bseError("failed to get table reader data at line %d since %s", lino, tstrerror(code));
  }
  return code;
}
829
// Collect every data block handle known to the reader's meta reader.
//
// On success *pMeta receives a new SArray of SBlkHandle that the caller must
// destroy. On failure nothing is handed out and the temporary array is freed.
int32_t tableReaderGetMeta(STableReader *p, SArray **pMeta) {
  int32_t code = 0;
  int32_t lino = 0;

  SArray *pMetaHandle = taosArrayInit(128, sizeof(SBlkHandle));
  if (pMetaHandle == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  code = tableMetaReaderLoadAllDataHandle(p->pMetaReader, pMetaHandle);
  TSDB_CHECK_CODE(code, lino, _error);

  *pMeta = pMetaHandle;

_error:
  if (code != 0) {
    bseError("failed to get table reader meta at lino %d since %s", lino, tstrerror(code));
    // The array was never published to the caller, so release it here.
    taosArrayDestroy(pMetaHandle);
  }
  return code;
}
849

850
// Release everything owned by a table reader, then the reader itself.
// A NULL reader is ignored. A failure to close the data file is only logged.
void tableReaderClose(STableReader *p) {
  if (p != NULL) {
    taosArrayDestroy(p->pMetaHandle);

    int32_t closeRet = taosCloseFile(&p->pDataFile);
    if (closeRet != 0) {
      bseError("failed to close table reader file %s since %s", p->name, tstrerror(terrno));
    }

    tableMetaReaderClose(p->pMetaReader);
    blockWrapperCleanup(&p->blockWrapper);
    taosMemoryFree(p);
  }
}
864

865
// Allocate a zero-filled block of `cap` bytes and hand it out through *p.
// Returns 0 on success, terrno when the allocation fails (*p untouched).
int32_t blockCreate(int32_t cap, SBlock **p) {
  SBlock *pNew = taosMemCalloc(1, cap);
  if (pNew != NULL) {
    *p = pNew;
    return 0;
  }
  return terrno;
}
874

875
// Total size of the block as given by BLOCK_TOTAL_SIZE, plus `extra` bytes.
int32_t blockEsimateSize(SBlock *p, int32_t extra) {
  int32_t estimated = BLOCK_TOTAL_SIZE(p) + extra;
  return estimated;
}
22,367,308✔
876

877
// Estimated size of a wrapper: its kv index bytes, the estimated block size
// including `extra`, plus a fixed 12-byte allowance.
// Returns 0 when the wrapper or its block is missing.
int32_t blockWrapperSize(SBlockWrapper *p, int32_t extra) {
  if (p != NULL && p->data != NULL) {
    return p->kvSize + blockEsimateSize(p->data, extra) + 12;
  }
  return 0;
}
884
// Append `len` raw bytes at the current end of the block payload and advance
// the block length. No capacity check is made here. Returns `len`.
int32_t blockAppendBatch(SBlock *p, uint8_t *value, int32_t len) {
  uint8_t *tail = (uint8_t *)p->data + p->len;
  memcpy(tail, value, len);
  p->len += len;
  return len;
}
892
// Append one record to the end of the block payload: varint seq, varint
// value length, then the value bytes. No capacity check is made here.
//
// Returns the total number of bytes written for the record.
int32_t blockPut(SBlock *p, int64_t seq, uint8_t *value, int32_t len) {
  uint8_t *data = (uint8_t *)p->data + p->len;

  int32_t offset = taosEncodeVariantI64((void **)&data, seq);
  offset += taosEncodeVariantI32((void **)&data, len);
  offset += taosEncodeBinary((void **)&data, value, len);

  // Advance by everything that was written (header + value). Advancing by
  // `len` alone would make the next record overwrite the tail of this one.
  p->len += offset;
  return offset;
}
902
// Reset a block to empty: zero its type and length and clear the first
// payload byte.
void blockClear(SBlock *p) {
  p->type = 0;
  p->len = 0;
  p->data[0] = 0;
}
11,240✔
907

908
// Find `seq` in a block and return a copy of its value.
//
// The (seq, length) pairs are decoded starting at p->data + p->offset, while
// value bytes are read from the start of p->data at the running sum of the
// preceding value lengths.
//
// On a match *len is set, *pValue receives a newly allocated copy and 0 is
// returned (terrno if the allocation fails). Returns TSDB_CODE_NOT_FOUND when
// no entry matches.
int32_t blockSeek(SBlock *p, int64_t seq, uint8_t **pValue, int32_t *len) {
  uint8_t *base = (uint8_t *)p->data;
  uint8_t *cursor = base + p->offset;
  int32_t  valueOff = 0;

  while (cursor - base < p->len) {
    int64_t key;
    int32_t vlen;
    cursor = taosDecodeVariantI64(cursor, &key);
    cursor = taosDecodeVariantI32(cursor, &vlen);

    if (key != seq) {
      // Not ours: skip over this entry's value bytes.
      valueOff += vlen;
      continue;
    }

    *len = vlen;
    *pValue = taosMemoryCalloc(1, vlen);
    if (*pValue == NULL) {
      return terrno;
    }
    memcpy(*pValue, base + valueOff, vlen);
    return 0;
  }

  return TSDB_CODE_NOT_FOUND;
}
938

939
// Find `tgt` through the wrapper's kv index and return a copy of its value.
//
// The index in p->kvBuffer holds (seq, length) varint pairs; value bytes are
// read from the block payload at the running sum of earlier value lengths.
//
// Returns 0 with *pValue / *len filled on a match, terrno if the copy cannot
// be allocated, TSDB_CODE_NOT_FOUND when the wrapper or its block is missing,
// and TSDB_CODE_BLOB_SEQ_NOT_FOUND when the index has no such seq.
int32_t blockWrapperSeek(SBlockWrapper *p, int64_t tgt, uint8_t **pValue, int32_t *len) {
  if (p == NULL || p->data == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  SBlock  *pBlk = (SBlock *)p->data;
  uint8_t *idxStart = p->kvBuffer;
  uint8_t *idxCur = idxStart;
  int32_t  valueOff = 0;

  for (; (idxCur - idxStart) < p->kvSize;) {
    int64_t seq = 0;
    int32_t vlen = 0;
    idxCur = taosDecodeVariantI64(idxCur, &seq);
    idxCur = taosDecodeVariantI32(idxCur, &vlen);

    if (seq != tgt) {
      valueOff += vlen;
      continue;
    }

    *len = vlen;
    *pValue = taosMemoryCalloc(1, vlen);
    if (*pValue == NULL) {
      return terrno;
    }
    memcpy(*pValue, (uint8_t *)pBlk->data + valueOff, vlen);
    return 0;
  }

  bseInfo("blockWrapperSeek not found seq:%" PRId64 ", tgt:%" PRId64 "", tgt, tgt);
  return TSDB_CODE_BLOB_SEQ_NOT_FOUND;
}
969

970
// Return the type tag stored in the block header.
int8_t blockGetType(SBlock *p) {
  return p->type;
}
11,022,935✔
971
// Free a block.
void blockDestroy(SBlock *pBlock) {
  taosMemoryFree(pBlock);
}
×
972

973
// Encode a block handle at the end of the block payload and advance the block
// length by the encoded size. Returns the number of bytes appended.
int32_t metaBlockAddIndex(SBlock *p, SBlkHandle *pInfo) {
  char   *tail = (char *)((uint8_t *)p->data + p->len);
  int32_t written = blkHandleEncode(pInfo, tail);
  p->len += written;
  return written;
}
980

981
// Serialize a block handle into buf as four varints, in order:
// offset, size, range.sseq, range.eseq. Returns the number of bytes written.
int32_t blkHandleEncode(SBlkHandle *pHandle, char *buf) {
  char   *cursor = buf;
  int32_t written = taosEncodeVariantU64((void **)&cursor, pHandle->offset);
  written += taosEncodeVariantU64((void **)&cursor, pHandle->size);
  written += taosEncodeVariantI64((void **)&cursor, pHandle->range.sseq);
  written += taosEncodeVariantI64((void **)&cursor, pHandle->range.eseq);
  return written;
}
990
// Deserialize a block handle from buf (offset, size, range.sseq, range.eseq
// as varints). Returns the number of bytes consumed.
int32_t blkHandleDecode(SBlkHandle *pHandle, char *buf) {
  char *cursor = taosDecodeVariantU64(buf, &pHandle->offset);
  cursor = taosDecodeVariantU64(cursor, &pHandle->size);
  cursor = taosDecodeVariantI64(cursor, &pHandle->range.sseq);
  cursor = taosDecodeVariantI64(cursor, &pHandle->range.eseq);
  return cursor - buf;
}
998

999
// | meta handle | index handle | padding | magic number high | magic number low |
1000
// Write the table footer into buf: the meta handle followed by the index
// handle at the front, and the magic number twice in the last 8 bytes of the
// kEncodeLen-sized footer. Always returns 0.
int32_t footerEncode(STableFooter *pFooter, char *buf) {
  int32_t handleBytes = blkHandleEncode(pFooter->metaHandle, buf);
  (void)blkHandleEncode(pFooter->indexHandle, buf + handleBytes);

  char *magic = buf + kEncodeLen - 8;
  (void)taosEncodeFixedU32((void **)&magic, kMagicNum);
  (void)taosEncodeFixedU32((void **)&magic, kMagicNum);
  return 0;
}
1011
// Parse a table footer from buf.
//
// The two magic words in the last 8 bytes of the kEncodeLen-sized footer are
// checked first; then the meta handle and the index handle are decoded from
// the front. Returns 0 on success, TSDB_CODE_FILE_CORRUPTED on any mismatch.
int32_t footerDecode(STableFooter *pFooter, char *buf) {
  char    *magic = buf + kEncodeLen - 8;
  uint32_t first, second;

  if (taosDecodeFixedU32(magic, &first) == NULL) {
    return TSDB_CODE_FILE_CORRUPTED;
  }
  if (taosDecodeFixedU32(magic + 4, &second) == NULL) {
    return TSDB_CODE_FILE_CORRUPTED;
  }
  if (!(first == kMagicNum && second == kMagicNum)) {
    return TSDB_CODE_FILE_CORRUPTED;
  }

  int32_t used = blkHandleDecode(pFooter->metaHandle, buf);
  if (used < 0) {
    return TSDB_CODE_FILE_CORRUPTED;
  }

  used = blkHandleDecode(pFooter->indexHandle, buf + used);
  if (used < 0) {
    return TSDB_CODE_FILE_CORRUPTED;
  }

  return 0;
}
1039

1040
// Scan the meta entries packed in pBlock for the one whose seq range contains
// `seq`. Copies it into *pMeta and returns 0 on a hit; returns
// TSDB_CODE_NOT_FOUND when the payload is exhausted.
int32_t blockSeekMeta(SBlock *pBlock, int64_t seq, SMetaBlock *pMeta) {
  uint8_t *cursor = (uint8_t *)pBlock->data;

  for (int32_t consumed = 0; consumed < pBlock->len;) {
    SMetaBlock entry = {0};
    int32_t    step = metaBlockDecode(&entry, (char *)cursor);

    if (seqRangeContains(&entry.range, seq)) {
      memcpy(pMeta, &entry, sizeof(SMetaBlock));
      return 0;
    }

    consumed += step;
    cursor += step;
  }

  return TSDB_CODE_NOT_FOUND;
}
1057
// Decode every meta entry packed in pBlock and push each onto pMeta.
// Returns 0 on success or terrno if a push fails.
int32_t blockGetAllMeta(SBlock *pBlock, SArray *pMeta) {
  uint8_t *cursor = (uint8_t *)pBlock->data;

  for (int32_t consumed = 0; consumed < pBlock->len;) {
    SMetaBlock entry = {0};
    int32_t    step = metaBlockDecode(&entry, (char *)cursor);

    if (taosArrayPush(pMeta, &entry) == NULL) {
      return terrno;
    }

    consumed += step;
    cursor += step;
  }

  return 0;
}
1074

1075
// Serialize a meta block entry into buf: fixed-width type, version and
// reserve, followed by varint offset, size, range.sseq and range.eseq.
// Returns the number of bytes written.
int32_t metaBlockEncode(SMetaBlock *pMeta, char *buf) {
  char   *cursor = buf;
  int32_t written = 0;

  // fixed-width header
  written += taosEncodeFixedI8((void **)&cursor, pMeta->type);
  written += taosEncodeFixedI8((void **)&cursor, pMeta->version);
  written += taosEncodeFixedI16((void **)&cursor, pMeta->reserve);

  // varint body
  written += taosEncodeVariantI64((void **)&cursor, pMeta->offset);
  written += taosEncodeVariantI64((void **)&cursor, pMeta->size);
  written += taosEncodeVariantI64((void **)&cursor, pMeta->range.sseq);
  written += taosEncodeVariantI64((void **)&cursor, pMeta->range.eseq);

  return written;
}
1087
// Deserialize a meta block entry from buf, mirroring metaBlockEncode's field
// order. Returns the number of bytes consumed.
int32_t metaBlockDecode(SMetaBlock *pMeta, char *buf) {
  char *cursor = buf;

  // fixed-width header
  cursor = taosDecodeFixedI8(cursor, &pMeta->type);
  cursor = taosDecodeFixedI8(cursor, &pMeta->version);
  cursor = taosDecodeFixedI16(cursor, &pMeta->reserve);

  // varint body
  cursor = taosDecodeVariantI64(cursor, &pMeta->offset);
  cursor = taosDecodeVariantI64(cursor, &pMeta->size);
  cursor = taosDecodeVariantI64(cursor, &pMeta->range.sseq);
  cursor = taosDecodeVariantI64(cursor, &pMeta->range.eseq);

  return cursor - buf;
}
1099
// Encode a meta entry at the end of the block payload and advance the block
// length by the encoded size. Returns the number of bytes appended.
int32_t metaBlockAdd(SBlock *p, SMetaBlock *pBlk) {
  char   *tail = (char *)((uint8_t *)p->data + p->len);
  int32_t written = metaBlockEncode(pBlk, tail);
  p->len += written;
  return written;
}
1106
// Decode a meta entry located at p->data + p->len and advance p->len by the
// number of bytes consumed. Returns that byte count.
int32_t metaBlockGet(SBlock *p, SMetaBlock *pBlk) {
  char   *from = (char *)((uint8_t *)p->data + p->len);
  int32_t consumed = metaBlockDecode(pBlk, from);
  p->len += consumed;
  return consumed;
}
1113

1114
// Write the block held by pBlkW to pFile at pHandle->offset.
//
// Steps: stamp version/compress-type/row-size on the block, optionally
// compress it, optionally encrypt everything except the trailing
// BLOCK_TAIL_LEN bytes, append a checksum, seek and write.
//
// Returns 0 on success with *nWrite set to the bytes written. An empty block
// returns 0 immediately without touching *nWrite. Temporary compression and
// encryption buffers are released on every exit path.
int32_t tableFlushBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlkW, int32_t *nWrite) {
  int32_t code = 0;
  int32_t lino = 0;

  SBlock *pBlk = pBlkW->data;
  if (pBlk->len == 0) {
    return 0;
  }
  pBlk->version = BSE_META_VER;
  int8_t compressType = kNoCompres;

  // Both temporaries are declared before any goto so the shared cleanup at
  // _error always sees initialized values.
  SBlockWrapper wrapper = {0};
  uint8_t      *encryptBuf = NULL;

  uint8_t *pWrite = (uint8_t *)pBlk;
  int32_t  len = BLOCK_TOTAL_SIZE(pBlk);
  int32_t  plainLen = len;

  BLOCK_SET_COMPRESS_TYPE(pBlk, compressType);
  BLOCK_SET_ROW_SIZE(pBlk, BLOCK_ROW_SIZE(pBlk));

  if (compressType != kNoCompres) {
    code = blockWrapperInit(&wrapper, len + 4);
    TSDB_CHECK_CODE(code, lino, _error);

    int32_t compressSize = wrapper.cap;
    code = bseCompressData(compressType, pWrite, BLOCK_ROW_SIZE(pBlk), wrapper.data, &compressSize);
    if (code != 0) {
      // Compression is best-effort: fall back to writing the raw block.
      bseWarn("failed to compress data since %s, not set compress", tstrerror(TSDB_CODE_THIRDPARTY_ERROR));

      blockWrapperCleanup(&wrapper);
      BLOCK_SET_COMPRESS_TYPE(pBlk, kNoCompres);
      BLOCK_SET_ROW_SIZE(pBlk, BLOCK_ROW_SIZE(pBlk));
    } else {
      int32_t rawSize = BLOCK_ROW_SIZE(pBlk);
      COMREPSS_DATA_SET_TYPE_AND_RAWLEN(wrapper.data, compressSize, compressType, rawSize);
      len = compressSize + BLOCK_TAIL_LEN;

      pWrite = (uint8_t *)wrapper.data;
      plainLen = len;
    }
  }

  // Encrypt data if encryption is enabled (before checksum)
  SBse *pBse = (SBse *)pBlkW->pBse;
  if (pBse != NULL && pBse->cfg.encryptKey[0] != '\0') {
    // Encrypt data excluding BLOCK_TAIL_LEN (to keep compression info readable)
    // plainLen includes checksum, subtract BLOCK_TAIL_LEN
    int32_t plainDataLen = plainLen - BLOCK_TAIL_LEN;
    int32_t cryptedLen = ENCRYPTED_LEN(plainDataLen);

    // Allocate buffer for encrypted data + BLOCK_TAIL_LEN
    encryptBuf = taosMemoryMalloc(cryptedLen + BLOCK_TAIL_LEN);
    if (encryptBuf == NULL) {
      TSDB_CHECK_CODE(code = terrno, lino, _error);
    }

    // Pad and encrypt data part only
    (void)memset(encryptBuf, 0, cryptedLen);
    (void)memcpy(encryptBuf, pWrite, plainDataLen);

    // Encrypt using CBC
    SCryptOpts opts = {0};
    opts.len = cryptedLen;
    opts.source = (char *)encryptBuf;
    opts.result = (char *)encryptBuf;  // Encrypt in place
    opts.unitLen = 16;
    opts.pOsslAlgrName = pBse->cfg.encryptAlgrName;
    tstrncpy((char *)opts.key, pBse->cfg.encryptKey, ENCRYPT_KEY_LEN + 1);

    int32_t count = CBC_Encrypt(&opts);
    if (count != opts.len) {
      TSDB_CHECK_CODE(code = terrno, lino, _error);
    }

    // Copy BLOCK_TAIL_LEN (compression info) unencrypted
    (void)memcpy(encryptBuf + cryptedLen, pWrite + plainDataLen, BLOCK_TAIL_LEN);

    pWrite = encryptBuf;
    len = cryptedLen + BLOCK_TAIL_LEN;
  }

  code = taosCalcChecksumAppend(0, (uint8_t *)pWrite, len);
  TSDB_CHECK_CODE(code, lino, _error);

  int64_t n = taosLSeekFile(pFile, pHandle->offset, SEEK_SET);
  if (n < 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  int32_t nwrite = taosWriteFile(pFile, (uint8_t *)pWrite, len);
  if (nwrite != len) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }
  *nWrite = nwrite;

_error:
  if (code != 0) {
    bseError("failed to flush table builder at line %d since %s", lino, tstrerror(code));
  } else {
    bseDebug("flush at offset %" PRId64 ", size %d", pHandle->offset, len);
  }

  // Shared cleanup for success and failure; doing this before the label would
  // leak both buffers on every error path.
  blockWrapperCleanup(&wrapper);
  taosMemoryFree(encryptBuf);
  return code;
}
1220
// Read the block described by pHandle from pFile into pBlkW.
//
// Steps: grow pBlkW to hold the block, seek and read pHandle->size bytes,
// verify the checksum, decrypt in place when pBlkW->pBse carries a key,
// then either decompress into a fresh buffer that replaces pBlkW's, or
// validate the stored length of an uncompressed block.
//
// Returns 0 on success; TSDB_CODE_FILE_CORRUPTED for short reads, checksum,
// decrypt, decompress or length mismatches; other codes from the failing step.
int32_t tableLoadBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlkW) {
  int32_t code = 0;
  int32_t lino = 0;

  // Declared ahead of the first goto: the cleanup at _error touches both, and
  // jumping over their initializers would leave them indeterminate.
  SBlockWrapper pHelp = {0};
  uint8_t      *decryptBuf = NULL;

  code = blockWrapperResize(pBlkW, pHandle->size + 16);
  TSDB_CHECK_CODE(code, lino, _error);

  SBlock  *pBlk = pBlkW->data;
  uint8_t *pRead = (uint8_t *)pBlk;

  int64_t n = taosLSeekFile(pFile, pHandle->offset, SEEK_SET);
  if (n < 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  int32_t nr = taosReadFile(pFile, pRead, pHandle->size);
  if (nr != pHandle->size) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
  }

  if (taosCheckChecksumWhole((uint8_t *)pRead, pHandle->size) != 1) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
  }

  // Decrypt data if encryption is enabled
  SBse   *pBse = (SBse *)pBlkW->pBse;
  int32_t dataLen = pHandle->size;
  if (pBse != NULL && pBse->cfg.encryptKey[0] != '\0') {
    // Data is encrypted (excluding BLOCK_TAIL_LEN), decrypt it
    int32_t cryptedLen = pHandle->size - BLOCK_TAIL_LEN;
    decryptBuf = taosMemoryMalloc(cryptedLen);
    if (decryptBuf == NULL) {
      TSDB_CHECK_CODE(code = terrno, lino, _error);
    }

    // Decrypt using CBC
    SCryptOpts opts = {0};
    opts.len = cryptedLen;
    opts.source = (char *)pRead;
    opts.result = (char *)decryptBuf;
    opts.unitLen = 16;
    opts.pOsslAlgrName = pBse->cfg.encryptAlgrName;
    tstrncpy(opts.key, pBse->cfg.encryptKey, ENCRYPT_KEY_LEN + 1);

    int32_t count = CBC_Decrypt(&opts);
    if (count != cryptedLen) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
    }

    // Copy decrypted data back, BLOCK_TAIL_LEN remains unencrypted at the end
    (void)memcpy(pRead, decryptBuf, cryptedLen);
    // BLOCK_TAIL_LEN at pRead + cryptedLen is already in place (unencrypted)
  }

  uint8_t compressType = 0;
  int32_t rawSize = 0;

  // Get compression info from decrypted/plain data
  COMPRESS_DATA_GET_TYPE_AND_RAWLEN(pRead, dataLen, compressType, rawSize);

  if (compressType != kNoCompres) {
    code = blockWrapperInit(&pHelp, rawSize);
    TSDB_CHECK_CODE(code, lino, _error);

    int32_t unCompressSize = pHelp.cap;
    code = bseDecompressData(compressType, pRead, dataLen - BLOCK_TAIL_LEN, pHelp.data, &unCompressSize);
    if (code != 0) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
    }

    SBlock *p = pHelp.data;
    if (BLOCK_ROW_SIZE_OFFSET(p) != unCompressSize) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
    }

    // Swap the decompressed buffers into the caller's wrapper.
    blockWrapperCleanup(pBlkW);

    blockWrapperTransfer(pBlkW, &pHelp);

  } else {
    // For uncompressed data, use rawSize (which equals actual data size for no compression)
    // rawSize is stored in BLOCK_TAIL_LEN and is always correct even after encryption
    pBlk = pBlkW->data;
    int32_t expectedLen = rawSize - sizeof(SBlock);
    if (pBlk->len != expectedLen) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
    }
  }
_error:
  if (code != 0) {
    bseError("failed to load block at lino %d since %s, read at offset %" PRId64 ", size:%" PRId64 "", lino,
             tstrerror(code), pHandle->offset, pHandle->size);
  } else {
    bseDebug("read at offset %" PRId64 ", size %" PRId64 "", pHandle->offset, pHandle->size);
  }

  blockWrapperCleanup(&pHelp);
  taosMemoryFree(decryptBuf);
  return code;
}
1322
// Read pHandle->size raw bytes at pHandle->offset into pBlkW's buffer, leaving
// room for an SBseSnapMeta header in front of them. No decryption or
// decompression is done. When `checkSum` is non-zero the bytes are verified.
//
// On success pBlkW->size is set to the payload size plus the header size.
// The buffer is not resized here.
int32_t tableLoadRawBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlkW, int8_t checkSum) {
  int32_t code = 0;
  int32_t lino = 0;

  // Payload lands right after the space reserved for the snapshot header.
  uint8_t *payload = (uint8_t *)pBlkW->data + sizeof(SBseSnapMeta);

  int64_t pos = taosLSeekFile(pFile, pHandle->offset, SEEK_SET);
  if (pos < 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  int32_t nread = taosReadFile(pFile, payload, pHandle->size);
  if (nread != pHandle->size) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
  }

  if (checkSum && taosCheckChecksumWhole((uint8_t *)payload, pHandle->size) != 1) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
  }

  pBlkW->size = pHandle->size + sizeof(SBseSnapMeta);

_error:
  if (code != 0) {
    bseError("failed to load block at lino %d since %s", lino, tstrerror(code));
  }
  return code;
}
1352

1353
// Non-zero when seq lies inside the closed interval [p->sseq, p->eseq].
int8_t seqRangeContains(SSeqRange *p, int64_t seq) {
  if (seq < p->sseq) {
    return 0;
  }
  return seq <= p->eseq;
}
65,610,116✔
1354

1355
// Mark the range as unset by putting -1 in both bounds.
void seqRangeReset(SSeqRange *p) {
  p->eseq = -1;
  p->sseq = -1;
}
34,614✔
1359

UNCOV
1360
// Non-zero when seq lies beyond the range's upper bound.
int8_t seqRangeIsGreater(SSeqRange *p, int64_t seq) {
  return p->eseq < seq;
}
×
1361

1362
// Extend dst with src: the end bound always follows src, while the start
// bound is taken from src only if dst's start is still unset (-1).
void seqRangeUpdate(SSeqRange *dst, SSeqRange *src) {
  dst->sseq = (dst->sseq == -1) ? src->sseq : dst->sseq;
  dst->eseq = src->eseq;
}
67,182,854✔
1368

1369
// Set up a block wrapper: a zero-filled block buffer of `cap` bytes plus a
// 128-byte kv index buffer. On failure whatever was allocated is released via
// blockWrapperCleanup and the error code is returned.
int32_t blockWrapperInit(SBlockWrapper *p, int32_t cap) {
  int32_t code = 0;
  int32_t lino = 0;

  // block buffer
  p->data = taosMemoryCalloc(1, cap);
  if (p->data == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  // kv index buffer
  p->kvSize = 0;
  p->kvCap = 128;
  p->kvBuffer = taosMemoryCalloc(1, p->kvCap);
  if (p->kvBuffer == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  SBlock *pHdr = (SBlock *)p->data;
  pHdr->offset = 0;
  pHdr->version = 0;
  p->cap = cap;

_error:
  if (code != 0) {
    blockWrapperCleanup(p);
  }
  return code;
}
1394
// Append a (seq, len) pair, both varint-encoded, to the wrapper's kv index,
// growing the index buffer when fewer than 12 spare bytes remain.
// `value` is not read here; only its length is recorded.
//
// Returns 0 on success or terrno when the buffer cannot be grown, in which
// case the wrapper is left exactly as it was.
int32_t blockWrapperPushMeta(SBlockWrapper *p, int64_t seq, uint8_t *value, int32_t len) {
  int32_t code = 0;

  // NOTE(review): 12 bytes is the reserve used by the original code; confirm
  // it covers the worst-case size of one varint i64 plus one varint i32.
  if ((p->kvSize + 12) > p->kvCap) {
    int32_t newCap = (p->kvCap == 0) ? 128 : p->kvCap * 2;

    void *data = taosMemoryRealloc(p->kvBuffer, newCap);
    if (data == NULL) {
      return terrno;
    }
    // Commit the new capacity only once the buffer really has that size, so a
    // failed grow cannot leave kvCap larger than the allocation.
    p->kvBuffer = data;
    p->kvCap = newCap;
  }

  uint8_t *data = (uint8_t *)p->kvBuffer + p->kvSize;
  p->kvSize += taosEncodeVariantI64((void **)&data, seq);
  p->kvSize += taosEncodeVariantI32((void **)&data, len);
  return code;
}
1414

1415
// Drop the wrapper's kv index: free the buffer and zero its size/capacity.
// The block buffer (p->data) is left alone.
void blockWrapperClearMeta(SBlockWrapper *p) {
  // Free and reset the pointer so a later blockWrapperCleanup() on the same
  // wrapper cannot free the index buffer a second time.
  taosMemoryFreeClear(p->kvBuffer);
  p->kvSize = 0;
  p->kvCap = 0;
}
11,020,020✔
1422

1423
// Release both buffers held by the wrapper and reset its size bookkeeping.
// Safe to call on a wrapper whose buffers are already NULL.
void blockWrapperCleanup(SBlockWrapper *p) {
  void *pBlock = p->data;
  if (pBlock != NULL) {
    taosMemoryFree(pBlock);
    p->data = NULL;
  }

  p->kvSize = 0;
  taosMemoryFreeClear(p->kvBuffer);
  p->cap = 0;
}
44,133,283✔
1432

1433
// Move the block buffer and kv index from src to dst, leaving src empty.
// dst's previous buffers are overwritten, not freed; other fields of dst are
// untouched. Does nothing if either pointer is NULL.
void blockWrapperTransfer(SBlockWrapper *dst, SBlockWrapper *src) {
  if (dst != NULL && src != NULL) {
    // block buffer
    dst->data = src->data;
    dst->cap = src->cap;
    src->data = NULL;
    src->cap = 0;

    // kv index
    dst->kvBuffer = src->kvBuffer;
    dst->kvSize = src->kvSize;
    dst->kvCap = src->kvCap;
    src->kvBuffer = NULL;
    src->kvSize = 0;
    src->kvCap = 0;
  }
}
1451

1452
// Make sure the block buffer holds at least newCap bytes. Capacity grows by
// doubling (starting from 1024 when currently 0) and never shrinks.
// Returns 0 on success or terrno if the reallocation fails.
int32_t blockWrapperResize(SBlockWrapper *p, int32_t newCap) {
  if (p->cap >= newCap) {
    return 0;
  }

  int32_t grown = (p->cap == 0) ? 1024 : p->cap;
  while (grown < newCap) {
    grown *= 2;
  }

  void *resized = taosMemoryRealloc(p->data, grown);
  if (resized == NULL) {
    return terrno;
  }

  p->data = resized;
  p->cap = grown;
  return 0;
}
1468

1469
// Reset the wrapper for reuse without freeing anything: zero the kv index
// size and the wrapper size, and clear the block. No-op without a block.
void blockWrapperClear(SBlockWrapper *p) {
  if (p->data != NULL) {
    p->size = 0;
    p->kvSize = 0;
    blockClear((SBlock *)p->data);
  }
}
1478

1479
// Stamp `type` into the header of the block held by the wrapper.
// The wrapper must already hold a block; p->data is not checked.
void blockWrapperSetType(SBlockWrapper *p, int8_t type) {
  ((SBlock *)p->data)->type = type;
}
12,989✔
1483

1484
// Create an iterator over the blocks of the table file for `timestamp`.
//
// type   : starting block type. BSE_TABLE_DATA_TYPE and BSE_TABLE_META_TYPE
//          preload the matching handle list; any other type yields an
//          iterator that is already finished.
// ppIter : out; set only on success.
//
// Returns 0 on success. On failure the iterator is destroyed and *ppIter is
// left untouched.
int32_t tableReaderIterInit(int64_t timestamp, int8_t type, STableReaderIter **ppIter, SBse *pBse) {
  int32_t code = 0;
  int32_t lino = 0;

  STableReaderIter *p = taosMemCalloc(1, sizeof(STableReaderIter));
  if (p == NULL) {
    return terrno;
  }

  p->timestamp = timestamp;
  SSubTableMgt *retentionMgt = NULL;

  code = createSubTableMgt(timestamp, 1, pBse->pTableMgt, &retentionMgt);
  TSDB_CHECK_CODE(code, lino, _error);

  p->pSubMgt = retentionMgt;

  code = tableReaderOpen(timestamp, &p->pTableReader, retentionMgt->pReaderMgt);
  TSDB_CHECK_CODE(code, lino, _error);

  tableReaderShouldPutToCache(p->pTableReader, 0);

  p->blockIndex = 0;
  p->blockType = type;

  if (p->blockType == BSE_TABLE_DATA_TYPE) {
    code = tableReaderGetMeta(p->pTableReader, &p->pMetaHandle);
    TSDB_CHECK_CODE(code, lino, _error);

  } else if (p->blockType == BSE_TABLE_META_TYPE) {
    p->pMetaHandle = taosArrayInit(8, sizeof(SBlkHandle));
    if (p->pMetaHandle == NULL) {
      TSDB_CHECK_CODE(code = terrno, lino, _error);
    }
    code = tableMetaReaderLoadMetaHandle(p->pTableReader->pMetaReader, p->pMetaHandle);
    // Must bail out before publishing p: otherwise the error path frees the
    // iterator while the caller already holds it through *ppIter.
    TSDB_CHECK_CODE(code, lino, _error);
  } else {
    p->isOver = 1;
  }
  *ppIter = p;

_error:
  if (code != 0) {
    bseError("failed to init table reader iter since %s", tstrerror(code));
    tableReaderIterDestroy(p);
  }
  return code;
}
1532

1533
// Advance the iterator by one raw block and expose it through *pValue / *len.
//
// Data iterators walk the data handles and finish when they run out (that
// exit returns 0 without touching *pValue / *len). Meta iterators walk the
// meta handles, then continue in the same call with the meta index, and on
// later calls with the footer and the end marker.
//
// Any load failure marks the iterator as finished. Whenever the iterator's
// wrapper holds a buffer at exit, its snapshot header is refreshed and the
// buffer is handed out — including on error.
int32_t tableReaderIterNext(STableReaderIter *pIter, uint8_t **pValue, int32_t *len) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SBseSnapMeta snapMeta = {0};
  // Only the timestamp captured at entry is consumed below.
  snapMeta.timestamp = pIter->timestamp;

  if (pIter->blockType == BSE_TABLE_DATA_TYPE) {
    if (pIter->blockIndex < taosArrayGetSize(pIter->pMetaHandle)) {
      SBlkHandle *pHandle = taosArrayGet(pIter->pMetaHandle, pIter->blockIndex);
      bseDebug("file type %d, block type: %d,block index %d, offset %" PRId64 ", size %" PRId64 ", range [%" PRId64
               ", %" PRId64 "]",
               pIter->fileType, pIter->blockType, pIter->blockIndex, pHandle->offset, pHandle->size,
               pHandle->range.sseq, pHandle->range.eseq);
      code = tableReaderLoadRawBlock(pIter->pTableReader, pHandle, &pIter->blockWrapper);
      TSDB_CHECK_CODE(code, lino, _error);

      pIter->blockIndex++;
    } else {
      // All data blocks delivered: finish immediately.
      taosArrayDestroy(pIter->pMetaHandle);
      pIter->pMetaHandle = NULL;
      pIter->blockIndex = 0;
      pIter->isOver = 1;
      return 0;
    }

  } else if (pIter->blockType == BSE_TABLE_META_TYPE) {
    if (pIter->blockIndex < taosArrayGetSize(pIter->pMetaHandle)) {
      SBlkHandle *pHandle = taosArrayGet(pIter->pMetaHandle, pIter->blockIndex);

      bseDebug("file type %d, block type: %d,block index %d, offset %" PRId64 ", size %" PRId64 ", range [%" PRId64
               ", %" PRId64 "]",
               pIter->fileType, pIter->blockType, pIter->blockIndex, pHandle->offset, pHandle->size,
               pHandle->range.sseq, pHandle->range.eseq);
      code = tableReaderLoadRawMeta(pIter->pTableReader, pHandle, &pIter->blockWrapper);
      TSDB_CHECK_CODE(code, lino, _error);
      pIter->blockIndex++;
    } else {
      // Meta blocks exhausted: move on to the meta index, handled just below.
      taosArrayDestroy(pIter->pMetaHandle);
      pIter->pMetaHandle = NULL;
      pIter->blockIndex = 0;
      pIter->blockType = BSE_TABLE_META_INDEX_TYPE;
    }
  }

  // Trailing sections: meta index -> footer -> end.
  if (pIter->blockType == BSE_TABLE_META_INDEX_TYPE) {
    code = tableReaderLoadRawMetaIndex(pIter->pTableReader, &pIter->blockWrapper);
    TSDB_CHECK_CODE(code, lino, _error);

    pIter->blockType = BSE_TABLE_FOOTER_TYPE;
  } else if (pIter->blockType == BSE_TABLE_FOOTER_TYPE) {
    code = tableReaderLoadRawFooter(pIter->pTableReader, &pIter->blockWrapper);
    TSDB_CHECK_CODE(code, lino, _error);

    pIter->blockType = BSE_TABLE_END_TYPE;
  } else if (pIter->blockType == BSE_TABLE_END_TYPE) {
    pIter->isOver = 1;
  }

_error:
  if (code != 0) {
    bseError("failed to load block since %s", tstrerror(code));
    pIter->isOver = 1;
  }

  SSeqRange emptyRange = {0};
  if (pIter->blockWrapper.data != NULL) {
    updateSnapshotMeta(&pIter->blockWrapper, emptyRange, pIter->fileType, pIter->blockType, snapMeta.timestamp);
    *pValue = pIter->blockWrapper.data;
    *len = pIter->blockWrapper.size;
  }
  return code;
}
1610

1611
// Non-zero while the iterator has not been marked as finished.
int8_t tableReaderIterValid(STableReaderIter *pIter) {
  return pIter->isOver == 0;
}
×
1612

1613
// Load the "current" meta file of pBse into a new buffer prefixed with an
// SBseSnapMeta header whose fileType is BSE_CURRENT_SNAP.
//
// On success *pValue receives the buffer (caller frees) and *len its total
// size. If the file does not exist, 0 is returned and the outputs are left
// untouched. On failure the file is closed, the buffer freed and the error
// code returned.
int32_t bseReadCurrentSnap(SBse *pBse, uint8_t **pValue, int32_t *len) {
  int32_t   code = 0;
  int32_t   lino = 0;
  TdFilePtr fd = NULL;
  int64_t   sz = 0;
  char      name[TSDB_FILENAME_LEN] = {0};

  uint8_t *pCurrent = NULL;

  bseBuildCurrentFullName(pBse, name);
  if (taosCheckExistFile(name) == 0) {
    bseInfo("vgId:%d, no current meta file found, skip recover", BSE_VGID(pBse));
    return 0;
  }
  code = taosStatFile(name, &sz, NULL, NULL);
  TSDB_CHECK_CODE(code, lino, _error);

  fd = taosOpenFile(name, TD_FILE_READ);
  if (fd == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }
  pCurrent = (uint8_t *)taosMemoryCalloc(1, sizeof(SBseSnapMeta) + sz);
  if (pCurrent == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  int64_t nread = taosReadFile(fd, pCurrent + sizeof(SBseSnapMeta), sz);
  if (nread != sz) {
    // A short read may leave terrno at 0; never report success with a
    // partially filled buffer. Mirrors the short-read handling in
    // tableLoadBlock.
    code = (terrno != 0) ? terrno : TSDB_CODE_FILE_CORRUPTED;
    TSDB_CHECK_CODE(code, lino, _error);
  }
  if (taosCloseFile(&fd) != 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  SBseSnapMeta *pMeta = (SBseSnapMeta *)(pCurrent);
  pMeta->fileType = BSE_CURRENT_SNAP;

  *pValue = pCurrent;

  *len = sz + sizeof(SBseSnapMeta);
_error:
  if (code != 0) {
    bseError("vgId:%d, failed to read current at line %d since %s", BSE_VGID(pBse), lino, tstrerror(code));
    if (taosCloseFile(&fd) != 0) {
      bseError("failed to close file %s since %s", name, tstrerror(terrno));
    }
    taosMemoryFree(pCurrent);
  }
  return code;
}
1664

1665
/* Releases everything owned by the iterator, then the iterator itself. NULL is accepted. */
void tableReaderIterDestroy(STableReaderIter *pIter) {
  if (pIter != NULL) {
    taosArrayDestroy(pIter->pMetaHandle);
    tableReaderClose(pIter->pTableReader);
    blockWrapperCleanup(&pIter->blockWrapper);
    destroySubTableMgt(pIter->pSubMgt);
    taosMemoryFree(pIter);
  }
}
1674

1675
/*
 * Build a (seq -> offset) index over the entries stored in pBlock.
 *
 * Each entry is walked as: variant-encoded int64 key, variant-encoded int32
 * value length, then that many value bytes. The recorded offset points at the
 * value-length field, relative to pBlock->data.
 *
 * On success *pMeta receives the new index (release with blockWithMetaCleanup);
 * pBlock itself is only referenced, not copied. Returns 0 or an error code.
 */
int32_t blockWithMetaInit(SBlock *pBlock, SBlockWithMeta **pMeta) {
  int32_t code = 0;
  int32_t lino = 0;

  SBlockWithMeta *pIndexed = taosMemCalloc(1, sizeof(SBlockWithMeta));
  if (pIndexed == NULL) {
    return terrno;
  }

  pIndexed->pBlock = pBlock;
  pIndexed->pMeta = taosArrayInit(8, sizeof(SBlockIndexMeta));
  if (pIndexed->pMeta == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  uint8_t *base = (uint8_t *)pBlock->data;
  uint8_t *cur = (uint8_t *)base;
  while (cur - base < pBlock->len) {
    int64_t seq;
    int32_t valueLen = 0;
    int32_t lenFieldOffset = 0;

    cur = taosDecodeVariantI64((void **)cur, &seq);
    lenFieldOffset = cur - base;  // position of the length field that follows the key
    cur = taosDecodeVariantI32((void **)cur, &valueLen);

    SBlockIndexMeta entry = {0};
    entry.seq = seq;
    entry.offset = lenFieldOffset;
    if (taosArrayPush(pIndexed->pMeta, &entry) == NULL) {
      TSDB_CHECK_CODE(code = terrno, lino, _error);
    }

    cur += valueLen;  // skip the value payload
  }

  *pMeta = pIndexed;
_error:
  if (code != 0) {
    bseError("failed to init block with meta since %s", tstrerror(code));
    blockWithMetaCleanup(pIndexed);
  }
  return code;
}
1716

1717
/* Frees the index array and the wrapper; the referenced block is left alone. NULL is accepted. */
void blockWithMetaCleanup(SBlockWithMeta *p) {
  if (p != NULL) {
    taosArrayDestroy(p->pMeta);
    taosMemoryFree(p);
  }
}
1723

1724
int comprareFunc(const void *pLeft, const void *pRight) {
×
1725
  SBlockIndexMeta *p1 = (SBlockIndexMeta *)pLeft;
×
1726
  SBlockIndexMeta *p2 = (SBlockIndexMeta *)pRight;
×
1727
  if (p1->seq > p2->seq) {
×
1728
    return 1;
×
1729
  } else if (p1->seq < p2->seq) {
×
1730
    return -1;
×
1731
  }
1732
  return 0;
×
1733
}
1734

1735
/*
 * Look up the value stored for `seq` in an indexed block.
 *
 * p       index built by blockWithMetaInit
 * seq     key to find (exact match)
 * pValue  out: newly allocated copy of the value bytes; caller frees
 * len     out: number of bytes in *pValue
 *
 * Returns 0 on success, TSDB_CODE_NOT_FOUND when the key is absent or the
 * stored length is not positive, or terrno when the allocation fails.
 */
int32_t blockWithMetaSeek(SBlockWithMeta *p, int64_t seq, uint8_t **pValue, int32_t *len) {
  int32_t         code = 0;
  SBlockIndexMeta key = {.seq = seq, .offset = 0};

  // comprareFunc treats both arguments as SBlockIndexMeta, so the search key
  // must be a real SBlockIndexMeta and not the address of the bare int64_t.
  int32_t idx = taosArraySearchIdx(p->pMeta, &key, comprareFunc, TD_EQ);
  if (idx < 0) {
    return TSDB_CODE_NOT_FOUND;
  }
  SBlockIndexMeta *pMeta = taosArrayGet(p->pMeta, idx);
  if (pMeta == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  // The indexed offset points at the variant-encoded value length.
  uint8_t *data = (uint8_t *)p->pBlock->data + pMeta->offset;

  data = taosDecodeVariantI32((void *)data, len);
  if (*len <= 0) {
    return TSDB_CODE_NOT_FOUND;
  }
  *pValue = taosMemCalloc(1, *len);
  if (*pValue == NULL) {
    return terrno;
  }
  memcpy(*pValue, data, *len);

  return code;
}
1761

1762
/*
 * Allocate an SBTableMeta bound to the engine referenced by pMetaMgt.
 *
 * name      optional NUL-terminated name copied into the new object
 * pMeta     out: the new object on success (release with tableMetaClose)
 * pMetaMgt  STableMetaMgt whose pBse is recorded in the new object
 *
 * Returns 0 on success or the allocation error.
 */
int32_t tableMetaOpen(char *name, SBTableMeta **pMeta, void *pMetaMgt) {
  int32_t code = 0;
  int32_t lino = 0;

  SBTableMeta *p = taosMemCalloc(1, sizeof(SBTableMeta));
  if (p == NULL) {
    // code must carry the failure, otherwise the check below does not jump
    // and the NULL pointer is dereferenced.
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  if (name != NULL) {
    memcpy(p->name, name, strlen(name) + 1);
  }
  p->pBse = ((STableMetaMgt *)pMetaMgt)->pBse;

  p->blockCap = BSE_BLOCK_SIZE((SBse *)p->pBse);

  *pMeta = p;
_error:
  if (code != 0) {
    // name is optional; do not hand NULL to %s
    bseError("failed to open table meta %s at line %d since %s", name != NULL ? name : "", lino, tstrerror(code));
    tableMetaClose(p);
  }

  return code;
}
1787

1788
/*
 * Rewrite the table meta file for pMeta->timestamp.
 *
 * Every meta block of the existing file is copied raw into a temp file, the
 * blocks in pBlock are appended, the writer is committed, and the temp file
 * is then renamed over the meta file. pMeta->range is widened by each copied
 * block's range.
 *
 * Returns 0 on success or the first error encountered.
 */
int32_t tableMetaCommit(SBTableMeta *pMeta, SArray *pBlock) {
  int32_t code = 0;
  int32_t lino = 0;

  SBtableMetaWriter     *pWriter = NULL;
  SBtableMetaReader     *pReader = NULL;
  SBtableMetaReaderIter *pIter = NULL;

  char tmpName[TSDB_FILENAME_LEN] = {0};
  char dstName[TSDB_FILENAME_LEN] = {0};
  char tmpPath[TSDB_FILENAME_LEN] = {0};
  char dstPath[TSDB_FILENAME_LEN] = {0};

  bseBuildTempMetaName(pMeta->timestamp, tmpName);
  bseBuildMetaName(pMeta->timestamp, dstName);

  code = tableMetaWriterInit(pMeta, tmpName, &pWriter);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaReaderInit(pMeta, dstName, &pReader);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaReaderOpenIter(pReader, &pIter);
  TSDB_CHECK_CODE(code, lino, _error);

  // Carry the already persisted meta blocks over unchanged.
  for (;;) {
    if (pIter->isOver) {
      break;
    }

    SBlkHandle    oldHandle = {0};
    SBlockWrapper oldBlock;

    code = tableMetaReaderIterNext(pIter, &oldBlock, &oldHandle);
    TSDB_CHECK_CODE(code, lino, _error);

    // The iterator flags exhaustion only after the call that runs past the end.
    if (pIter->isOver) {
      break;
    }

    blockWrapperSetType(&oldBlock, BSE_TABLE_META_TYPE);

    code = tableMetaWriteAppendRawBlock(pWriter, &oldBlock, &oldHandle);
    TSDB_CHECK_CODE(code, lino, _error);

    seqRangeUpdate(&pMeta->range, &oldHandle.range);
  }

  code = tableMetaWriterAppendBlock(pWriter, pBlock);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaWriterCommit(pWriter);
  TSDB_CHECK_CODE(code, lino, _error);

  // Both files are closed before the rename; the pointers are cleared so the
  // shared cleanup below does not close them a second time.
  tableMetaWriterClose(pWriter);
  tableMetaReaderClose(pReader);
  pWriter = NULL;
  pReader = NULL;

  bseBuildFullName(pMeta->pBse, tmpName, tmpPath);
  bseBuildFullName(pMeta->pBse, dstName, dstPath);

  code = taosRenameFile(tmpPath, dstPath);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  if (code != 0) {
    bseError("failed to commit table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
  }
  tableMetaReaderIterClose(pIter);
  tableMetaWriterClose(pWriter);
  tableMetaReaderClose(pReader);

  return code;
}
1860
/* Queues every element of pBlock on the writer's pending block list; returns 0 or terrno. */
int32_t tableMetaWriterAppendBlock(SBtableMetaWriter *pMeta, SArray *pBlock) {
  void *pAdded = taosArrayAddAll(pMeta->pBlock, pBlock);
  return (pAdded == NULL) ? terrno : 0;
}
1867

1868
/*
 * Pack the writer's pending SMetaBlock entries into meta blocks and flush
 * them to the file, recording one SBlkHandle per flushed block.
 *
 * A new block is started whenever the estimated size of the current one
 * reaches blockCap. Each handle records the file offset, the number of bytes
 * written and the sequence range covered by the block.
 *
 * The writer is NOT closed on failure: it stays owned by the caller, which
 * releases it with tableMetaWriterClose. Closing it here as well made callers
 * that clean up after an error free it twice.
 *
 * Returns 0 on success (also when nothing had to be written) or an error code.
 */
int32_t tableMetaWriterFlushBlock(SBtableMetaWriter *pMeta) {
  int32_t   code = 0;
  int32_t   lino = 0;
  SSeqRange range = {.sseq = -1, .eseq = -1};

  int64_t offset = 0;  // bytes accumulated in the block being built
  int32_t nWrite = 0;
  int32_t size = pMeta->blockCap;

  blockWrapperClear(&pMeta->blockWrapper);
  code = blockWrapperResize(&pMeta->blockWrapper, size);
  TSDB_CHECK_CODE(code, lino, _error);

  for (int32_t i = 0; i < taosArrayGetSize(pMeta->pBlock); i++) {
    SMetaBlock *pBlk = taosArrayGet(pMeta->pBlock, i);
    if (blockEsimateSize(pMeta->blockWrapper.data, sizeof(SMetaBlock)) >= pMeta->blockCap) {
      SBlkHandle handle = {.offset = pMeta->offset, .size = offset, .range = range};

      blockWrapperSetType(&pMeta->blockWrapper, BSE_TABLE_META_TYPE);

      code = tableFlushBlock(pMeta->pFile, &handle, &pMeta->blockWrapper, &nWrite);
      TSDB_CHECK_CODE(code, lino, _error);

      pMeta->offset += nWrite;
      handle.size = nWrite;  // the handle keeps the on-disk size

      blockWrapperClear(&pMeta->blockWrapper);
      code = blockWrapperResize(&pMeta->blockWrapper, size);
      TSDB_CHECK_CODE(code, lino, _error);

      if (taosArrayPush(pMeta->pBlkHandle, &handle) == NULL) {
        TSDB_CHECK_CODE(code = terrno, lino, _error);
      }
      range.sseq = -1;
      offset = 0;
    }

    offset += metaBlockAdd(pMeta->blockWrapper.data, pBlk);

    if (range.sseq == -1) {
      range.sseq = pBlk->range.sseq;
    }
    range.eseq = pBlk->range.eseq;
  }
  if (offset == 0) {
    return 0;
  }

  // Flush the last, partially filled block.
  blockWrapperSetType(&pMeta->blockWrapper, BSE_TABLE_META_TYPE);

  SBlkHandle handle = {.offset = pMeta->offset, .size = offset, .range = range};
  code = tableFlushBlock(pMeta->pFile, &handle, &pMeta->blockWrapper, &nWrite);
  TSDB_CHECK_CODE(code, lino, _error);

  pMeta->offset += nWrite;
  handle.size = nWrite;

  if (taosArrayPush(pMeta->pBlkHandle, &handle) == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }
_error:
  if (code != 0) {
    bseError("failed to flush table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
  }
  return code;
}
1935

1936
/*
 * Serialize every recorded block handle into one index block, flush it, and
 * store the handle of that index block in the writer's footer.
 *
 * Returns 0 on success or an error code.
 */
int32_t tableMetaWriterFlushIndex(SBtableMetaWriter *pMeta) {
  int32_t code = 0;
  int32_t lino = 0;

  int32_t   written = 0;
  int32_t   encodedLen = 0;
  int64_t   indexOffset = pMeta->offset;
  SSeqRange total = {-1, -1};

  // Room for all handles plus 8 spare bytes.
  int32_t handleBytes = taosArrayGetSize(pMeta->pBlkHandle) * sizeof(SBlkHandle);

  blockWrapperClear(&pMeta->blockWrapper);
  code = blockWrapperResize(&pMeta->blockWrapper, handleBytes + 8);
  TSDB_CHECK_CODE(code, lino, _error);

  for (int32_t i = 0; i < taosArrayGetSize(pMeta->pBlkHandle); i++) {
    SBlkHandle *pHandle = taosArrayGet(pMeta->pBlkHandle, i);
    if (pHandle == NULL) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
    }
    encodedLen += metaBlockAddIndex(pMeta->blockWrapper.data, pHandle);

    seqRangeUpdate(&total, &pHandle->range);
  }

  blockWrapperSetType(&pMeta->blockWrapper, BSE_TABLE_META_INDEX_TYPE);

  SBlkHandle flushHandle = {.offset = indexOffset, .size = encodedLen, .range = total};
  code = tableFlushBlock(pMeta->pFile, &flushHandle, &pMeta->blockWrapper, &written);
  TSDB_CHECK_CODE(code, lino, _error);

  SBlkHandle metaHandle = {.offset = pMeta->offset, .size = written, .range = total};
  pMeta->offset += written;

  // NOTE(review): both footer slots receive metaHandle. The original also
  // built a separate, never-used handle (offset just past this block, size 0)
  // that looks intended for footer.indexHandle. Left as is because this is
  // on-disk data; confirm the intended layout before changing it.
  memcpy(pMeta->footer.metaHandle, &metaHandle, sizeof(SBlkHandle));
  memcpy(pMeta->footer.indexHandle, &metaHandle, sizeof(SBlkHandle));
_error:
  if (code != 0) {
    bseError("failed to build table meta index at line %d since %s", lino, tstrerror(code));
  }
  return code;
}
1981

1982
/*
 * Encode the writer's footer into a kEncodeLen-byte buffer and write it at the
 * current file position. p->offset is advanced by kEncodeLen before the write.
 *
 * Returns 0 on success or an error code.
 */
int32_t tableMetaWriterFlushFooter(SBtableMetaWriter *p) {
  int32_t code = 0;
  int32_t lino = 0;
  char    encoded[kEncodeLen] = {0};

  code = footerEncode(&p->footer, encoded);
  TSDB_CHECK_CODE(code, lino, _error);

  p->offset += sizeof(encoded);

  int32_t written = taosWriteFile(p->pFile, encoded, sizeof(encoded));
  if (written != sizeof(encoded)) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

_error:
  if (code != 0) {
    bseError("failed to add footer to table builder at line %d since %s", lino, tstrerror(code));
  }
  return code;
}
2005
/*
 * Finish a meta file: flush the pending meta blocks, then the index block,
 * then the footer.
 *
 * The writer is NOT closed on failure. It remains owned by the caller
 * (tableMetaCommit closes it on every path); closing it here as well freed it
 * twice.
 *
 * Returns 0 on success or the error of the first step that failed.
 */
int32_t tableMetaWriterCommit(SBtableMetaWriter *pMeta) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tableMetaWriterFlushBlock(pMeta);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaWriterFlushIndex(pMeta);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaWriterFlushFooter(pMeta);
  TSDB_CHECK_CODE(code, lino, _error);
_error:
  if (code != 0) {
    bseError("failed to commit table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
  }
  return code;
}
2024
/*
 * Write an already encoded block to the meta file and record its new location.
 *
 * pMeta       writer receiving the block
 * pBlock      block to flush as is
 * pBlkHandle  handle of the source block; only its range is carried over to
 *             the handle recorded for the new copy
 *
 * The writer is NOT closed on failure. The caller owns it (tableMetaCommit
 * closes it in its cleanup path); closing it here as well freed it twice.
 *
 * Returns 0 on success or an error code.
 */
int32_t tableMetaWriteAppendRawBlock(SBtableMetaWriter *pMeta, SBlockWrapper *pBlock, SBlkHandle *pBlkHandle) {
  int32_t code = 0;
  int32_t lino = 0;

  int32_t nwrite = 0;
  code = tableFlushBlock(pMeta->pFile, pBlkHandle, pBlock, &nwrite);
  TSDB_CHECK_CODE(code, lino, _error);

  // The copy lives at the writer's current offset, not at the source offset.
  SBlkHandle handle = {.offset = pMeta->offset, .size = nwrite, .range = pBlkHandle->range};
  if (taosArrayPush(pMeta->pBlkHandle, &handle) == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }
  pMeta->offset += nwrite;
_error:
  if (code != 0) {
    bseError("failed to append block to table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
  }
  return code;
}
2044

2045
/*
 * Read the last kEncodeLen bytes of the reader's file and decode them into
 * pMeta->footer. A reader without an open file is a no-op that returns 0.
 */
int32_t tableMetaReaderLoadFooter(SBtableMetaReader *pMeta) {
  int32_t code = 0;
  int32_t lino = 0;
  char    raw[kEncodeLen] = {0};

  if (pMeta->pFile == NULL) {
    return 0;
  }

  int64_t pos = taosLSeekFile(pMeta->pFile, -kEncodeLen, SEEK_END);
  if (pos < 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  int64_t got = taosReadFile(pMeta->pFile, raw, kEncodeLen);
  if (got != kEncodeLen) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  code = footerDecode(&pMeta->footer, raw);
  TSDB_CHECK_CODE(code, lino, _error);
_error:
  if (code != 0) {
    bseError("failed to load table meta footer %s at line %d since %s", pMeta->name, lino, tstrerror(code));
  }
  return code;
}
2071

2072
/*
 * Open `name` for reading (read != 0) or for read/write/create/append.
 *
 * - Missing file, read mode: returns 0 and leaves *pFile and *size untouched.
 * - Missing file, write mode: the file is created; *size is left untouched.
 * - Existing file: *size receives its size; a size <= 0 yields
 *   TSDB_CODE_NOT_FOUND.
 *
 * On success *pFile receives the opened handle.
 */
int32_t tableOpenFile(char *name, int8_t read, TdFilePtr *pFile, int64_t *size) {
  int32_t lino = 0;
  int32_t code = 0;
  int32_t flags = read ? TD_FILE_READ : (TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_APPEND);

  TdFilePtr fp = NULL;

  if (taosCheckExistFile(name)) {
    code = taosStatFile(name, size, NULL, NULL);
    TSDB_CHECK_CODE(code, lino, _error);
    if (*size <= 0) {
      TSDB_CHECK_CODE(code = TSDB_CODE_NOT_FOUND, lino, _error);
    }
  } else if (read) {
    // nothing to read yet
    return 0;
  }

  fp = taosOpenFile(name, flags);
  if (fp == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }
  *pFile = fp;

_error:
  if (code != 0) {
    bseError("failed to open table meta %s at line %d since %s", name, lino, tstrerror(code));
  }
  return code;
}
2116
/*
 * Open the file `name` via tableOpenFile and store the handle in pMeta->pFile.
 * The reported file size is discarded.
 *
 * NOTE(review): tableMetaReaderLoad passes an SBtableMetaReader* to this
 * SBtableMetaWriter* parameter; that presumably relies on both structs
 * agreeing on the pFile/name members. Confirm against the struct definitions.
 */
int32_t tableMetaOpenFile(SBtableMetaWriter *pMeta, int8_t read, char *name) {
  int32_t lino = 0;
  int64_t fileSize = 0;

  int32_t code = tableOpenFile(name, read, &pMeta->pFile, &fileSize);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  if (code != 0) {
    bseError("failed to open table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
  }

  return code;
}
2131

2132
/*
 * Open the reader's file (by pMeta->name), then load its footer and its block
 * handle index. Stops at the first step that fails and returns its code.
 */
int32_t tableMetaReaderLoad(SBtableMetaReader *pMeta) {
  int32_t lino = 0;

  int32_t code = tableMetaOpenFile(pMeta, 1, pMeta->name);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaReaderLoadFooter(pMeta);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaReaderLoadIndex(pMeta);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  if (code != 0) {
    bseError("failed to load table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
  }
  return code;
}
2151

2152
/* Frees an SBTableMeta; NULL is accepted. */
void tableMetaClose(SBTableMeta *p) {
  if (p != NULL) {
    taosMemoryFree(p);
  }
}
2156

2157
/*
 * Create a writer for the meta file `name` (resolved to a full path through
 * pMeta->pBse) and open that file for writing.
 *
 * pMeta     table meta the writer belongs to; supplies blockCap and pBse
 * name      file name relative to the engine directory
 * ppWriter  out: the new writer on success (release with tableMetaWriterClose)
 *
 * Returns 0 on success; on failure everything allocated here is released and
 * the error code is returned.
 */
int32_t tableMetaWriterInit(SBTableMeta *pMeta, char *name, SBtableMetaWriter **ppWriter) {
  int32_t code = 0;
  int32_t lino = 0;

  char path[TSDB_FILENAME_LEN] = {0};
  bseBuildFullName(pMeta->pBse, name, path);

  SBtableMetaWriter *p = taosMemCalloc(1, sizeof(SBtableMetaWriter));
  if (p == NULL) {
    return terrno;
  }
  p->pTableMeta = pMeta;

  p->blockCap = pMeta->blockCap;

  p->pBlkHandle = taosArrayInit(128, sizeof(SBlkHandle));
  if (p->pBlkHandle == NULL) {
    // code has to carry terrno, otherwise the check never jumps and the
    // writer is used with a NULL array.
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  p->pBlock = taosArrayInit(128, sizeof(SMetaBlock));
  if (p->pBlock == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  code = blockWrapperInit(&p->blockWrapper, 1024);
  TSDB_CHECK_CODE(code, lino, _error);

  // Set pBse pointer for encryption/decryption
  p->blockWrapper.pBse = pMeta->pBse;

  code = tableMetaOpenFile(p, 0, path);
  TSDB_CHECK_CODE(code, lino, _error);

  *ppWriter = p;
_error:
  if (code != 0) {
    bseError("failed to init table meta writer %s at line %d since %s", pMeta->name, lino, tstrerror(code));
    tableMetaWriterClose(p);
  }
  return code;
}
2199

2200
/* Closes the writer's file, releases its arrays and block buffer, then frees it. NULL is accepted. */
void tableMetaWriterClose(SBtableMetaWriter *p) {
  if (p == NULL) {
    return;
  }

  int32_t rc = taosCloseFile(&p->pFile);
  if (rc != 0) {
    bseError("failed to close table meta writer file since %s", tstrerror(terrno));
  }

  taosArrayDestroy(p->pBlkHandle);
  taosArrayDestroy(p->pBlock);
  blockWrapperCleanup(&p->blockWrapper);
  taosMemoryFree(p);
}
2210

2211
/*
 * Create a reader for the meta file `name` (resolved to a full path through
 * pMeta->pBse) and load its footer and block handle index.
 *
 * pMeta     table meta the reader belongs to; supplies pBse
 * name      file name relative to the engine directory
 * ppReader  out: the new reader on success (release with tableMetaReaderClose)
 *
 * Returns 0 on success; on failure everything allocated here is released and
 * the error code is returned.
 */
int32_t tableMetaReaderInit(SBTableMeta *pMeta, char *name, SBtableMetaReader **ppReader) {
  int32_t code = 0;
  int32_t lino = 0;
  char    path[TSDB_FILENAME_LEN] = {0};
  bseBuildFullName(pMeta->pBse, name, path);

  SBtableMetaReader *p = taosMemCalloc(1, sizeof(SBtableMetaReader));
  if (p == NULL) {
    return terrno;
  }
  memcpy(p->name, path, sizeof(path));
  p->pTableMeta = pMeta;

  p->pBlkHandle = taosArrayInit(128, sizeof(SBlkHandle));
  if (p->pBlkHandle == NULL) {
    // code has to carry terrno, otherwise the check never jumps and loading
    // continues with a NULL array.
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  code = blockWrapperInit(&p->blockWrapper, 1024);
  TSDB_CHECK_CODE(code, lino, _error);

  // Set pBse pointer for encryption/decryption
  p->blockWrapper.pBse = pMeta->pBse;

  code = tableMetaReaderLoad(p);
  TSDB_CHECK_CODE(code, lino, _error);

  *ppReader = p;
_error:
  if (code != 0) {
    bseError("failed to init table meta reader %s at line %d since %s", pMeta->name, lino, tstrerror(code));
    tableMetaReaderClose(p);
  }
  return code;
}
2246

2247
/* Closes the reader's file, releases its handle array and block buffer, then frees it. NULL is accepted. */
void tableMetaReaderClose(SBtableMetaReader *p) {
  if (p == NULL) {
    return;
  }

  int32_t rc = taosCloseFile(&p->pFile);
  if (rc != 0) {
    bseError("failed to close table meta reader file since %s", tstrerror(terrno));
  }

  taosArrayDestroy(p->pBlkHandle);
  blockWrapperCleanup(&p->blockWrapper);
  taosMemoryFree(p);
}
2256
/*
 * Find the SMetaBlock describing the data block that contains `seq`.
 *
 * The block handle index is searched (TD_LE) for the meta block covering
 * `seq`; that block is loaded from the file and scanned for the entry.
 *
 * Returns 0 and fills *pMetaBlock on success, TSDB_CODE_NOT_FOUND when no
 * handle matches, or the error from loading/seeking the block.
 */
int32_t tableMetaReaderLoadBlockMeta(SBtableMetaReader *p, int64_t seq, SMetaBlock *pMetaBlock) {
  int32_t   code = 0;
  int32_t   lino = 0;
  SSeqRange range = {.sseq = seq, .eseq = seq};

  SBlkHandle handle = {.range = range};
  int32_t    index = taosArraySearchIdx(p->pBlkHandle, &handle, compareFunc, TD_LE);
  if (index < 0) {
    // No candidate block; do not hand a negative index to taosArrayGet.
    return TSDB_CODE_NOT_FOUND;
  }

  SBlkHandle *pHandle = taosArrayGet(p->pBlkHandle, index);
  if (pHandle == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  code = tableLoadBlock(p->pFile, pHandle, &p->blockWrapper);
  TSDB_CHECK_CODE(code, lino, _error);

  code = blockSeekMeta(p->blockWrapper.data, seq, pMetaBlock);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  return code;
}
2278
/*
 * Load every meta block referenced by the reader's handle index and append
 * one SBlkHandle (offset, size, range) per contained SMetaBlock to dataHandle.
 *
 * Returns 0 on success, TSDB_CODE_FILE_CORRUPTED when a handle is missing or
 * a meta block yields no entries, or the first load/allocation error.
 */
int32_t tableMetaReaderLoadAllDataHandle(SBtableMetaReader *p, SArray *dataHandle) {
  int32_t lino = 0;
  int32_t code = 0;

  SArray *pEntries = taosArrayInit(8, sizeof(SMetaBlock));
  if (pEntries == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  for (int32_t blk = 0; blk < taosArrayGetSize(p->pBlkHandle); blk++) {
    SBlkHandle *pIndexHandle = taosArrayGet(p->pBlkHandle, blk);
    if (pIndexHandle == NULL) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
    }

    code = tableLoadBlock(p->pFile, pIndexHandle, &p->blockWrapper);
    TSDB_CHECK_CODE(code, lino, _exit);

    // NOTE(review): pEntries is not cleared between handles. Whether
    // blockGetAllMeta resets it is not visible here; if it only appends,
    // entries of earlier blocks are pushed to dataHandle again. Confirm.
    code = blockGetAllMeta(p->blockWrapper.data, pEntries);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (taosArrayGetSize(pEntries) == 0) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
    }

    for (int32_t e = 0; e < taosArrayGetSize(pEntries); e++) {
      SMetaBlock *pEntry = taosArrayGet(pEntries, e);
      SBlkHandle  dataRef = {.offset = pEntry->offset, .size = pEntry->size, .range = pEntry->range};
      if (taosArrayPush(dataHandle, &dataRef) == NULL) {
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
      }
    }
  }
_exit:
  taosArrayDestroy(pEntries);
  return code;
}
2315

2316
/*
 * Copy every block handle of the reader's index into pMetaHandle.
 *
 * Returns TSDB_CODE_NOT_FOUND when the index is empty,
 * TSDB_CODE_FILE_CORRUPTED when a handle cannot be fetched, terrno when a
 * push fails, and 0 otherwise.
 */
int32_t tableMetaReaderLoadMetaHandle(SBtableMetaReader *p, SArray *pMetaHandle) {
  if (taosArrayGetSize(p->pBlkHandle) == 0) {
    return TSDB_CODE_NOT_FOUND;
  }

  int32_t i = 0;
  while (i < taosArrayGetSize(p->pBlkHandle)) {
    SBlkHandle *pSrc = taosArrayGet(p->pBlkHandle, i);
    if (pSrc == NULL) {
      return TSDB_CODE_FILE_CORRUPTED;
    }

    if (taosArrayPush(pMetaHandle, pSrc) == NULL) {
      int32_t code = terrno;
      if (code != 0) {
        return code;
      }
    }
    i++;
  }

  return 0;
}
2336

2337
/*
 * Load the index block referenced by the footer's metaHandle and decode every
 * SBlkHandle it contains into p->pBlkHandle.
 *
 * A reader without an open file is a no-op that returns 0. Returns
 * TSDB_CODE_FILE_CORRUPTED when the loaded block is not an index block, or
 * the load/allocation error.
 */
int32_t tableMetaReaderLoadIndex(SBtableMetaReader *p) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t offset = 0;
  SBtableMetaReader *pMeta = p;

  if (pMeta->pFile == NULL) {
    return 0;
  }

  pMeta->blockWrapper.type = BSE_TABLE_META_TYPE;

  code = tableLoadBlock(pMeta->pFile, pMeta->footer.metaHandle, &pMeta->blockWrapper);
  TSDB_CHECK_CODE(code, lino, _error);

  if (blockGetType(p->blockWrapper.data) != BSE_TABLE_META_INDEX_TYPE) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
  }

  SBlock  *pBlk = (SBlock *)pMeta->blockWrapper.data;
  uint8_t *data = (uint8_t *)pBlk->data;

  do {
    SBlkHandle handle = {0};
    offset += blkHandleDecode(&handle, (char *)data + offset);
    if (taosArrayPush(pMeta->pBlkHandle, &handle) == NULL) {
      // Assign the failure to code; checking terrno alone jumped to _error
      // while code was still 0, so the failure was returned as success.
      TSDB_CHECK_CODE(code = terrno, lino, _error);
    }
  } while (offset < pBlk->len);

_error:
  if (code != 0) {
    bseError("failed to load table meta blk handle %s at line %d since %s", pMeta->name, lino, tstrerror(code));
  }
  return code;
}
2373

2374
/*
 * Create an iterator over the meta blocks of pReader.
 *
 * pReader  reader whose handle index is iterated; must outlive the iterator
 * pIter    out: the new iterator on success (release with
 *          tableMetaReaderIterClose); isOver is set immediately when the
 *          reader has no block handles
 *
 * Returns 0 on success or the allocation/initialisation error; nothing is
 * leaked on failure.
 */
int32_t tableMetaReaderOpenIter(SBtableMetaReader *pReader, SBtableMetaReaderIter **pIter) {
  int32_t code = 0;

  SBtableMetaReaderIter *p = taosMemCalloc(1, sizeof(SBtableMetaReaderIter));
  if (p == NULL) {
    return terrno;
  }
  p->pReader = pReader;

  code = blockWrapperInit(&p->pBlockWrapper, 1024);
  if (code != 0) {
    // The iterator is not handed out on this path, so release it here.
    taosMemoryFree(p);
    return code;
  }

  // Set pBse pointer for encryption/decryption
  p->pBlockWrapper.pBse = ((SBTableMeta *)pReader->pTableMeta)->pBse;

  if (taosArrayGetSize(pReader->pBlkHandle) == 0) {
    p->isOver = 1;
  }

  *pIter = p;
  return 0;
}
2400

2401
/*
 * Advance the iterator by one meta block.
 *
 * When a block is produced, *pDataWrapper receives a shallow copy of the
 * iterator's block buffer and *dstHandle a copy of the block's handle.
 * pIter->isOver is set (and 0 returned) once the handle list is exhausted or
 * a loaded block is not of type BSE_TABLE_META_TYPE; the outputs are then
 * left untouched.
 *
 * On a load error the iterator's reader pointer is cleared and the error
 * code is returned.
 */
int32_t tableMetaReaderIterNext(SBtableMetaReaderIter *pIter, SBlockWrapper *pDataWrapper, SBlkHandle *dstHandle) {
  int32_t code = 0;
  int32_t lino = 0;

  SArray *pHandles = pIter->pReader->pBlkHandle;
  if (pIter->blkIdx >= taosArrayGetSize(pHandles)) {
    pIter->isOver = 1;
    return 0;
  }

  SBlkHandle *pCur = taosArrayGet(pHandles, pIter->blkIdx);
  if (pCur == NULL) {
    return TSDB_CODE_FILE_CORRUPTED;
  }

  SBlockWrapper *pBuf = &pIter->pBlockWrapper;

  code = blockWrapperResize(pBuf, pCur->size);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableLoadBlock(pIter->pReader->pFile, pCur, pBuf);
  TSDB_CHECK_CODE(code, lino, _error);

  pIter->blkIdx++;

  if (blockGetType(pBuf->data) == BSE_TABLE_META_TYPE) {
    *pDataWrapper = *pBuf;
    *dstHandle = *pCur;
  } else {
    // First block of another type ends the iteration.
    pIter->isOver = 1;
    return 0;
  }

_error:
  if (code != 0) {
    bseError("failed to load table meta blk handle %s at line %d since %s", pIter->pReader->name, lino,
             tstrerror(code));
    pIter->pReader = NULL;
  }
  return code;
}
2440

2441
/* Releases the iterator's block buffer and the iterator; the reader is not touched. NULL is accepted. */
void tableMetaReaderIterClose(SBtableMetaReaderIter *p) {
  if (p != NULL) {
    blockWrapperCleanup(&p->pBlockWrapper);
    taosMemoryFree(p);
  }
}
2446

2447
/*
 * Allocate a mem table with an initial reference count of 1.
 *
 * pMemTable  out: the new mem table on success, NULL when initialisation
 *            fails after the allocation (so the caller never sees a pointer
 *            to freed memory)
 * cap        initial capacity passed to the block buffer
 *
 * Returns 0 on success or an error code.
 */
int32_t bseMemTableCreate(STableMemTable **pMemTable, int32_t cap) {
  int32_t code = 0;
  int32_t lino = 0;

  STableMemTable *p = taosMemoryCalloc(1, sizeof(STableMemTable));
  if (p == NULL) {
    return terrno;
  }

  p->pMetaHandle = taosArrayInit(8, sizeof(SBlkHandle));
  if (p->pMetaHandle == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _error);
  }

  code = blockWrapperInit(&p->pBlockWrapper, cap);
  TAOS_CHECK_GOTO(code, &lino, _error);

  taosInitRWLatch(&p->latch);

  seqRangeReset(&p->range);
  seqRangeReset(&p->tableRange);
  p->ref = 1;
  bseTrace("create mem table %p", p);

_error:
  if (code != 0) {
    bseMemTableDestroy(p);
    p = NULL;  // do not publish the freed object
  }
  *pMemTable = p;

  return code;
}
2479

2480
/*
 * Take one reference on the mem table.
 *
 * Returns 0 on success. Returns TSDB_CODE_INVALID_CFG for a NULL table or
 * when the count observed before the increment was not positive (the
 * increment has already happened in that case).
 */
int32_t bseMemTableRef(STableMemTable *pMemTable) {
  if (pMemTable == NULL) {
    return TSDB_CODE_INVALID_CFG;
  }

  SBse *pBse = (SBse *)pMemTable->pBse;
  bseTrace("ref mem table %p", pMemTable);

  int32_t prev = atomic_fetch_add_32(&pMemTable->ref, 1);
  if (prev > 0) {
    return 0;
  }

  bseError("vgId:%d, memtable ref count is invalid, ref:%d", BSE_VGID(pBse), prev);
  return TSDB_CODE_INVALID_CFG;
}
2496

2497
/* Drops one reference; the mem table is destroyed when the count reaches 0. NULL is accepted. */
void bseMemTableUnRef(STableMemTable *pMemTable) {
  bseTrace("unref mem table %p", pMemTable);

  if (pMemTable != NULL && atomic_sub_fetch_32(&pMemTable->ref, 1) == 0) {
    bseMemTableDestroy(pMemTable);
    // Only the pointer value is logged; the object is already gone.
    bseTrace("destroy mem table %p", pMemTable);
  }
}
2509
/**
 * Release a mem table and the resources it holds: the meta-handle array,
 * the block wrapper, and finally the table structure itself.
 *
 * This does not consult the reference count; it frees unconditionally.
 *
 * @param pMemTable  table to free; NULL is a no-op.
 */
void bseMemTableDestroy(STableMemTable *pMemTable) {
  if (pMemTable != NULL) {
    taosArrayDestroy(pMemTable->pMetaHandle);
    blockWrapperCleanup(&pMemTable->pBlockWrapper);
    taosMemoryFree(pMemTable);
  }
}
2515
/**
 * Append a handle to the mem table's meta-handle array.
 *
 * @param pMemTable  destination table.
 * @param pHandle    element to append; presumably an SBlkHandle, since the
 *                   array is created with that element size — verify at callers.
 * @return 0 on success; TSDB_CODE_INVALID_PARA when either argument is NULL;
 *         terrno when taosArrayPush() fails.
 */
int32_t bseMemTablePush(STableMemTable *pMemTable, void *pHandle) {
  if (pMemTable == NULL || pHandle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (taosArrayPush(pMemTable->pMetaHandle, pHandle) != NULL) {
    return 0;
  }

  // Push failed: capture terrno before logging so the reported code is the one returned.
  int32_t code = terrno;
  bseError("Failed to push handle to memtable since %s", tstrerror(code));
  return code;
}
2530
/**
 * Build an array of SMetaBlock entries, one per handle currently stored in
 * the mem table's meta-handle array.
 *
 * The table is referenced for the duration of the call and its latch is held
 * for reading while the handles are walked.
 *
 * @param p           source mem table.
 * @param pMetaBlock  [out] receives the new array on success, or NULL when a
 *                    failure occurs after the table was referenced. It is left
 *                    untouched when taking the reference itself fails.
 * @return 0 on success; the bseMemTableRef() error, or terrno on an
 *         allocation/push failure.
 */
int32_t bseMemTablGetMetaBlock(STableMemTable *p, SArray **pMetaBlock) {
  int32_t lino = 0;
  int32_t locked = 0;
  SArray *pResult = NULL;

  int32_t code = bseMemTableRef(p);
  if (code != 0) {
    bseError("Failed to ref memtable since %s", tstrerror(code));
    return code;
  }

  pResult = taosArrayInit(8, sizeof(SMetaBlock));
  if (pResult == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  taosRLockLatch(&p->latch);
  locked = 1;

  int32_t idx = 0;
  while (idx < taosArrayGetSize(p->pMetaHandle)) {
    SBlkHandle *pHandle = taosArrayGet(p->pMetaHandle, idx);
    // Copy range/offset/size from the handle; type and version are fixed constants.
    SMetaBlock  entry = {.type = BSE_TABLE_META_TYPE,
                         .version = BSE_DATA_VER,
                         .range = pHandle->range,
                         .offset = pHandle->offset,
                         .size = pHandle->size};
    if (taosArrayPush(pResult, &entry) == NULL) {
      TSDB_CHECK_CODE(code = terrno, lino, _error);
    }
    idx++;
  }

_error:
  if (locked) {
    taosRUnLockLatch(&p->latch);
  }
  if (code != 0) {
    bseError("failed to get meta block from memtable since %s", tstrerror(code));
    taosArrayDestroy(pResult);
    pResult = NULL;
  }
  // Balance the reference taken at entry.
  bseMemTableUnRef(p);

  *pMetaBlock = pResult;

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc