• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4986

15 Mar 2026 08:32AM UTC coverage: 37.305% (-31.3%) from 68.601%
#4986

push

travis-ci

tomchon
test: keep docs and unit test

125478 of 336361 relevant lines covered (37.3%)

1134847.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/bse/bseTable.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "bse.h"
17
#include "bseCache.h"
18
#include "bseSnapshot.h"
19
#include "bseTable.h"
20
#include "bseTableMgt.h"
21
#include "crypt.h"
22
#include "osMemPool.h"
23
#include "vnodeInt.h"
24

25
// table footer func
26
static int32_t footerEncode(STableFooter *pFooter, char *buf);
27
static int32_t footerDecode(STableFooter *pFooter, char *buf);
28

29
// block handle func
30
static int32_t blkHandleEncode(SBlkHandle *pHandle, char *buf);
31
static int32_t blkHandleDecode(SBlkHandle *pHandle, char *buf);
32

33
// table meta func
34
static int32_t metaBlockEncode(SMetaBlock *pMeta, char *buf);
35
static int32_t metaBlockDecode(SMetaBlock *pMeta, char *buf);
36

37
static int32_t metaBlockAdd(SBlock *p, SMetaBlock *pMeta);
38
static int32_t metaBlockGet(SBlock *p, SMetaBlock *pMeta);
39

40
// table footer func
41
static int32_t footerEncode(STableFooter *pFooter, char *buf);
42
static int32_t footerDecode(STableFooter *pFooter, char *buf);
43

44
// block func
45
static int32_t blockCreate(int32_t cap, SBlock **pBlock);
46
static void    blockDestroy(SBlock *pBlock);
47
static int32_t blockPut(SBlock *pBlock, int64_t seq, uint8_t *value, int32_t len);
48
static int32_t blockAppendBatch(SBlock *p, uint8_t *value, int32_t len);
49
static int32_t blockEsimateSize(SBlock *pBlock, int32_t extra);
50
static void    blockClear(SBlock *pBlock);
51
static int32_t blockSeek(SBlock *p, int64_t seq, uint8_t **pValue, int32_t *len);
52
static int8_t  blockGetType(SBlock *p);
53

54
static int32_t blockSeekMeta(SBlock *p, int64_t seq, SMetaBlock *pMeta);
55
static int32_t blockGetAllMeta(SBlock *p, SArray *pResult);
56
static int32_t metaBlockAddIndex(SBlock *p, SBlkHandle *pInfo);
57

58
static int32_t tableMetaWriterInit(SBTableMeta *pMeta, char *name, SBtableMetaWriter **ppWriter);
59
static int32_t tableMetaWriterCommit(SBtableMetaWriter *pMeta);
60
static void    tableMetaWriterClose(SBtableMetaWriter *p);
61
static int32_t tableMetaWriteAppendRawBlock(SBtableMetaWriter *pMeta, SBlockWrapper *pBlock, SBlkHandle *pBlkHandle);
62

63
static int32_t tableMetaReaderInit(SBTableMeta *pMeta, char *name, SBtableMetaReader **ppReader);
64
static void    tableMetaReaderClose(SBtableMetaReader *p);
65
static int32_t tableMetaReaderLoadIndex(SBtableMetaReader *p);
66

67
static int32_t tableMetaOpenFile(SBtableMetaWriter *pMeta, int8_t read, char *name);
68

69
static int32_t tableMetaReaderOpenIter(SBtableMetaReader *pReader, SBtableMetaReaderIter **pIter);
70
static int32_t tableMetaReaderIterNext(SBtableMetaReaderIter *pIter, SBlockWrapper *pDataWrapper,
71
                                       SBlkHandle *dstHandle);
72
static void    tableMetaReaderIterClose(SBtableMetaReaderIter *p);
73

74
// STable builder func
75
static int32_t tableBuilderGetBlockSize(STableBuilder *p);
76
static int32_t tableBuilderLoadBlock(STableBuilder *p, SBlkHandle *pHandle, SBlockWrapper *pBlkWrapper);
77
static int32_t tableBuilderSeek(STableBuilder *p, SBlkHandle *pHandle, int64_t seq, uint8_t **pValue, int32_t *len);
78
static void    tableBuilderUpdateBlockRange(STableBuilder *p, SBlockItemInfo *pInfo);
79
static void    tableBuildUpdateTableRange(STableBuilder *p, SBlockItemInfo *pInfo);
80

81
// STable pReaderMgt func
82

83
static int32_t tableReaderInitMeta(STableReader *p, SBlock *pBlock);
84

85
static int32_t tableReaderLoadRawBlock(STableReader *p, SBlkHandle *pHandle, SBlockWrapper *pBlkWrapper);
86
static int32_t tableReaderLoadRawMeta(STableReader *p, SBlkHandle *pHandle, SBlockWrapper *blkWrapper);
87
static int32_t tableReaderLoadRawMetaIndex(STableReader *p, SBlockWrapper *blkWrapper);
88
static int32_t tableReaderLoadRawFooter(STableReader *p, SBlockWrapper *blkWrapper);
89

90
static int32_t tableOpenFile(char *name, int8_t read, TdFilePtr *pFile, int64_t *size);
91
static int32_t tableFlushBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlk, int32_t *nWrite);
92
static int32_t tableLoadBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlk);
93
static int32_t tableLoadRawBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlk, int8_t checkSum);
94

95
/*--- block format ---*/
// |--datatype--|---len---|---data----|--rawdatasize--|--compressType--|--checksum--|
// |   int8_t   | int32_t | uint8_t[] |    int32_t    |     int8_t     |  TSCKSUM   |
//
// The *_OFFSET macros give byte offsets from the start of the SBlock header;
// (p)->len is the number of payload bytes currently stored in the block.
#define BLOCK_ROW_SIZE_OFFSET(p)      (sizeof(SBlock) + (p)->len)
#define BLOCK_ROW_SIZE(p)             BLOCK_ROW_SIZE_OFFSET(p)
#define BLOCK_COMPRESS_TYPE_OFFSET(p) (BLOCK_ROW_SIZE_OFFSET(p) + sizeof(int32_t))
#define BLOCK_CHECKSUM_OFFSET(p)      (BLOCK_COMPRESS_TYPE_OFFSET(p) + sizeof(int8_t))
#define BLOCK_TOTAL_SIZE(p)           (BLOCK_CHECKSUM_OFFSET(p) + sizeof(TSCKSUM))

// Accessors for the int32_t raw-size field that follows the payload. Wrapped in
// parentheses so they behave as a single expression wherever they are expanded.
#define BLOCK_SET_ROW_SIZE(p, size) (*(int32_t *)((char *)(p) + BLOCK_ROW_SIZE_OFFSET(p)) = (size))
#define BLOCK_GET_ROW_SIZE(p)       (*(int32_t *)((char *)(p) + BLOCK_ROW_SIZE_OFFSET(p)))

// Accessors for the int8_t compress-type field that follows the raw-size field.
#define BLOCK_SET_COMPRESS_TYPE(p, type) (*(int8_t *)((char *)(p) + BLOCK_COMPRESS_TYPE_OFFSET(p)) = (type))
#define BLOCK_GET_COMPRESS_TYPE(p)       (*(int8_t *)((char *)(p) + BLOCK_COMPRESS_TYPE_OFFSET(p)))

// Size of the trailer: raw size + compress type + checksum.
#define BLOCK_TAIL_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(TSCKSUM))

// Write the trailer fields (raw length, then compress type) right after `len`
// bytes of compressed data starting at p. `len` is parenthesised so that an
// expression argument (e.g. a ternary) expands correctly.
// NOTE(review): the trailing ';' after while (0) is kept only because callers
// outside this view may rely on it; it makes the macro unsafe in a brace-less
// if/else.
#define COMREPSS_DATA_SET_TYPE_AND_RAWLEN(p, len, type, rawLen)  \
  do {                                                           \
    *(int32_t *)((char *)(p) + (len)) = (rawLen);                \
    *(int8_t *)((char *)(p) + (len) + sizeof(int32_t)) = (type); \
  } while (0);
// Read the trailer fields back; here `len` is the total on-disk length of the
// block, i.e. it includes BLOCK_TAIL_LEN.
#define COMPRESS_DATA_GET_TYPE_AND_RAWLEN(p, len, type, rawLen)                   \
  do {                                                                            \
    (rawLen) = *(int32_t *)((char *)(p) + (len) - BLOCK_TAIL_LEN);                \
    (type) = *(int8_t *)((char *)(p) + (len) - BLOCK_TAIL_LEN + sizeof(int32_t)); \
  } while (0);
122

123
// Load the block described by pHandle from the builder's data file and look up
// the entry with sequence number `seq` in it.
//
// p        table builder whose data file is read
// pHandle  location/size of the block to load
// seq      sequence number to look for
// pValue   out: value found by blockSeek
// len      out: length of the value
//
// Returns 0 on success, otherwise the error code of the failing step. The
// temporary block wrapper is always cleaned up before returning.
int32_t tableBuilderSeek(STableBuilder *p, SBlkHandle *pHandle, int64_t seq, uint8_t **pValue, int32_t *len) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SBlockWrapper blockWrapper = {0};

  code = tableBuilderLoadBlock(p, pHandle, &blockWrapper);
  TSDB_CHECK_CODE(code, lino, _error);

  code = blockSeek(blockWrapper.data, seq, pValue, len);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  if (code != 0) {
    bseError("failed to seek data from table builder at lino %d since %s", lino, tstrerror(code));
  }
  // The wrapper is local to this call, so release it on every path.
  blockWrapperCleanup(&blockWrapper);
  return code;
}
142

143
// Initialise pBlkWrapper with room for pHandle->size bytes and fill it with the
// block read from the builder's data file. Returns 0 on success or the error
// code of the failing step; the wrapper is not released here on failure.
int32_t tableBuilderLoadBlock(STableBuilder *p, SBlkHandle *pHandle, SBlockWrapper *pBlkWrapper) {
  int32_t lino = 0;
  int32_t code = blockWrapperInit(pBlkWrapper, pHandle->size);
  TSDB_CHECK_CODE(code, lino, _error);

  // The wrapper carries the pBse pointer for encryption/decryption.
  pBlkWrapper->pBse = p->pBse;
  code = tableLoadBlock(p->pDataFile, pHandle, pBlkWrapper);

_error:
  if (code != 0) {
    bseError("failed to load block from table builder at lino %d since %s", lino, tstrerror(code));
  }
  return code;
}
159

160
// Create a table builder for the data file derived from timestamp `ts`.
//
// ts        timestamp used to build the data file name
// pBuilder  out: receives the new builder; written only on success
// pBse      owning BSE instance (supplies block size, compress type, path)
//
// Returns 0 on success. On failure the partially built builder is closed and
// *pBuilder is left untouched.
int32_t tableBuilderOpen(int64_t ts, STableBuilder **pBuilder, SBse *pBse) {
  int32_t code = 0;
  int32_t lino = 0;

  char name[TSDB_FILENAME_LEN] = {0};
  char path[TSDB_FILENAME_LEN] = {0};
  bseBuildDataName(ts, name);
  bseBuildFullName(pBse, name, path);

  STableBuilder *p = taosMemoryCalloc(1, sizeof(STableBuilder));
  if (p == NULL) {
    // Record the allocator's error; the check macro alone does not assign code.
    code = terrno;
    lino = __LINE__;
    goto _error;
  }

  p->timestamp = ts;
  // p was zero-filled, so copying strlen(name) bytes leaves p->name terminated.
  memcpy(p->name, name, strlen(name));

  p->pBse = pBse;
  p->blockCap = BSE_BLOCK_SIZE(pBse);
  p->compressType = BSE_COMPRESS_TYPE(pBse);
  seqRangeReset(&p->tableRange);
  seqRangeReset(&p->blockRange);

  code = bseMemTableCreate(&p->pMemTable, BSE_BLOCK_SIZE(pBse));
  TSDB_CHECK_CODE(code, lino, _error);
  p->pMemTable->pTableBuilder = p;

  code = tableOpenFile(path, 0, &p->pDataFile, &p->offset);
  TSDB_CHECK_CODE(code, lino, _error);

  // Publish only a fully initialised builder.
  *pBuilder = p;

_error:
  if (code != 0) {
    (void)tableBuilderClose(p, 0);
    bseError("failed to open table builder at line %d since %s", lino, tstrerror(code));
  }
  return code;
}
201

202
// Fetch the meta blocks of the builder's immutable mem-table into *pMetaBlock.
// Returns whatever bseMemTablGetMetaBlock returns.
int32_t tableBuilderGetMetaBlock(STableBuilder *p, SArray **pMetaBlock) {
  STableMemTable *pImmu = p->pImmuMemTable;
  return bseMemTablGetMetaBlock(pImmu, pMetaBlock);
}
205

206
// Record a flushed block handle in the selected mem-table and reset that
// mem-table's current block range.
//
// p        table builder
// pHandle  handle of the block that was just written
// immu     non-zero selects the immutable mem-table, zero the active one
//
// Returns 0 on success, otherwise the error from taking the reference or from
// pushing the handle.
int32_t tableBuilderAddMeta(STableBuilder *p, SBlkHandle *pHandle, int8_t immu) {
  int32_t code = 0;
  int32_t lino = 0;

  STableMemTable *pMemTable = immu ? p->pImmuMemTable : p->pMemTable;

  code = bseMemTableRef(pMemTable);
  if (code != 0) {
    // No reference was taken, so there is nothing to release.
    return code;
  }

  code = bseMemTablePush(pMemTable, pHandle);
  TAOS_CHECK_GOTO(code, &lino, _error);

  // The handle now covers the pending range; start a fresh one.
  seqRangeReset(&pMemTable->range);
_error:
  bseMemTableUnRef(pMemTable);
  return code;
}
223
// Append the mem-table's key/value meta buffer (kvBuffer) behind the payload
// already held in its block, recording where that meta section starts.
// Returns 0 on success or the error from resizing the wrapper.
int32_t tableBuilderSetBlockInfo(STableMemTable *pMemTable) {
  SBlockWrapper *pWrapper = &pMemTable->pBlockWrapper;
  int32_t        lino = 0;

  // Make room for block header + payload + trailer plus the kv section.
  int32_t code = blockWrapperResize(pWrapper, BLOCK_TOTAL_SIZE((SBlock *)(pWrapper->data)) + pWrapper->kvSize);
  TSDB_CHECK_CODE(code, lino, _error);

  // Read the data pointer only after the resize call.
  SBlock *pBlk = (SBlock *)pWrapper->data;

  // The kv section begins where the payload currently ends.
  pBlk->offset = pBlk->len;
  memcpy(pBlk->data + pBlk->len, pWrapper->kvBuffer, pWrapper->kvSize);
  pBlk->len += pWrapper->kvSize;
  pBlk->version = BSE_DATA_VER;

_error:
  return code;
}
240
// Write the current block of the selected mem-table to the data file.
//
// p          table builder; NULL is accepted and treated as "nothing to do"
// type       block type stored in the block header
// immutable  non-zero flushes the immutable mem-table (under its write latch),
//            zero flushes the active one (the wrapper is cleared afterwards)
//
// Steps: append the kv meta section, fill in the trailer, optionally compress,
// append the checksum, write at p->offset and register the block handle.
// Returns 0 on success (including the empty-block case) or an error code.
int32_t tableBuilderFlush(STableBuilder *p, int8_t type, int8_t immutable) {
  int32_t code = 0;
  int32_t lino = 0;
  int8_t  inLock = 0;

  // Check p before it is dereferenced to pick the mem-table.
  if (p == NULL) return code;

  STableMemTable *pMemTable = immutable ? p->pImmuMemTable : p->pMemTable;

  SBlockWrapper wrapper = {0};
  code = bseMemTableRef(pMemTable);
  TSDB_CHECK_CODE(code, lino, _error);

  if (immutable) {
    taosWLockLatch(&pMemTable->latch);
    inLock = 1;
  }

  code = tableBuilderSetBlockInfo(pMemTable);
  TSDB_CHECK_CODE(code, lino, _error);

  SBlock *pBlk = pMemTable->pBlockWrapper.data;
  if (pBlk->len == 0) {
    // Nothing buffered: leave through the common cleanup with code == 0.
    goto _error;
  }

  int8_t compressType = BSE_COMPRESS_TYPE(p->pBse);

  uint8_t *pWrite = (uint8_t *)pBlk;
  int32_t  len = BLOCK_TOTAL_SIZE(pBlk);

  pBlk->type = type;

  BLOCK_SET_COMPRESS_TYPE(pBlk, compressType);
  BLOCK_SET_ROW_SIZE(pBlk, BLOCK_ROW_SIZE(pBlk));

  if (compressType != kNoCompres) {
    code = blockWrapperInit(&wrapper, len + 16);
    TSDB_CHECK_CODE(code, lino, _error);

    int32_t compressSize = wrapper.cap;
    code = bseCompressData(compressType, pWrite, BLOCK_ROW_SIZE(pBlk), wrapper.data, &compressSize);
    if (code != 0) {
      // Compression is best effort: fall back to writing the raw block.
      bseWarn("failed to compress data since %s, not set compress", tstrerror(TSDB_CODE_THIRDPARTY_ERROR));

      blockWrapperCleanup(&wrapper);
      BLOCK_SET_COMPRESS_TYPE(pBlk, kNoCompres);
      BLOCK_SET_ROW_SIZE(pBlk, BLOCK_ROW_SIZE(pBlk));
    } else {
      int32_t rawSize = BLOCK_ROW_SIZE(pBlk);
      COMREPSS_DATA_SET_TYPE_AND_RAWLEN(wrapper.data, compressSize, compressType, rawSize);
      len = compressSize + BLOCK_TAIL_LEN;

      pWrite = (uint8_t *)wrapper.data;
    }
  }

  code = taosCalcChecksumAppend(0, (uint8_t *)pWrite, len);
  TSDB_CHECK_CODE(code, lino, _error);

  SBlkHandle handle = {.size = len, .offset = p->offset, .range = pMemTable->range};

  bseDebug("bse flush at offset %" PRId64 " len: %d, block range sseq:%" PRId64 ", eseq:%" PRId64 "", p->offset, len,
           handle.range.sseq, handle.range.eseq);

  int64_t n = taosLSeekFile(p->pDataFile, handle.offset, SEEK_SET);
  if (n < 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  int64_t nwrite = taosWriteFile(p->pDataFile, (uint8_t *)pWrite, len);
  if (nwrite != len) {
    // NOTE(review): if terrno is 0 after a short write this check does not
    // jump and the flush continues - confirm taosWriteFile always sets terrno.
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }
  p->offset += len;

  code = tableBuilderAddMeta(p, &handle, immutable);

_error:
  if (code != 0) {
    bseError("failed to flush table builder at line %d since %s", lino, tstrerror(code));
  }

  if (pMemTable != NULL) {
    if (!immutable) blockWrapperClear(&pMemTable->pBlockWrapper);
    if (inLock) {
      taosWUnLockLatch(&pMemTable->latch);
    }
    bseMemTableUnRef(pMemTable);
  }
  blockWrapperCleanup(&wrapper);
  return code;
}
333

334
// Merge the sequence number of pInfo into the builder's table-wide range.
void tableBuildUpdateTableRange(STableBuilder *p, SBlockItemInfo *pInfo) {
  SSeqRange single = {0};
  single.sseq = pInfo->seq;
  single.eseq = pInfo->seq;
  seqRangeUpdate(&p->tableRange, &single);
}
×
338

339
// Merge the sequence number of pInfo into the builder's current block range.
void tableBuilderUpdateBlockRange(STableBuilder *p, SBlockItemInfo *pInfo) {
  SSeqRange single = {0};
  single.sseq = pInfo->seq;
  single.eseq = pInfo->seq;
  seqRangeUpdate(&p->blockRange, &single);
}
×
343
// Merge the sequence number of pInfo into both ranges kept by the mem-table:
// the current block range and the table-wide range.
void memtableUpdateBlockRange(STableMemTable *p, SBlockItemInfo *pInfo) {
  SSeqRange single = {0};
  single.sseq = pInfo->seq;
  single.eseq = pInfo->seq;

  seqRangeUpdate(&p->range, &single);
  seqRangeUpdate(&p->tableRange, &single);
}
×
348

349
// table block data
350
// data1 data2 data3 data4 k1v1 k2v2, k3,v3 compresss size raw_size
351
//|seq len value|seq len value| seq len value| seq len value|
352
int32_t tableBuilderPut(STableBuilder *p, SBseBatch *pBatch) {
×
353
  int32_t code = 0;
×
354
  int32_t lino = 0;
×
355
  int32_t len = 0, offset = 0;
×
356
  int8_t  inLock = 0;
×
357

358
  code = bseMemTableRef(p->pMemTable);
×
359
  if (code != 0) {
×
360
    return code;
×
361
  }
362

363
  taosWLockLatch(&p->pMemTable->latch);
×
364
  inLock = 1;
×
365

366
  SBlockWrapper *pBlockWrapper = &p->pMemTable->pBlockWrapper;
×
367

368
  for (int32_t i = 0; i < taosArrayGetSize(pBatch->pSeq);) {
×
369
    SBlockItemInfo *pInfo = taosArrayGet(pBatch->pSeq, i);
×
370
    if (i == 0 || i == taosArrayGetSize(pBatch->pSeq) - 1) {
×
371
      tableBuildUpdateTableRange(p, pInfo);
×
372
      memtableUpdateBlockRange(p->pMemTable, pInfo);
×
373
    }
374

375
    if (atomic_load_8(&p->hasImmuMemTable) ||
×
376
        (blockWrapperSize(pBlockWrapper, len + pInfo->size) < tableBuilderGetBlockSize(p))) {
×
377
      i++;
×
378
      len += pInfo->size;
×
379
      tableBuilderUpdateBlockRange(p, pInfo);
×
380
      memtableUpdateBlockRange(p->pMemTable, pInfo);
×
381

382
      code = blockWrapperPushMeta(pBlockWrapper, pInfo->seq, NULL, pInfo->size);
×
383
      TSDB_CHECK_CODE(code, lino, _error);
×
384

385
      bseTrace("start to insert  bse table builder mem %p, idx %d", p->pMemTable, i);
×
386
      continue;
×
387
    } else {
388
      if (len > 0) {
×
389
        offset += blockAppendBatch(pBlockWrapper->data, pBatch->buf + offset, len);
×
390
      }
391
      bseTrace("start to flush bse table builder mem %p", p->pMemTable);
×
392
      code = tableBuilderFlush(p, BSE_TABLE_DATA_TYPE, 0);
×
393
      TSDB_CHECK_CODE(code, lino, _error);
×
394
      len = 0;
×
395
    }
396
  }
397

398
  if (offset < pBatch->len) {
×
399
    int32_t size = pBatch->len - offset;
×
400
    if (size > 0) {
×
401
      code = blockWrapperResize(pBlockWrapper,
×
402
                                size + BLOCK_TOTAL_SIZE((SBlock *)(pBlockWrapper->data)) + pBlockWrapper->kvSize);
×
403
      TSDB_CHECK_CODE(code, lino, _error);
×
404
    }
405

406
    if (blockAppendBatch(pBlockWrapper->data, pBatch->buf + offset, size) != size) {
×
407
      code = TSDB_CODE_INVALID_PARA;
×
408
    }
409
  }
410
_error:
×
411
  if (code != 0) {
×
412
    bseError("failed to append batch since %s", tstrerror(code));
×
413
  }
414
  if (inLock) {
×
415
    taosWUnLockLatch(&p->pMemTable->latch);
×
416
  }
417

418
  bseMemTableUnRef(p->pMemTable);
×
419
  return code;
×
420
}
421

422
// Truncate the builder's data file to `size` bytes.
// Returns TSDB_CODE_INVALID_PARA when no data file is open, otherwise the
// result of taosFtruncateFile (logged when non-zero).
int32_t tableBuilderTruncFile(STableBuilder *p, int64_t size) {
  if (p->pDataFile == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = taosFtruncateFile(p->pDataFile, size);
  if (code != 0) {
    bseError("failed to truncate file since %s", tstrerror(code));
  }
  return code;
}
438

439
int32_t compareFunc(const void *pLeft, const void *pRight) {
×
440
  SBlkHandle *p1 = (SBlkHandle *)pLeft;
×
441
  SBlkHandle *p2 = (SBlkHandle *)pRight;
×
442
  if (p1->range.sseq > p2->range.sseq) {
×
443
    return 1;
×
444
  } else if (p1->range.sseq < p2->range.sseq) {
×
445
    return -1;
×
446
  }
447
  return 0;
×
448
}
449
// Search pMetaHandle for `seq` with taosArraySearchIdx in TD_LE mode, comparing
// handles by start sequence number. Returns the index reported by the search;
// callers in this file treat a negative result as "no such block".
int32_t findTargetBlock(SArray *pMetaHandle, int64_t seq) {
  SBlkHandle key = {0};
  key.range.sseq = seq;
  key.range.eseq = seq;
  return taosArraySearchIdx(pMetaHandle, &key, compareFunc, TD_LE);
}
453

454
// Look up `seq` in one mem-table.
//
// p      mem-table to search; NULL yields TSDB_CODE_NOT_FOUND
// seq    sequence number to find
// value  out: value for seq
// len    out: length of the value
//
// The mem-table is referenced and read-latched while searching. If seq lies
// in a block that was already flushed (recorded in pMetaHandle) the block is
// read back through the table builder; otherwise the in-memory wrapper is
// searched. Returns 0 on success, TSDB_CODE_NOT_FOUND when seq is outside the
// mem-table's range, or the error of the failing step.
int32_t findInMemtable(STableMemTable *p, int64_t seq, uint8_t **value, int32_t *len) {
  int32_t code = 0;
  int8_t  inBuf = 1;
  int32_t lino = 0;
  int8_t  inLock = 0;
  if (p == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  code = bseMemTableRef(p);
  if (code != 0) {
    return code;
  }

  taosRLockLatch(&p->latch);
  inLock = 1;

  if (!seqRangeContains(&p->tableRange, seq)) {
    // Assign code explicitly: the check macro only tests its argument, so
    // without this the function would report success with no value set.
    TSDB_CHECK_CODE(code = TSDB_CODE_NOT_FOUND, lino, _error);
  }

  if (taosArrayGetSize(p->pMetaHandle) > 0) {
    SBlkHandle *pHandle = taosArrayGetLast(p->pMetaHandle);
    if (!seqRangeIsGreater(&pHandle->range, seq)) {
      // seq is covered by a flushed block, so it is no longer in the buffer.
      inBuf = 0;
      int32_t idx = findTargetBlock(p->pMetaHandle, seq);
      if (idx < 0) {
        TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_RANGE, lino, _error);
      }
      pHandle = taosArrayGet(p->pMetaHandle, idx);
      code = tableBuilderSeek(p->pTableBuilder, pHandle, seq, value, len);
      TSDB_CHECK_CODE(code, lino, _error);
    }
  }

  if (inBuf == 1) {
    code = blockWrapperSeek(&p->pBlockWrapper, seq, value, len);
    if (code != 0) {
      bseInfo("mem table range [%" PRId64 ", %" PRId64 "]", p->tableRange.sseq, p->tableRange.eseq);
    }
    TSDB_CHECK_CODE(code, lino, _error);
  }
_error:
  if (inLock) {
    taosRUnLockLatch(&p->latch);
  }
  bseMemTableUnRef(p);
  if (code != 0) {
    bseInfo("failed to find seq %" PRId64 " in memtable %p at line %d since %s", seq, p, lino, tstrerror(code));
  }
  return code;
}
506
// Look up `seq` in the builder: first in the active mem-table and, if that
// lookup returns non-zero, in the immutable one. A NULL builder yields
// TSDB_CODE_NOT_FOUND. Returns the code of the last lookup performed.
int32_t tableBuilderGet(STableBuilder *p, int64_t seq, uint8_t **value, int32_t *len) {
  if (p == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  int32_t code = findInMemtable(p->pMemTable, seq, value, len);
  if (code == 0) {
    return code;
  }
  return findInMemtable(p->pImmuMemTable, seq, value, len);
}
518

519
// Widen pTableMeta->range with the range of every SMetaBlock in pMetaBlock.
// A NULL array is ignored.
static void updateTableRange(SBTableMeta *pTableMeta, SArray *pMetaBlock) {
  if (pMetaBlock != NULL) {
    int32_t idx = 0;
    while (idx < taosArrayGetSize(pMetaBlock)) {
      SMetaBlock *pEntry = taosArrayGet(pMetaBlock, idx);
      seqRangeUpdate(&pTableMeta->range, &pEntry->range);
      idx++;
    }
  }
}
529
// Drop the builder's immutable mem-table while holding the builder manager's
// write lock: clear the hasImmuMemTable flag, release the reference and reset
// the pointer. Always returns 0.
static int32_t tableBuilderClearImmuMemTable(STableBuilder *p) {
  STableBuilderMgt *pBuilderMgt = p->pBuilderMgt;

  (void)taosThreadRwlockWrlock(&pBuilderMgt->mutex);

  atomic_store_8(&p->hasImmuMemTable, 0);
  bseMemTableUnRef(p->pImmuMemTable);
  p->pImmuMemTable = NULL;

  (void)taosThreadRwlockUnlock(&pBuilderMgt->mutex);
  return 0;
}
540
// Turn the active mem-table into the immutable one while holding the BSE write
// lock: move the pointer, clear the active slot and set the hasImmuMemTable
// flag. Always returns 0.
static int32_t tableBuildeSwapMemTable(STableBuilder *p) {
  (void)taosThreadRwlockWrlock(&p->pBse->rwlock);

  p->pImmuMemTable = p->pMemTable;
  p->pMemTable = NULL;
  atomic_store_8(&p->hasImmuMemTable, 1);

  (void)taosThreadRwlockUnlock(&p->pBse->rwlock);
  return 0;
}
551

552
// Commit the builder's immutable mem-table: flush it, fsync the data file,
// commit its meta blocks and fill pInfo with the resulting file description.
//
// p      table builder; NULL yields TSDB_CODE_INVALID_PARA
// pInfo  out: level, range, timestamp and size of the committed file
//
// When there are no meta blocks the function returns right after the fsync
// without touching pInfo. Returns 0 on success or the first error code.
int32_t tableBuilderCommit(STableBuilder *p, SBseLiveFileInfo *pInfo) {
  if (p == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  SArray *pBlocks = NULL;

  code = tableBuilderFlush(p, BSE_TABLE_DATA_TYPE, 1);
  TSDB_CHECK_CODE(code, lino, _error);

  code = taosFsyncFile(p->pDataFile);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableBuilderGetMetaBlock(p, &pBlocks);
  TSDB_CHECK_CODE(code, lino, _error);

  if (taosArrayGetSize(pBlocks) == 0) {
    // Nothing to commit: release the (empty) array and leave quietly.
    bseDebug("no meta block to commit for table %s", p->name);
    taosArrayDestroy(pBlocks);
    return code;
  }

  code = tableMetaCommit(p->pTableMeta, pBlocks);
  TSDB_CHECK_CODE(code, lino, _error);

  updateTableRange(p->pTableMeta, pBlocks);

  pInfo->level = 0;
  pInfo->range = p->pTableMeta->range;
  pInfo->timestamp = p->timestamp;
  pInfo->size = p->offset;

  code = tableBuilderClearImmuMemTable(p);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  if (code == 0) {
    bseInfo("succ to commit table %s", p->name);
  } else {
    bseError("failed to commit table builder at line %d since %s ", lino, tstrerror(code));
  }
  taosArrayDestroy(pBlocks);
  return code;
}
600

601
// Return the block capacity configured for this builder.
int32_t tableBuilderGetBlockSize(STableBuilder *p) {
  return p->blockCap;
}
×
602

603
// Release a table builder: drop the references on both mem-tables, close the
// data file (logging a failure) and free the builder. NULL is ignored.
// `commited` is not used by this function.
void tableBuilderClose(STableBuilder *p, int8_t commited) {
  if (p != NULL) {
    bseMemTableUnRef(p->pMemTable);
    bseMemTableUnRef(p->pImmuMemTable);

    if (taosCloseFile(&p->pDataFile) != 0) {
      bseError("failed to close table builder file %s since %s", p->name, tstrerror(terrno));
    }

    taosMemoryFree(p);
  }
}
616

617
// Fill the SBseSnapMeta header located at the start of pBlkWrapper->data with
// the given range, file type, block type and timestamp.
static void addSnapshotMetaToBlock(SBlockWrapper *pBlkWrapper, SSeqRange range, int8_t fileType, int8_t blockType,
                                   int64_t timestamp) {
  SBseSnapMeta *pHdr = pBlkWrapper->data;

  pHdr->timestamp = timestamp;
  pHdr->blockType = blockType;
  pHdr->fileType = fileType;
  pHdr->range = range;
}
627

628
// Overwrite only the timestamp of the SBseSnapMeta header at the start of
// pBlkWrapper->data; range, fileType and blockType are accepted but not used.
static void updateSnapshotMeta(SBlockWrapper *pBlkWrapper, SSeqRange range, int8_t fileType, int8_t blockType,
                               int64_t timestamp) {
  ((SBseSnapMeta *)pBlkWrapper->data)->timestamp = timestamp;
}
634
// Read one raw data block from the reader's data file into blkWrapper and
// stamp it with a snapshot header (BSE_TABLE_SNAP / BSE_TABLE_DATA_TYPE).
// The wrapper is sized for the block plus the SBseSnapMeta header.
// Returns 0 on success or the error of the failing step.
int32_t tableReaderLoadRawBlock(STableReader *p, SBlkHandle *pHandle, SBlockWrapper *blkWrapper) {
  int32_t lino = 0;
  int32_t code = blockWrapperResize(blkWrapper, pHandle->size + sizeof(SBseSnapMeta));
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableLoadRawBlock(p->pDataFile, pHandle, blkWrapper, 1);
  TSDB_CHECK_CODE(code, lino, _error);

  addSnapshotMetaToBlock(blkWrapper, p->range, BSE_TABLE_SNAP, BSE_TABLE_DATA_TYPE, p->timestamp);

_error:
  if (code != 0) {
    bseError("table reader failed to load block at line %d since %s", lino, tstrerror(code));
  }
  return code;
}
652

653
// Read one raw meta block from the meta reader's file into blkWrapper and
// stamp it with a snapshot header (BSE_TABLE_META_SNAP / BSE_TABLE_META_TYPE).
// Returns 0 on success or the error of the failing step.
int32_t tableReaderLoadRawMeta(STableReader *p, SBlkHandle *pHandle, SBlockWrapper *blkWrapper) {
  SBtableMetaReader *pMetaReader = p->pMetaReader;
  int32_t            lino = 0;

  int32_t code = blockWrapperResize(blkWrapper, pHandle->size + sizeof(SBseSnapMeta));
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableLoadRawBlock(pMetaReader->pFile, pHandle, blkWrapper, 1);
  TSDB_CHECK_CODE(code, lino, _error);

  addSnapshotMetaToBlock(blkWrapper, p->range, BSE_TABLE_META_SNAP, BSE_TABLE_META_TYPE, p->timestamp);

_error:
  if (code != 0) {
    bseError("failed to load raw meta from table pReaderMgt at line %d lino since %s", lino, tstrerror(code));
  }
  return code;
}
672
// Read the raw block referenced by the footer's metaHandle from the meta
// reader's file into blkWrapper and stamp it with a snapshot header
// (BSE_TABLE_META_SNAP / BSE_TABLE_META_INDEX_TYPE).
// Returns 0 on success or the error of the failing step.
int32_t tableReaderLoadRawMetaIndex(STableReader *p, SBlockWrapper *blkWrapper) {
  SBtableMetaReader *pMetaReader = p->pMetaReader;
  SBlkHandle        *pIdxHandle = p->pMetaReader->footer.metaHandle;
  int32_t            lino = 0;

  int32_t code = blockWrapperResize(blkWrapper, pIdxHandle->size + sizeof(SBseSnapMeta));
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableLoadRawBlock(pMetaReader->pFile, pIdxHandle, blkWrapper, 1);
  TSDB_CHECK_CODE(code, lino, _error);

  addSnapshotMetaToBlock(blkWrapper, p->range, BSE_TABLE_META_SNAP, BSE_TABLE_META_INDEX_TYPE, p->timestamp);

_error:
  if (code != 0) {
    bseError("failed to load raw meta from table pReaderMgt at line %d lino since %s", lino, tstrerror(code));
  }
  return code;
}
692
// Read the last kEncodeLen bytes (the footer) of the meta reader's file and
// place them in blkWrapper behind a snapshot header
// (BSE_TABLE_META_SNAP / BSE_TABLE_FOOTER_TYPE).
//
// p           table reader whose meta file is read
// blkWrapper  out: resized to header + footer; size is set accordingly
//
// Returns 0 on success or the error of the failing seek/read/resize.
int32_t tableReaderLoadRawFooter(STableReader *p, SBlockWrapper *blkWrapper) {
  int32_t code = 0;
  int32_t lino = 0;
  char    buf[kEncodeLen] = {0};

  SBtableMetaReader *pReader = p->pMetaReader;
  // NOTE(review): the encoded bytes are overwritten by the file read below and
  // the return value is not checked - kept as in the original, confirm intent.
  code = footerEncode(&pReader->footer, buf);
  int32_t len = sizeof(buf);

  int64_t n = taosLSeekFile(pReader->pFile, -kEncodeLen, SEEK_END);
  if (n < 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  if (taosReadFile(pReader->pFile, buf, sizeof(buf)) != len) {
    // Assign code here; otherwise a failed read would be reported as success.
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  code = blockWrapperResize(blkWrapper, len + sizeof(SBseSnapMeta));
  TSDB_CHECK_CODE(code, lino, _error);

  // Footer bytes go right behind the snapshot header.
  memcpy((uint8_t *)blkWrapper->data + sizeof(SBseSnapMeta), buf, sizeof(buf));
  blkWrapper->size = len + sizeof(SBseSnapMeta);

  addSnapshotMetaToBlock(blkWrapper, p->range, BSE_TABLE_META_SNAP, BSE_TABLE_FOOTER_TYPE, p->timestamp);
_error:
  if (code != 0) {
    bseError("failed to load raw footer from table pReaderMgt at lino %d since %s", lino, tstrerror(code));
  }
  return code;
}
723

724
// Create a table reader for the data/meta files derived from `timestamp`.
//
// timestamp   used to build the data and meta file names
// pReader     out: receives the new reader; written only on success
// pReaderMgt  STableReaderMgt that owns the reader; NULL yields
//             TSDB_CODE_INVALID_CFG
//
// Returns 0 on success. On failure the partially built reader is closed.
int32_t tableReaderOpen(int64_t timestamp, STableReader **pReader, void *pReaderMgt) {
  char data[TSDB_FILENAME_LEN] = {0};
  char meta[TSDB_FILENAME_LEN] = {0};

  char dataPath[TSDB_FILENAME_LEN] = {0};

  int32_t code = 0;
  int32_t lino = 0;

  STableReaderMgt *pMgt = (STableReaderMgt *)pReaderMgt;
  if (pMgt == NULL) {
    return TSDB_CODE_INVALID_CFG;
  }

  SSubTableMgt *pMeta = pMgt->pMgt;

  STableReader *p = taosMemCalloc(1, sizeof(STableReader));
  if (p == NULL) {
    // Record the allocator's error; the check macro alone does not assign code.
    code = terrno;
    lino = __LINE__;
    goto _error;
  }

  bseBuildDataName(timestamp, data);

  p->timestamp = timestamp;
  p->blockCap = 1024;
  p->pReaderMgt = pReaderMgt;
  // p was zero-filled, so copying strlen(data) bytes leaves p->name terminated.
  memcpy(p->name, data, strlen(data));

  bseBuildFullName(pMgt->pBse, data, dataPath);
  code = tableOpenFile(dataPath, 1, &p->pDataFile, &p->fileSize);
  TSDB_CHECK_CODE(code, lino, _error);

  code = blockWrapperInit(&p->blockWrapper, 1024);
  TSDB_CHECK_CODE(code, lino, _error);

  // Set pBse pointer for encryption/decryption
  p->blockWrapper.pBse = pMgt->pBse;

  bseBuildMetaName(timestamp, meta);
  code = tableMetaReaderInit(pMeta->pTableMetaMgt->pTableMeta, meta, &p->pMetaReader);
  TSDB_CHECK_CODE(code, lino, _error);

  *pReader = p;

_error:
  if (code != 0) {
    tableReaderClose(p);
    bseError("failed to open table pReaderMgt at line %d since %s", lino, tstrerror(code));
  }
  return code;
}
776
// Store the caller's cache preference in the reader's putInCache flag.
void tableReaderShouldPutToCache(STableReader *p, int8_t cache) {
  p->putInCache = cache;
}
×
777

778
// Look up `seq` through a table reader.
//
// p       table reader
// seq     sequence number to find
// pValue  out: value found by blockSeek
// len     out: length of the value
//
// The block meta for seq is loaded first; the block itself comes from the
// reader manager's block cache or, on a cache miss, is read from the data file
// and then put into the cache. Returns 0 on success or the first error code.
int32_t tableReaderGet(STableReader *p, int64_t seq, uint8_t **pValue, int32_t *len) {
  int32_t    lino = 0;
  int32_t    code = 0;
  SMetaBlock block = {0};

  STableReaderMgt   *pMgt = (STableReaderMgt *)p->pReaderMgt;
  SBtableMetaReader *pMeta = p->pMetaReader;

  code = tableMetaReaderLoadBlockMeta(pMeta, seq, &block);
  TSDB_CHECK_CODE(code, lino, _error);

  SBlockWrapper wrapper = {0};
  SBlkHandle    blkhandle = {.offset = block.offset, .size = block.size, .range = block.range};

  SCacheItem *pItem = NULL;
  code = blockCacheGet(pMgt->pBlockCache, &blkhandle.range, (void **)&pItem);
  if (code != 0) {
    // Cache miss: read the block from the data file.
    code = blockWrapperInit(&wrapper, block.size + 16);
    TSDB_CHECK_CODE(code, lino, _error);

    bseDebug("block size:%" PRId64 ", offset:%" PRId64 ", [sseq:%" PRId64 ", eseq:%" PRId64 "]", block.size,
             block.offset, block.range.sseq, block.range.eseq);

    // NOTE(review): wrapper.pBse is not set here, unlike in
    // tableBuilderLoadBlock - confirm tableLoadBlock does not need it.
    code = tableLoadBlock(p->pDataFile, &blkhandle, &wrapper);
    if (code != 0) {
      blockWrapperCleanup(&wrapper);
      TSDB_CHECK_CODE(code, lino, _error);
    }

    // NOTE(review): if the put fails the wrapper is not cleaned up here;
    // whether blockCachePut takes ownership on failure cannot be told from
    // this file - confirm.
    code = blockCachePut(pMgt->pBlockCache, &block.range, wrapper.data);
    TSDB_CHECK_CODE(code, lino, _error);

  } else {
    wrapper.data = pItem->pItem;
    wrapper.pCachItem = pItem;
  }

  code = blockSeek(wrapper.data, seq, pValue, len);

  // Release the cache reference and wrapper meta whether or not the seek
  // succeeded, so a failed lookup does not leak them.
  if (wrapper.pCachItem != NULL) {
    bseCacheUnrefItem(wrapper.pCachItem);
  }
  blockWrapperClearMeta(&wrapper);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  if (code != 0) {
    bseError("failed to get table reader data at line %d since %s", lino, tstrerror(code));
  }
  return code;
}
829
// Collect all data block handles known to the reader's meta reader.
//
// p      table reader
// pMeta  out: newly created SArray of SBlkHandle; written only on success and
//        then owned by the caller
//
// Returns 0 on success or the error of the failing step; on failure the
// temporary array is destroyed.
int32_t tableReaderGetMeta(STableReader *p, SArray **pMeta) {
  int32_t code = 0;
  int32_t lino = 0;

  SArray *pMetaHandle = taosArrayInit(128, sizeof(SBlkHandle));
  if (pMetaHandle == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  code = tableMetaReaderLoadAllDataHandle(p->pMetaReader, pMetaHandle);
  TSDB_CHECK_CODE(code, lino, _error);

  *pMeta = pMetaHandle;

_error:
  if (code != 0) {
    // The array was never handed to the caller, so release it here.
    taosArrayDestroy(pMetaHandle);
    bseError("failed to get table reader meta at lino %d since %s", lino, tstrerror(code));
  }
  return code;
}
849

850
/*
 * Release everything owned by a table reader: its meta handle array, the data
 * file, the meta reader, the embedded block wrapper and finally the reader
 * itself. A NULL reader is ignored.
 */
void tableReaderClose(STableReader *p) {
  if (p != NULL) {
    taosArrayDestroy(p->pMetaHandle);

    // A failed close is only logged; teardown continues regardless.
    int32_t closeRet = taosCloseFile(&p->pDataFile);
    if (closeRet != 0) {
      bseError("failed to close table reader file %s since %s", p->name, tstrerror(terrno));
    }

    tableMetaReaderClose(p->pMetaReader);
    blockWrapperCleanup(&p->blockWrapper);
    taosMemoryFree(p);
  }
}
864

865
/*
 * Allocate a zero-filled block of `cap` bytes.
 *
 * cap  total number of bytes to allocate
 * p    out: receives the new block on success; untouched on failure
 *
 * Returns 0 on success or terrno when the allocation fails.
 */
int32_t blockCreate(int32_t cap, SBlock **p) {
  SBlock *pNew = taosMemCalloc(1, cap);
  if (pNew != NULL) {
    *p = pNew;
    return 0;
  }
  return terrno;
}
874

875
/* Estimated size of the block: BLOCK_TOTAL_SIZE(p) plus `extra` bytes. */
int32_t blockEsimateSize(SBlock *p, int32_t extra) {
  int32_t total = BLOCK_TOTAL_SIZE(p) + extra;
  return total;
}
×
876

877
/*
 * Estimated size of a wrapper: the key/value index bytes, the estimated block
 * size including `extra`, plus 12 bytes. Returns 0 when the wrapper or its
 * block is missing.
 */
int32_t blockWrapperSize(SBlockWrapper *p, int32_t extra) {
  if (p != NULL && p->data != NULL) {
    return p->kvSize + blockEsimateSize(p->data, extra) + 12;
  }
  return 0;
}
884
/*
 * Append `len` raw bytes from `value` at the current end of the block payload
 * and advance the payload length. No capacity check is performed here.
 *
 * Returns the number of bytes appended (`len`).
 */
int32_t blockAppendBatch(SBlock *p, uint8_t *value, int32_t len) {
  uint8_t *tail = (uint8_t *)p->data + p->len;

  memcpy(tail, value, len);
  p->len += len;
  return len;
}
892
/*
 * Append one record to the block payload as
 *   varint(seq) | varint(len) | value bytes
 * starting at the current end of the payload (p->data + p->len).
 *
 * p      destination block; no capacity check is performed here
 * seq    sequence number of the record
 * value  record bytes
 * len    number of bytes in `value`
 *
 * Returns the total number of bytes written for the record.
 */
int32_t blockPut(SBlock *p, int64_t seq, uint8_t *value, int32_t len) {
  uint8_t *data = (uint8_t *)p->data + p->len;

  int32_t written = taosEncodeVariantI64((void **)&data, seq);
  written += taosEncodeVariantI32((void **)&data, len);
  written += taosEncodeBinary((void **)&data, value, len);

  // Advance by everything that was written (both headers plus the payload),
  // not only by the payload length; otherwise the next record would overlap
  // the tail of this one. Matches metaBlockAdd()/metaBlockAddIndex().
  p->len += written;
  return written;
}
902
/* Reset a block to empty: zero its type and length and clear the first payload byte. */
void blockClear(SBlock *p) {
  p->type = 0;
  p->len = 0;
  p->data[0] = 0;
}
×
907

908
/*
 * Look up `seq` in a block.
 *
 * The (seq, length) varint pairs are read starting at p->data + p->offset;
 * the value bytes are read from the start of p->data, at a position found by
 * summing the lengths of the preceding entries.
 *
 * pValue  out: freshly allocated copy of the value (caller frees)
 * len     out: value length
 *
 * Returns 0 when found, terrno when the copy cannot be allocated, or
 * TSDB_CODE_NOT_FOUND when no entry matches.
 */
int32_t blockSeek(SBlock *p, int64_t seq, uint8_t **pValue, int32_t *len) {
  uint8_t *base = (uint8_t *)p->data;
  uint8_t *cursor = base + p->offset;
  int32_t  valueOff = 0;

  while (cursor - base < p->len) {
    int64_t key;
    int32_t vlen;
    cursor = taosDecodeVariantI64(cursor, &key);
    cursor = taosDecodeVariantI32(cursor, &vlen);

    if (key != seq) {
      // Not the target: skip over this entry's value bytes.
      valueOff += vlen;
      continue;
    }

    *len = vlen;
    *pValue = taosMemoryCalloc(1, vlen);
    if (*pValue == NULL) {
      return terrno;
    }
    memcpy(*pValue, base + valueOff, vlen);
    return 0;
  }

  return TSDB_CODE_NOT_FOUND;
}
938

939
/*
 * Look up `tgt` using the wrapper's key/value index buffer.
 *
 * The index (p->kvBuffer, p->kvSize bytes) holds (seq, length) varint pairs;
 * the matching value is copied out of the wrapped block's payload at the
 * offset obtained by summing the lengths of the preceding entries.
 *
 * pValue  out: freshly allocated copy of the value (caller frees)
 * len     out: value length
 *
 * Returns 0 when found, TSDB_CODE_NOT_FOUND when the wrapper or its block is
 * missing, terrno on allocation failure, and TSDB_CODE_BLOB_SEQ_NOT_FOUND
 * when the index holds no matching entry.
 */
int32_t blockWrapperSeek(SBlockWrapper *p, int64_t tgt, uint8_t **pValue, int32_t *len) {
  if (p == NULL || p->data == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  SBlock  *pBlk = (SBlock *)p->data;
  uint8_t *idxStart = p->kvBuffer;
  uint8_t *idxCur = idxStart;
  int32_t  valueOff = 0;

  while ((idxCur - idxStart) < p->kvSize) {
    int64_t seq = 0;
    int32_t vlen = 0;
    idxCur = taosDecodeVariantI64(idxCur, &seq);
    idxCur = taosDecodeVariantI32(idxCur, &vlen);

    if (seq != tgt) {
      valueOff += vlen;
      continue;
    }

    *len = vlen;
    *pValue = taosMemoryCalloc(1, vlen);
    if (*pValue == NULL) {
      return terrno;
    }
    memcpy(*pValue, (uint8_t *)pBlk->data + valueOff, vlen);
    return 0;
  }

  bseInfo("blockWrapperSeek not found seq:%" PRId64 ", tgt:%" PRId64 "", tgt, tgt);
  return TSDB_CODE_BLOB_SEQ_NOT_FOUND;
}
969

970
/* Return the type tag stored in the block header. */
int8_t blockGetType(SBlock *p) {
  return p->type;
}
×
971
/* Free a block via taosMemoryFree. */
void blockDestroy(SBlock *pBlock) {
  taosMemoryFree(pBlock);
}
×
972

973
/*
 * Append an encoded block handle at the end of the block payload and advance
 * the payload length by the encoded size. Returns the encoded size.
 */
int32_t metaBlockAddIndex(SBlock *p, SBlkHandle *pInfo) {
  char   *tail = (char *)((uint8_t *)p->data + p->len);
  int32_t nBytes = blkHandleEncode(pInfo, tail);

  p->len += nBytes;
  return nBytes;
}
980

981
/*
 * Serialize a block handle into `buf` as four varints, in order:
 * offset, size, range.sseq, range.eseq.
 * Returns the number of bytes written.
 */
int32_t blkHandleEncode(SBlkHandle *pHandle, char *buf) {
  char   *cursor = buf;
  int32_t nOffset = taosEncodeVariantU64((void **)&cursor, pHandle->offset);
  int32_t nSize = taosEncodeVariantU64((void **)&cursor, pHandle->size);
  int32_t nStart = taosEncodeVariantI64((void **)&cursor, pHandle->range.sseq);
  int32_t nEnd = taosEncodeVariantI64((void **)&cursor, pHandle->range.eseq);

  return nOffset + nSize + nStart + nEnd;
}
990
/*
 * Deserialize a block handle from `buf`, reading four varints in the order
 * offset, size, range.sseq, range.eseq.
 * Returns the number of bytes consumed.
 */
int32_t blkHandleDecode(SBlkHandle *pHandle, char *buf) {
  char *cursor = buf;

  cursor = taosDecodeVariantU64(cursor, &pHandle->offset);
  cursor = taosDecodeVariantU64(cursor, &pHandle->size);
  cursor = taosDecodeVariantI64(cursor, &pHandle->range.sseq);
  cursor = taosDecodeVariantI64(cursor, &pHandle->range.eseq);

  return cursor - buf;
}
998

999
// | meta handle | index handle | padding | magic number high | magic number low |
1000
/*
 * Serialize the table footer into `buf`: the meta handle followed by the
 * index handle at the front, and kMagicNum written twice into the last
 * 8 bytes of the kEncodeLen-sized footer. Always returns 0.
 */
int32_t footerEncode(STableFooter *pFooter, char *buf) {
  int32_t metaLen = blkHandleEncode(pFooter->metaHandle, buf);
  (void)blkHandleEncode(pFooter->indexHandle, buf + metaLen);

  // The magic numbers sit at a fixed position at the very end of the footer.
  char *pMagic = buf + kEncodeLen - 8;
  (void)taosEncodeFixedU32((void **)&pMagic, kMagicNum);
  (void)taosEncodeFixedU32((void **)&pMagic, kMagicNum);

  return 0;
}
1011
/*
 * Parse a table footer from `buf`.
 *
 * The two 32-bit magic numbers in the last 8 bytes of the kEncodeLen-sized
 * footer are validated first; only then are the meta handle and the index
 * handle decoded from the front of the buffer.
 *
 * Returns 0 on success or TSDB_CODE_FILE_CORRUPTED when a magic number cannot
 * be read or does not match, or when a handle decode reports a negative length.
 */
int32_t footerDecode(STableFooter *pFooter, char *buf) {
  char    *pMagic = buf + kEncodeLen - 8;
  uint32_t magicLo, magicHi;

  if (taosDecodeFixedU32(pMagic, &magicLo) == NULL) {
    return TSDB_CODE_FILE_CORRUPTED;
  }
  if (taosDecodeFixedU32(pMagic + 4, &magicHi) == NULL) {
    return TSDB_CODE_FILE_CORRUPTED;
  }
  if (!(magicLo == kMagicNum && magicHi == kMagicNum)) {
    return TSDB_CODE_FILE_CORRUPTED;
  }

  int32_t consumed = blkHandleDecode(pFooter->metaHandle, buf);
  if (consumed < 0) {
    return TSDB_CODE_FILE_CORRUPTED;
  }

  // The index handle starts right after the meta handle.
  consumed = blkHandleDecode(pFooter->indexHandle, buf + consumed);
  if (consumed < 0) {
    return TSDB_CODE_FILE_CORRUPTED;
  }

  return 0;
}
1039

1040
/*
 * Scan the encoded SMetaBlock entries in the block payload and copy the first
 * one whose sequence range contains `seq` into *pMeta.
 *
 * Returns 0 when a matching entry is found, TSDB_CODE_NOT_FOUND otherwise.
 */
int32_t blockSeekMeta(SBlock *pBlock, int64_t seq, SMetaBlock *pMeta) {
  uint8_t *cursor = (uint8_t *)pBlock->data;

  for (int32_t consumed = 0; consumed < pBlock->len;) {
    SMetaBlock entry = {0};
    int32_t    step = metaBlockDecode(&entry, (char *)cursor);

    if (seqRangeContains(&entry.range, seq)) {
      memcpy(pMeta, &entry, sizeof(SMetaBlock));
      return 0;
    }

    consumed += step;
    cursor += step;
  }

  return TSDB_CODE_NOT_FOUND;
}
1057
/*
 * Decode every SMetaBlock entry in the block payload and push each onto
 * `pMeta`.
 *
 * Returns 0 on success or terrno when a push fails (entries pushed so far
 * stay in the array).
 */
int32_t blockGetAllMeta(SBlock *pBlock, SArray *pMeta) {
  uint8_t *cursor = (uint8_t *)pBlock->data;

  for (int32_t consumed = 0; consumed < pBlock->len;) {
    SMetaBlock entry = {0};
    int32_t    step = metaBlockDecode(&entry, (char *)cursor);

    if (taosArrayPush(pMeta, &entry) == NULL) {
      return terrno;
    }

    consumed += step;
    cursor += step;
  }

  return 0;
}
1074

1075
/*
 * Serialize an SMetaBlock into `buf`: fixed-width type (i8), version (i8) and
 * reserve (i16), followed by varint offset, size, range.sseq and range.eseq.
 * Returns the number of bytes written.
 */
int32_t metaBlockEncode(SMetaBlock *pMeta, char *buf) {
  char   *cursor = buf;
  int32_t total = 0;

  // Fixed-width header fields.
  total += taosEncodeFixedI8((void **)&cursor, pMeta->type);
  total += taosEncodeFixedI8((void **)&cursor, pMeta->version);
  total += taosEncodeFixedI16((void **)&cursor, pMeta->reserve);

  // Variable-length location and sequence range.
  total += taosEncodeVariantI64((void **)&cursor, pMeta->offset);
  total += taosEncodeVariantI64((void **)&cursor, pMeta->size);
  total += taosEncodeVariantI64((void **)&cursor, pMeta->range.sseq);
  total += taosEncodeVariantI64((void **)&cursor, pMeta->range.eseq);

  return total;
}
1087
/*
 * Deserialize an SMetaBlock from `buf`, reading the fields in the order
 * type (i8), version (i8), reserve (i16), then varint offset, size,
 * range.sseq and range.eseq.
 * Returns the number of bytes consumed.
 */
int32_t metaBlockDecode(SMetaBlock *pMeta, char *buf) {
  char *cursor = buf;

  // Fixed-width header fields.
  cursor = taosDecodeFixedI8(cursor, &pMeta->type);
  cursor = taosDecodeFixedI8(cursor, &pMeta->version);
  cursor = taosDecodeFixedI16(cursor, &pMeta->reserve);

  // Variable-length location and sequence range.
  cursor = taosDecodeVariantI64(cursor, &pMeta->offset);
  cursor = taosDecodeVariantI64(cursor, &pMeta->size);
  cursor = taosDecodeVariantI64(cursor, &pMeta->range.sseq);
  cursor = taosDecodeVariantI64(cursor, &pMeta->range.eseq);

  return cursor - buf;
}
1099
/*
 * Append an encoded SMetaBlock at the end of the block payload and advance
 * the payload length by the encoded size. Returns the encoded size.
 */
int32_t metaBlockAdd(SBlock *p, SMetaBlock *pBlk) {
  char   *tail = (char *)((uint8_t *)p->data + p->len);
  int32_t nBytes = metaBlockEncode(pBlk, tail);

  p->len += nBytes;
  return nBytes;
}
1106
/*
 * Decode one SMetaBlock located at p->data + p->len and advance p->len by the
 * number of bytes consumed (p->len acts as the read position here).
 * Returns the number of bytes consumed.
 */
int32_t metaBlockGet(SBlock *p, SMetaBlock *pBlk) {
  char   *pos = (char *)((uint8_t *)p->data + p->len);
  int32_t nBytes = metaBlockDecode(pBlk, pos);

  p->len += nBytes;
  return nBytes;
}
1113

1114
/*
 * Write the wrapped block to `pFile` at pHandle->offset.
 *
 * Steps: stamp the block version, record the compression type and row size,
 * optionally compress (compressType is currently fixed to kNoCompres, so that
 * branch is inactive), optionally CBC-encrypt everything except the trailing
 * BLOCK_TAIL_LEN bytes when the owning SBse has an encrypt key, append the
 * checksum, then seek and write.
 *
 * pFile    destination file
 * pHandle  supplies the file offset to write at
 * pBlkW    wrapper holding the block (and the owning SBse via pBse)
 * nWrite   out: number of bytes written; untouched for an empty block or on error
 *
 * Returns 0 on success (also for an empty block, which is not written),
 * otherwise the error code of the failing step. Temporary buffers are
 * released on every exit path.
 */
int32_t tableFlushBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlkW, int32_t *nWrite) {
  int32_t code = 0;
  int32_t lino = 0;

  SBlock *pBlk = pBlkW->data;
  if (pBlk->len == 0) {
    return 0;
  }
  pBlk->version = BSE_META_VER;
  int8_t compressType = kNoCompres;

  SBlockWrapper wrapper = {0};
  uint8_t      *encryptBuf = NULL;

  uint8_t *pWrite = (uint8_t *)pBlk;
  int32_t  len = BLOCK_TOTAL_SIZE(pBlk);
  int32_t  plainLen = len;

  BLOCK_SET_COMPRESS_TYPE(pBlk, compressType);
  BLOCK_SET_ROW_SIZE(pBlk, BLOCK_ROW_SIZE(pBlk));

  if (compressType != kNoCompres) {
    code = blockWrapperInit(&wrapper, len + 4);
    TSDB_CHECK_CODE(code, lino, _error);

    int32_t compressSize = wrapper.cap;
    code = bseCompressData(compressType, pWrite, BLOCK_ROW_SIZE(pBlk), wrapper.data, &compressSize);
    if (code != 0) {
      // Compression is best effort: fall back to writing the plain block.
      bseWarn("failed to compress data since %s, not set compress", tstrerror(TSDB_CODE_THIRDPARTY_ERROR));

      blockWrapperCleanup(&wrapper);
      BLOCK_SET_COMPRESS_TYPE(pBlk, kNoCompres);
      BLOCK_SET_ROW_SIZE(pBlk, BLOCK_ROW_SIZE(pBlk));
    } else {
      int32_t rawSize = BLOCK_ROW_SIZE(pBlk);
      COMREPSS_DATA_SET_TYPE_AND_RAWLEN(wrapper.data, compressSize, compressType, rawSize);
      len = compressSize + BLOCK_TAIL_LEN;

      pWrite = (uint8_t *)wrapper.data;
      plainLen = len;
    }
  }

  // Encrypt data if encryption is enabled (before checksum)
  SBse *pBse = (SBse *)pBlkW->pBse;
  if (pBse != NULL && pBse->cfg.encryptKey[0] != '\0') {
    // Encrypt data excluding BLOCK_TAIL_LEN (to keep compression info readable)
    // plainLen includes checksum, subtract BLOCK_TAIL_LEN
    int32_t plainDataLen = plainLen - BLOCK_TAIL_LEN;
    int32_t cryptedLen = ENCRYPTED_LEN(plainDataLen);

    // Allocate buffer for encrypted data + BLOCK_TAIL_LEN
    encryptBuf = taosMemoryMalloc(cryptedLen + BLOCK_TAIL_LEN);
    if (encryptBuf == NULL) {
      TSDB_CHECK_CODE(code = terrno, lino, _error);
    }

    // Pad and encrypt data part only
    (void)memset(encryptBuf, 0, cryptedLen);
    (void)memcpy(encryptBuf, pWrite, plainDataLen);

    // Encrypt using CBC
    SCryptOpts opts = {0};
    opts.len = cryptedLen;
    opts.source = (char *)encryptBuf;
    opts.result = (char *)encryptBuf;  // Encrypt in place
    opts.unitLen = 16;
    opts.pOsslAlgrName = pBse->cfg.encryptAlgrName;
    tstrncpy((char *)opts.key, pBse->cfg.encryptKey, ENCRYPT_KEY_LEN + 1);

    int32_t count = CBC_Encrypt(&opts);
    if (count != opts.len) {
      TSDB_CHECK_CODE(code = terrno, lino, _error);
    }

    // Copy BLOCK_TAIL_LEN (compression info) unencrypted
    (void)memcpy(encryptBuf + cryptedLen, pWrite + plainDataLen, BLOCK_TAIL_LEN);

    pWrite = encryptBuf;
    len = cryptedLen + BLOCK_TAIL_LEN;
  }

  code = taosCalcChecksumAppend(0, (uint8_t *)pWrite, len);
  TSDB_CHECK_CODE(code, lino, _error);

  int64_t n = taosLSeekFile(pFile, pHandle->offset, SEEK_SET);
  if (n < 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  int32_t nwrite = taosWriteFile(pFile, (uint8_t *)pWrite, len);
  if (nwrite != len) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }
  *nWrite = nwrite;

_error:
  if (code != 0) {
    bseError("failed to flush table builder at line %d since %s", lino, tstrerror(code));
  } else {
    bseDebug("flush at offset %" PRId64 ", size %d", pHandle->offset, len);
  }

  // Release the temporary buffers on success AND failure. Both calls are safe
  // when nothing was allocated (wrapper is zero-initialized, encryptBuf is
  // NULL) and when the wrapper was already cleaned up in the fallback branch.
  blockWrapperCleanup(&wrapper);
  taosMemoryFree(encryptBuf);
  return code;
}
1220
/*
 * Read one block described by `pHandle` from `pFile` into the wrapper.
 *
 * Steps: grow the wrapper buffer to pHandle->size + 16, seek and read
 * pHandle->size bytes, verify the checksum, CBC-decrypt everything except the
 * trailing BLOCK_TAIL_LEN bytes when the owning SBse has an encrypt key, read
 * the compression type / raw length from the tail, and either decompress into
 * a fresh buffer that replaces the wrapper's buffer or validate the length of
 * the uncompressed block.
 *
 * Returns 0 on success; TSDB_CODE_FILE_CORRUPTED for short reads, checksum,
 * decrypt, decompress or length mismatches; otherwise the error code of the
 * failing step.
 */
int32_t tableLoadBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlkW) {
  int32_t code = 0;
  int32_t lino = 0;

  // Declared and initialized before the first `goto _error`: the cleanup at
  // the end of the function touches both, and jumping over their
  // initialization would leave them indeterminate.
  SBlockWrapper pHelp = {0};
  uint8_t      *decryptBuf = NULL;

  code = blockWrapperResize(pBlkW, pHandle->size + 16);
  TSDB_CHECK_CODE(code, lino, _error);

  SBlock  *pBlk = pBlkW->data;
  uint8_t *pRead = (uint8_t *)pBlk;

  int64_t n = taosLSeekFile(pFile, pHandle->offset, SEEK_SET);
  if (n < 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  int32_t nr = taosReadFile(pFile, pRead, pHandle->size);
  if (nr != pHandle->size) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
  }

  if (taosCheckChecksumWhole((uint8_t *)pRead, pHandle->size) != 1) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
  }

  // Decrypt data if encryption is enabled
  SBse   *pBse = (SBse *)pBlkW->pBse;
  int32_t dataLen = pHandle->size;
  if (pBse != NULL && pBse->cfg.encryptKey[0] != '\0') {
    // Data is encrypted (excluding BLOCK_TAIL_LEN), decrypt it
    int32_t cryptedLen = pHandle->size - BLOCK_TAIL_LEN;
    decryptBuf = taosMemoryMalloc(cryptedLen);
    if (decryptBuf == NULL) {
      TSDB_CHECK_CODE(code = terrno, lino, _error);
    }

    // Decrypt using CBC
    SCryptOpts opts = {0};
    opts.len = cryptedLen;
    opts.source = (char *)pRead;
    opts.result = (char *)decryptBuf;
    opts.unitLen = 16;
    opts.pOsslAlgrName = pBse->cfg.encryptAlgrName;
    tstrncpy(opts.key, pBse->cfg.encryptKey, ENCRYPT_KEY_LEN + 1);

    int32_t count = CBC_Decrypt(&opts);
    if (count != cryptedLen) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
    }

    // Copy decrypted data back, BLOCK_TAIL_LEN remains unencrypted at the end
    (void)memcpy(pRead, decryptBuf, cryptedLen);
    // BLOCK_TAIL_LEN at pRead + cryptedLen is already in place (unencrypted)
  }

  uint8_t compressType = 0;
  int32_t rawSize = 0;

  // Get compression info from decrypted/plain data
  COMPRESS_DATA_GET_TYPE_AND_RAWLEN(pRead, dataLen, compressType, rawSize);

  if (compressType != kNoCompres) {
    code = blockWrapperInit(&pHelp, rawSize);
    TSDB_CHECK_CODE(code, lino, _error);

    int32_t unCompressSize = pHelp.cap;
    code = bseDecompressData(compressType, pRead, dataLen - BLOCK_TAIL_LEN, pHelp.data, &unCompressSize);
    if (code != 0) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
    }

    SBlock *p = pHelp.data;
    if (BLOCK_ROW_SIZE_OFFSET(p) != unCompressSize) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
    }

    // Replace the wrapper's buffers with the decompressed ones.
    blockWrapperCleanup(pBlkW);
    blockWrapperTransfer(pBlkW, &pHelp);

  } else {
    // For uncompressed data, use rawSize (which equals actual data size for no compression)
    // rawSize is stored in BLOCK_TAIL_LEN and is always correct even after encryption
    pBlk = pBlkW->data;
    int32_t expectedLen = rawSize - sizeof(SBlock);
    if (pBlk->len != expectedLen) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
    }
  }

_error:
  if (code != 0) {
    bseError("failed to load block at lino %d since %s, read at offset %" PRId64 ", size:%" PRId64 "", lino,
             tstrerror(code), pHandle->offset, pHandle->size);
  } else {
    bseDebug("read at offset %" PRId64 ", size %" PRId64 "", pHandle->offset, pHandle->size);
  }

  blockWrapperCleanup(&pHelp);
  taosMemoryFree(decryptBuf);
  return code;
}
1322
/*
 * Read the raw on-disk bytes of a block into the wrapper buffer, leaving room
 * for an SBseSnapMeta header in front of them.
 *
 * The wrapper buffer is not resized here. On success pBlkW->size is set to
 * pHandle->size + sizeof(SBseSnapMeta).
 *
 * checkSum  when non-zero, the checksum of the bytes just read is verified
 *
 * Returns 0 on success, terrno when the seek fails, or
 * TSDB_CODE_FILE_CORRUPTED on a short read or checksum mismatch.
 */
int32_t tableLoadRawBlock(TdFilePtr pFile, SBlkHandle *pHandle, SBlockWrapper *pBlkW, int8_t checkSum) {
  int32_t code = 0;
  int32_t lino = 0;

  // Payload lands right behind the space reserved for the snapshot header.
  uint8_t *pDst = (uint8_t *)pBlkW->data + sizeof(SBseSnapMeta);

  int64_t pos = taosLSeekFile(pFile, pHandle->offset, SEEK_SET);
  if (pos < 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  int32_t nRead = taosReadFile(pFile, pDst, pHandle->size);
  if (nRead != pHandle->size) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
  }

  if (checkSum && taosCheckChecksumWhole((uint8_t *)pDst, pHandle->size) != 1) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
  }

  pBlkW->size = pHandle->size + sizeof(SBseSnapMeta);

_error:
  if (code != 0) {
    bseError("failed to load block at lino %d since %s", lino, tstrerror(code));
  }
  return code;
}
1352

1353
/* Non-zero when `seq` lies within the closed interval [p->sseq, p->eseq]. */
int8_t seqRangeContains(SSeqRange *p, int64_t seq) {
  return p->sseq <= seq && seq <= p->eseq;
}
×
1354

1355
/* Mark a range as unset by storing -1 in both bounds. */
void seqRangeReset(SSeqRange *p) {
  p->eseq = -1;
  p->sseq = -1;
}
×
1359

1360
/* Non-zero when `seq` lies beyond the end of the range (seq > p->eseq). */
int8_t seqRangeIsGreater(SSeqRange *p, int64_t seq) {
  return p->eseq < seq;
}
×
1361

1362
/*
 * Extend `dst` with `src`: the start is taken from `src` only while `dst` is
 * still unset (sseq == -1); the end is always overwritten with src->eseq.
 */
void seqRangeUpdate(SSeqRange *dst, SSeqRange *src) {
  int8_t startUnset = (dst->sseq == -1);

  if (startUnset) {
    dst->sseq = src->sseq;
  }
  dst->eseq = src->eseq;
}
×
1368

1369
/*
 * Prepare a wrapper: allocate a zero-filled block buffer of `cap` bytes and a
 * 128-byte key/value index buffer, then reset the block's offset and version.
 *
 * Returns 0 on success. On allocation failure the wrapper is cleaned up via
 * blockWrapperCleanup() and the error code is returned.
 */
int32_t blockWrapperInit(SBlockWrapper *p, int32_t cap) {
  int32_t code = 0;
  int32_t lino = 0;

  // Block buffer.
  p->data = taosMemoryCalloc(1, cap);
  if (p->data == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  // Key/value index buffer, starting empty with a small capacity.
  p->kvCap = 128;
  p->kvSize = 0;
  p->kvBuffer = taosMemoryCalloc(1, p->kvCap);
  if (p->kvBuffer == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  p->cap = cap;

  SBlock *pHdr = (SBlock *)p->data;
  pHdr->version = 0;
  pHdr->offset = 0;

_error:
  if (code != 0) {
    blockWrapperCleanup(p);
  }
  return code;
}
1394
/*
 * Append one (seq, len) varint pair to the wrapper's key/value index buffer,
 * growing the buffer when fewer than 12 bytes of headroom remain.
 *
 * p      wrapper whose index buffer is extended
 * seq    sequence number to record
 * value  unused here; only the length is recorded in the index
 * len    length of the value associated with `seq`
 *
 * Returns 0 on success or terrno when the buffer cannot be grown; in that
 * case the wrapper (buffer, size and capacity) is left unchanged.
 */
int32_t blockWrapperPushMeta(SBlockWrapper *p, int64_t seq, uint8_t *value, int32_t len) {
  (void)value;

  // NOTE(review): 12 bytes of headroom assumes the two varints never need
  // more than that - confirm against the encoder's worst case.
  if ((p->kvSize + 12) > p->kvCap) {
    int32_t newCap = (p->kvCap == 0) ? 128 : p->kvCap * 2;

    void *grown = taosMemoryRealloc(p->kvBuffer, newCap);
    if (grown == NULL) {
      return terrno;
    }

    // Commit the new capacity only after the reallocation succeeded, so the
    // recorded capacity can never exceed the real buffer size.
    p->kvBuffer = grown;
    p->kvCap = newCap;
  }

  uint8_t *tail = (uint8_t *)p->kvBuffer + p->kvSize;
  p->kvSize += taosEncodeVariantI64((void **)&tail, seq);
  p->kvSize += taosEncodeVariantI32((void **)&tail, len);
  return 0;
}
1414

1415
/*
 * Drop the wrapper's key/value index: free the buffer and reset its size and
 * capacity. The block buffer (p->data) is left untouched.
 */
void blockWrapperClearMeta(SBlockWrapper *p) {
  if (p->kvBuffer != NULL) {
    taosMemoryFree(p->kvBuffer);
    // Clear the pointer so a later blockWrapperCleanup() or
    // blockWrapperPushMeta() does not free or realloc released memory.
    p->kvBuffer = NULL;
  }
  p->kvSize = 0;
  p->kvCap = 0;
}
×
1422

1423
/*
 * Release both buffers owned by the wrapper (block data and key/value index)
 * and reset the associated sizes and capacities. Safe to call repeatedly and
 * on a zero-initialized wrapper.
 */
void blockWrapperCleanup(SBlockWrapper *p) {
  if (p->data != NULL) {
    taosMemoryFree(p->data);
    p->data = NULL;
  }
  p->cap = 0;

  taosMemoryFreeClear(p->kvBuffer);
  p->kvSize = 0;
  // Keep the capacity in sync with the released buffer; a stale non-zero
  // capacity would let blockWrapperPushMeta() skip allocation and write
  // through a NULL buffer.
  p->kvCap = 0;
}
×
1432

1433
/*
 * Move the block buffer and the key/value index from `src` to `dst`, leaving
 * `src` empty. Whatever `dst` held before is overwritten, not freed. Does
 * nothing when either pointer is NULL.
 */
void blockWrapperTransfer(SBlockWrapper *dst, SBlockWrapper *src) {
  if (dst != NULL && src != NULL) {
    // Block buffer.
    dst->data = src->data;
    dst->cap = src->cap;
    src->data = NULL;
    src->cap = 0;

    // Key/value index.
    dst->kvBuffer = src->kvBuffer;
    dst->kvSize = src->kvSize;
    dst->kvCap = src->kvCap;
    src->kvBuffer = NULL;
    src->kvSize = 0;
    src->kvCap = 0;
  }
}
1451

1452
/*
 * Make sure the wrapper's block buffer can hold at least `newCap` bytes.
 *
 * Growth doubles the current capacity (starting from 1024 when it is 0)
 * until it reaches `newCap`. Nothing happens when the buffer is already
 * large enough.
 *
 * Returns 0 on success or terrno when reallocation fails (the wrapper is
 * left unchanged in that case).
 */
int32_t blockWrapperResize(SBlockWrapper *p, int32_t newCap) {
  if (p->cap >= newCap) {
    return 0;
  }

  int32_t target = (p->cap == 0) ? 1024 : p->cap;
  while (target < newCap) {
    target *= 2;
  }

  void *grown = taosMemoryRealloc(p->data, target);
  if (grown == NULL) {
    return terrno;
  }

  p->data = grown;
  p->cap = target;
  return 0;
}
1468

1469
/*
 * Reset a wrapper for reuse without freeing anything: zero the index size and
 * the wrapper size, and clear the wrapped block. No-op when there is no block.
 */
void blockWrapperClear(SBlockWrapper *p) {
  if (p->data != NULL) {
    p->size = 0;
    p->kvSize = 0;
    blockClear((SBlock *)p->data);
  }
}
1478

1479
/* Store `type` in the header of the wrapped block (p->data must be set). */
void blockWrapperSetType(SBlockWrapper *p, int8_t type) {
  ((SBlock *)p->data)->type = type;
}
×
1483

1484
/*
 * Create an iterator over the blocks of the table identified by `timestamp`.
 *
 * timestamp  identifies the table to open
 * type       kind of block to iterate: for BSE_TABLE_DATA_TYPE the data block
 *            handles are loaded, for BSE_TABLE_META_TYPE the meta handles are
 *            loaded; any other type yields an iterator that is already over
 * ppIter     out: receives the iterator on success; untouched on failure
 * pBse       owning BSE instance (its pTableMgt is used to create the
 *            sub-table manager)
 *
 * Returns 0 on success. On failure everything created so far is destroyed
 * and the error code is returned.
 */
int32_t tableReaderIterInit(int64_t timestamp, int8_t type, STableReaderIter **ppIter, SBse *pBse) {
  int32_t code = 0;
  int32_t lino = 0;

  STableReaderIter *p = taosMemCalloc(1, sizeof(STableReaderIter));
  if (p == NULL) {
    return terrno;
  }

  p->timestamp = timestamp;
  SSubTableMgt *retentionMgt = NULL;

  code = createSubTableMgt(timestamp, 1, pBse->pTableMgt, &retentionMgt);
  TSDB_CHECK_CODE(code, lino, _error);

  p->pSubMgt = retentionMgt;

  code = tableReaderOpen(timestamp, &p->pTableReader, retentionMgt->pReaderMgt);
  TSDB_CHECK_CODE(code, lino, _error);

  tableReaderShouldPutToCache(p->pTableReader, 0);

  p->blockIndex = 0;
  p->blockType = type;

  if (p->blockType == BSE_TABLE_DATA_TYPE) {
    code = tableReaderGetMeta(p->pTableReader, &p->pMetaHandle);
    TSDB_CHECK_CODE(code, lino, _error);

  } else if (p->blockType == BSE_TABLE_META_TYPE) {
    p->pMetaHandle = taosArrayInit(8, sizeof(SBlkHandle));
    if (p->pMetaHandle == NULL) {
      TSDB_CHECK_CODE(code = terrno, lino, _error);
    }
    code = tableMetaReaderLoadMetaHandle(p->pTableReader->pMetaReader, p->pMetaHandle);
    // Must be checked before publishing the iterator: otherwise the caller
    // would receive a pointer that is destroyed in the error path below.
    TSDB_CHECK_CODE(code, lino, _error);
  } else {
    p->isOver = 1;
  }
  *ppIter = p;

_error:
  if (code != 0) {
    bseError("failed to init table reader iter since %s", tstrerror(code));
    tableReaderIterDestroy(p);
  }
  return code;
}
1532

1533
/*
 * Advance the iterator by one step and expose the loaded raw block.
 *
 * Behaviour by current block type:
 *  - DATA: load the next raw data block; once all handles are consumed the
 *    handle array is released, the iterator is marked over and 0 is returned
 *    without touching the outputs.
 *  - META: load the next raw meta block; once all handles are consumed the
 *    type switches to META_INDEX, which is then handled in this same call.
 *  - META_INDEX: load the raw meta index, then switch to FOOTER.
 *  - FOOTER: load the raw footer, then switch to END.
 *  - END: mark the iterator as over.
 *
 * Whenever the iterator's wrapper holds data, its snapshot header is updated
 * and *pValue / *len are pointed at the wrapper's buffer (also after an error).
 *
 * Returns 0 on success or the error code of the failing load; an error also
 * marks the iterator as over.
 */
int32_t tableReaderIterNext(STableReaderIter *pIter, uint8_t **pValue, int32_t *len) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SBseSnapMeta snapMeta = {0};
  snapMeta.timestamp = pIter->timestamp;

  if (pIter->blockType == BSE_TABLE_DATA_TYPE) {
    if (pIter->blockIndex < taosArrayGetSize(pIter->pMetaHandle)) {
      SBlkHandle *pHandle = taosArrayGet(pIter->pMetaHandle, pIter->blockIndex);
      bseDebug("file type %d, block type: %d,block index %d, offset %" PRId64 ", size %" PRId64 ", range [%" PRId64
               ", %" PRId64 "]",
               pIter->fileType, pIter->blockType, pIter->blockIndex, pHandle->offset, pHandle->size,
               pHandle->range.sseq, pHandle->range.eseq);
      code = tableReaderLoadRawBlock(pIter->pTableReader, pHandle, &pIter->blockWrapper);
      TSDB_CHECK_CODE(code, lino, _error);

      pIter->blockIndex++;
    } else {
      // All data blocks consumed: finish immediately.
      taosArrayDestroy(pIter->pMetaHandle);
      pIter->pMetaHandle = NULL;
      pIter->blockIndex = 0;
      pIter->isOver = 1;
      return 0;
    }

  } else if (pIter->blockType == BSE_TABLE_META_TYPE) {
    if (pIter->blockIndex < taosArrayGetSize(pIter->pMetaHandle)) {
      SBlkHandle *pHandle = taosArrayGet(pIter->pMetaHandle, pIter->blockIndex);

      bseDebug("file type %d, block type: %d,block index %d, offset %" PRId64 ", size %" PRId64 ", range [%" PRId64
               ", %" PRId64 "]",
               pIter->fileType, pIter->blockType, pIter->blockIndex, pHandle->offset, pHandle->size,
               pHandle->range.sseq, pHandle->range.eseq);
      code = tableReaderLoadRawMeta(pIter->pTableReader, pHandle, &pIter->blockWrapper);
      TSDB_CHECK_CODE(code, lino, _error);
      pIter->blockIndex++;
    } else {
      // All meta blocks consumed: continue with the meta index below.
      taosArrayDestroy(pIter->pMetaHandle);
      pIter->pMetaHandle = NULL;
      pIter->blockIndex = 0;
      pIter->blockType = BSE_TABLE_META_INDEX_TYPE;
    }
  }

  if (pIter->blockType == BSE_TABLE_META_INDEX_TYPE) {
    code = tableReaderLoadRawMetaIndex(pIter->pTableReader, &pIter->blockWrapper);
    TSDB_CHECK_CODE(code, lino, _error);

    pIter->blockType = BSE_TABLE_FOOTER_TYPE;
  } else if (pIter->blockType == BSE_TABLE_FOOTER_TYPE) {
    code = tableReaderLoadRawFooter(pIter->pTableReader, &pIter->blockWrapper);
    TSDB_CHECK_CODE(code, lino, _error);

    pIter->blockType = BSE_TABLE_END_TYPE;
  } else if (pIter->blockType == BSE_TABLE_END_TYPE) {
    pIter->isOver = 1;
  }

_error:
  if (code != 0) {
    bseError("failed to load block since %s", tstrerror(code));
    pIter->isOver = 1;
  }

  if (pIter->blockWrapper.data != NULL) {
    SSeqRange emptyRange = {0};
    updateSnapshotMeta(&pIter->blockWrapper, emptyRange, pIter->fileType, pIter->blockType, snapMeta.timestamp);
    *pValue = pIter->blockWrapper.data;
    *len = pIter->blockWrapper.size;
  }
  return code;
}
1610

1611
/* Non-zero while the iterator has not been marked as over. */
int8_t tableReaderIterValid(STableReaderIter *pIter) {
  return !pIter->isOver;
}
×
1612

1613
/*
 * Read the whole "current" meta file into a freshly allocated buffer that
 * starts with an SBseSnapMeta header whose fileType is BSE_CURRENT_SNAP.
 *
 * pBse    BSE instance used to build the file name
 * pValue  out: buffer holding header + file content (caller frees)
 * len     out: total buffer length
 *
 * Returns 0 on success. When the file does not exist, 0 is returned and the
 * outputs are left untouched. A read that delivers fewer bytes than the file
 * size is reported as TSDB_CODE_FILE_CORRUPTED; other failures return the
 * error code of the failing step. On failure the file is closed and the
 * buffer is released.
 */
int32_t bseReadCurrentSnap(SBse *pBse, uint8_t **pValue, int32_t *len) {
  int32_t   code = 0;
  int32_t   lino = 0;
  TdFilePtr fd = NULL;
  int64_t   sz = 0;
  char      name[TSDB_FILENAME_LEN] = {0};

  uint8_t *pCurrent = NULL;

  bseBuildCurrentFullName(pBse, name);
  if (taosCheckExistFile(name) == 0) {
    bseInfo("vgId:%d, no current meta file found, skip recover", BSE_VGID(pBse));
    return 0;
  }
  code = taosStatFile(name, &sz, NULL, NULL);
  TSDB_CHECK_CODE(code, lino, _error);

  fd = taosOpenFile(name, TD_FILE_READ);
  if (fd == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }
  pCurrent = (uint8_t *)taosMemoryCalloc(1, sizeof(SBseSnapMeta) + sz);
  if (pCurrent == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  int64_t nread = taosReadFile(fd, pCurrent + sizeof(SBseSnapMeta), sz);
  if (nread != sz) {
    // A short read must never be treated as success, even when terrno is 0;
    // otherwise a partially filled buffer would be handed to the caller.
    code = (nread < 0 && terrno != 0) ? terrno : TSDB_CODE_FILE_CORRUPTED;
    TSDB_CHECK_CODE(code, lino, _error);
  }
  if (taosCloseFile(&fd) != 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  SBseSnapMeta *pMeta = (SBseSnapMeta *)(pCurrent);
  pMeta->fileType = BSE_CURRENT_SNAP;

  *pValue = pCurrent;

  *len = sz + sizeof(SBseSnapMeta);
_error:
  if (code != 0) {
    bseError("vgId:%d, failed to read current at line %d since %s", BSE_VGID(pBse), lino, tstrerror(code));
    if (taosCloseFile(&fd) != 0) {
      bseError("failed to close file %s since %s", name, tstrerror(terrno));
    }
    taosMemoryFree(pCurrent);
  }
  return code;
}
1664

1665
/*
 * Destroy an iterator and everything it owns: the handle array, the table
 * reader, the block wrapper buffers and the sub-table manager. A NULL
 * iterator is ignored.
 */
void tableReaderIterDestroy(STableReaderIter *pIter) {
  if (pIter != NULL) {
    taosArrayDestroy(pIter->pMetaHandle);
    tableReaderClose(pIter->pTableReader);
    blockWrapperCleanup(&pIter->blockWrapper);
    destroySubTableMgt(pIter->pSubMgt);
    taosMemoryFree(pIter);
  }
}
1674

1675
/*
 * Build a per-block lookup table for `pBlock`.
 *
 * The payload is walked as a sequence of records
 *   varint(seq) | varint(len) | len value bytes
 * and for every record an SBlockIndexMeta {seq, offset} is pushed, where
 * `offset` is the position of the length varint relative to the payload start.
 *
 * pBlock  block to index; it is referenced, not copied
 * pMeta   out: receives the new index on success; untouched on failure
 *
 * Returns 0 on success or terrno on allocation failure (the partially built
 * index is released in that case).
 */
int32_t blockWithMetaInit(SBlock *pBlock, SBlockWithMeta **pMeta) {
  int32_t code = 0;
  int32_t lino = 0;

  SBlockWithMeta *pIdx = taosMemCalloc(1, sizeof(SBlockWithMeta));
  if (pIdx == NULL) {
    return terrno;
  }

  pIdx->pBlock = pBlock;
  pIdx->pMeta = taosArrayInit(8, sizeof(SBlockIndexMeta));
  if (pIdx->pMeta == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  uint8_t *base = (uint8_t *)pBlock->data;
  uint8_t *cursor = base;
  while (cursor - base < pBlock->len) {
    int64_t         seq;
    int32_t         vlen = 0;
    SBlockIndexMeta entry = {0};

    cursor = taosDecodeVariantI64(cursor, &seq);
    // Remember where the length varint starts; lookups resume decoding there.
    entry.offset = cursor - base;
    entry.seq = seq;
    cursor = taosDecodeVariantI32(cursor, &vlen);

    if (taosArrayPush(pIdx->pMeta, &entry) == NULL) {
      TSDB_CHECK_CODE(code = terrno, lino, _error);
    }

    // Skip the value bytes to reach the next record.
    cursor += vlen;
  }

  *pMeta = pIdx;

_error:
  if (code != 0) {
    bseError("failed to init block with meta since %s", tstrerror(code));
    blockWithMetaCleanup(pIdx);
  }
  return code;
}
1716

1717
/*
 * Release a block index: its entry array and the index object itself. The
 * referenced block (p->pBlock) is not freed. A NULL index is ignored.
 */
void blockWithMetaCleanup(SBlockWithMeta *p) {
  if (p != NULL) {
    taosArrayDestroy(p->pMeta);
    taosMemoryFree(p);
  }
}
1723

1724
int comprareFunc(const void *pLeft, const void *pRight) {
×
1725
  SBlockIndexMeta *p1 = (SBlockIndexMeta *)pLeft;
×
1726
  SBlockIndexMeta *p2 = (SBlockIndexMeta *)pRight;
×
1727
  if (p1->seq > p2->seq) {
×
1728
    return 1;
×
1729
  } else if (p1->seq < p2->seq) {
×
1730
    return -1;
×
1731
  }
1732
  return 0;
×
1733
}
1734

1735
/*
 * Look up the value stored for `seq` in an indexed block.
 *
 * p      : index built by blockWithMetaInit
 * seq    : sequence number to find (exact match)
 * pValue : out, freshly allocated copy of the value; the caller frees it
 * len    : out, length of the value in bytes
 *
 * Returns 0 on success, TSDB_CODE_NOT_FOUND when the seq is not indexed or the
 * stored length is not positive, or terrno when the copy cannot be allocated.
 */
int32_t blockWithMetaSeek(SBlockWithMeta *p, int64_t seq, uint8_t **pValue, int32_t *len) {
  int32_t         code = 0;
  SBlockIndexMeta key = {.seq = seq, .offset = 0};

  // The comparator interprets both operands as SBlockIndexMeta, so the search
  // key must be a full entry rather than a bare int64_t.
  int32_t idx = taosArraySearchIdx(p->pMeta, &key, comprareFunc, TD_EQ);
  if (idx < 0) {
    return TSDB_CODE_NOT_FOUND;
  }
  SBlockIndexMeta *pMeta = taosArrayGet(p->pMeta, idx);
  if (pMeta == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  // the indexed offset points at the encoded value length; the value follows it
  uint8_t *data = (uint8_t *)p->pBlock->data + pMeta->offset;

  data = taosDecodeVariantI32((void *)data, len);
  if (*len <= 0) {
    return TSDB_CODE_NOT_FOUND;
  }
  *pValue = taosMemCalloc(1, *len);
  if (*pValue == NULL) {
    return terrno;
  }
  memcpy(*pValue, data, *len);

  return code;
}
1761

1762
/*
 * Allocate and initialise a table meta descriptor.
 *
 * name     : optional meta name, copied (including the terminator) into p->name
 * pMeta    : out, receives the new descriptor on success
 * pMetaMgt : STableMetaMgt that supplies the owning pBse
 *
 * Returns 0 on success. On failure the descriptor is released and *pMeta is
 * left untouched.
 */
int32_t tableMetaOpen(char *name, SBTableMeta **pMeta, void *pMetaMgt) {
  int32_t code = 0;
  int32_t lino = 0;

  SBTableMeta *p = taosMemCalloc(1, sizeof(SBTableMeta));
  if (p == NULL) {
    // pick up the allocator's error; with code still 0 the check would not
    // jump and p would be dereferenced below
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  if (name != NULL) {
    memcpy(p->name, name, strlen(name) + 1);
  }
  p->pBse = ((STableMetaMgt *)pMetaMgt)->pBse;

  p->blockCap = BSE_BLOCK_SIZE((SBse *)p->pBse);

  *pMeta = p;
_error:
  if (code != 0) {
    bseError("failed to open table meta %s at line %d since %s", name, lino, tstrerror(code));
    tableMetaClose(p);
  }

  return code;
}
1787

1788
/*
 * Rewrite the table meta file so that it contains the existing meta blocks
 * followed by the new entries in pBlock.
 *
 * Steps:
 *   1. open a writer on the temporary meta file and a reader on the current one
 *   2. copy every meta block of the current file to the temporary file,
 *      widening pMeta->range as blocks are copied
 *   3. queue pBlock on the writer and commit it
 *   4. close both files and rename the temporary file over the current one
 *
 * Returns 0 on success or the first error code encountered. The iterator,
 * writer and reader are always released before returning.
 */
int32_t tableMetaCommit(SBTableMeta *pMeta, SArray *pBlock) {
  int32_t                code = 0;
  int32_t                lino = 0;
  SBtableMetaWriter     *pWriter = NULL;
  SBtableMetaReader     *pReader = NULL;
  SBtableMetaReaderIter *pIter = NULL;

  char tempMetaName[TSDB_FILENAME_LEN] = {0};
  char metaName[TSDB_FILENAME_LEN] = {0};

  char tempMetaPath[TSDB_FILENAME_LEN] = {0};
  char metaPath[TSDB_FILENAME_LEN] = {0};

  bseBuildTempMetaName(pMeta->timestamp, tempMetaName);
  bseBuildMetaName(pMeta->timestamp, metaName);

  code = tableMetaWriterInit(pMeta, tempMetaName, &pWriter);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaReaderInit(pMeta, metaName, &pReader);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaReaderOpenIter(pReader, &pIter);
  TSDB_CHECK_CODE(code, lino, _error);

  while (!pIter->isOver) {
    SBlkHandle    blkHandle = {0};
    SBlockWrapper wrapper;

    code = tableMetaReaderIterNext(pIter, &wrapper, &blkHandle);
    TSDB_CHECK_CODE(code, lino, _error);

    if (pIter->isOver) {
      break;
    }

    blockWrapperSetType(&wrapper, BSE_TABLE_META_TYPE);

    code = tableMetaWriteAppendRawBlock(pWriter, &wrapper, &blkHandle);
    if (code != 0) {
      // tableMetaWriteAppendRawBlock closes the writer itself on failure;
      // forget the pointer so the cleanup below does not free it twice
      pWriter = NULL;
    }
    TSDB_CHECK_CODE(code, lino, _error);

    seqRangeUpdate(&pMeta->range, &blkHandle.range);
  }

  code = tableMetaWriterAppendBlock(pWriter, pBlock);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaWriterCommit(pWriter);
  if (code != 0) {
    // tableMetaWriterCommit closes the writer itself on failure (see above)
    pWriter = NULL;
  }
  TSDB_CHECK_CODE(code, lino, _error);

  // both files must be closed before the rename
  tableMetaWriterClose(pWriter);
  tableMetaReaderClose(pReader);

  pWriter = NULL;
  pReader = NULL;

  bseBuildFullName(pMeta->pBse, tempMetaName, tempMetaPath);
  bseBuildFullName(pMeta->pBse, metaName, metaPath);

  code = taosRenameFile(tempMetaPath, metaPath);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  if (code != 0) {
    bseError("failed to commit table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
  }
  // all three close routines ignore NULL
  tableMetaReaderIterClose(pIter);
  tableMetaWriterClose(pWriter);
  tableMetaReaderClose(pReader);

  return code;
}
1860
/*
 * Queue every element of pBlock on the writer's pending block array.
 * Returns 0 on success, or terrno when taosArrayAddAll reports failure.
 */
int32_t tableMetaWriterAppendBlock(SBtableMetaWriter *pMeta, SArray *pBlock) {
  void *pAdded = taosArrayAddAll(pMeta->pBlock, pBlock);
  return (pAdded == NULL) ? terrno : 0;
}
1867

1868
/*
 * Pack the writer's pending SMetaBlock entries into meta blocks and write
 * them to the file.
 *
 * Entries are added to the writer's block wrapper one by one; whenever the
 * estimated block size reaches blockCap the block is flushed, a handle for it
 * is recorded in pBlkHandle, and a fresh block is started. The final partial
 * block is flushed after the loop. If nothing was accumulated the function
 * returns 0 without writing.
 *
 * Each recorded handle carries the file offset of the block, the number of
 * bytes written for it and the seq range of the entries it contains.
 *
 * Returns 0 on success. On failure the writer is CLOSED (freed) before
 * returning, so the caller must not use or close pMeta afterwards.
 */
int32_t tableMetaWriterFlushBlock(SBtableMetaWriter *pMeta) {
  int32_t   code = 0;
  int32_t   lino = 0;
  SSeqRange range = {.sseq = -1, .eseq = -1};

  int64_t offset = 0;  // sum of metaBlockAdd results for the block being built
  int32_t nWrite = 0;
  int32_t size = pMeta->blockCap;

  blockWrapperClear(&pMeta->blockWrapper);
  code = blockWrapperResize(&pMeta->blockWrapper, size);
  TSDB_CHECK_CODE(code, lino, _error);

  for (int32_t i = 0; i < taosArrayGetSize(pMeta->pBlock); i++) {
    SMetaBlock *pBlk = taosArrayGet(pMeta->pBlock, i);
    if (blockEsimateSize(pMeta->blockWrapper.data, sizeof(SMetaBlock)) >= pMeta->blockCap) {
      // current block is full: write it out and start a new one
      SBlkHandle handle = {.offset = pMeta->offset, .size = offset, .range = range};

      blockWrapperSetType(&pMeta->blockWrapper, BSE_TABLE_META_TYPE);

      code = tableFlushBlock(pMeta->pFile, &handle, &pMeta->blockWrapper, &nWrite);
      TSDB_CHECK_CODE(code, lino, _error);

      pMeta->offset += nWrite;
      handle.size = nWrite;  // record the size actually written

      blockWrapperClear(&pMeta->blockWrapper);
      code = blockWrapperResize(&pMeta->blockWrapper, size);
      TSDB_CHECK_CODE(code, lino, _error);

      if (taosArrayPush(pMeta->pBlkHandle, &handle) == NULL) {
        TSDB_CHECK_CODE(code = terrno, lino, _error);
      }
      range.sseq = -1;
      offset = 0;
    }

    offset += metaBlockAdd(pMeta->blockWrapper.data, pBlk);

    // first entry of a block fixes the start of its range; every entry moves the end
    if (range.sseq == -1) {
      range.sseq = pBlk->range.sseq;
    }
    range.eseq = pBlk->range.eseq;
  }
  if (offset == 0) {
    return 0;
  }

  blockWrapperSetType(&pMeta->blockWrapper, BSE_TABLE_META_TYPE);

  SBlkHandle handle = {.offset = pMeta->offset, .size = offset, .range = range};
  code = tableFlushBlock(pMeta->pFile, &handle, &pMeta->blockWrapper, &nWrite);
  TSDB_CHECK_CODE(code, lino, _error);

  pMeta->offset += nWrite;
  handle.size = nWrite;

  if (taosArrayPush(pMeta->pBlkHandle, &handle) == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }
_error:
  if (code != 0) {
    // report the real failing line instead of a constant 0
    bseError("failed to flush table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
    tableMetaWriterClose(pMeta);
  }
  return code;
}
1935

1936
/*
 * Write the index block of the meta file: one encoded entry per block handle
 * recorded in pBlkHandle, flushed as a BSE_TABLE_META_INDEX_TYPE block at the
 * current file offset. The location of that block is then stored in the
 * footer so that a reader can find it.
 *
 * Returns 0 on success or an error code. Unlike tableMetaWriterFlushBlock,
 * this function does not close the writer on failure.
 */
int32_t tableMetaWriterFlushIndex(SBtableMetaWriter *pMeta) {
  int32_t code = 0;
  int32_t lino = 0;

  int32_t nWrite = 0;
  int64_t lastOffset = pMeta->offset;
  int32_t blkHandleSize = 0;

  int32_t extra = 8;
  int32_t size = taosArrayGetSize(pMeta->pBlkHandle) * sizeof(SBlkHandle);

  SSeqRange range = {-1, -1};

  blockWrapperClear(&pMeta->blockWrapper);
  code = blockWrapperResize(&pMeta->blockWrapper, size + extra);
  TSDB_CHECK_CODE(code, lino, _error);

  for (int32_t i = 0; i < taosArrayGetSize(pMeta->pBlkHandle); i++) {
    SBlkHandle *pHandle = taosArrayGet(pMeta->pBlkHandle, i);
    if (pHandle == NULL) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
    }
    blkHandleSize += metaBlockAddIndex(pMeta->blockWrapper.data, pHandle);

    // the index block covers the union of all block ranges
    seqRangeUpdate(&range, &pHandle->range);
  }

  blockWrapperSetType(&pMeta->blockWrapper, BSE_TABLE_META_INDEX_TYPE);

  SBlkHandle handle = {.offset = lastOffset, .size = blkHandleSize, .range = range};
  code = tableFlushBlock(pMeta->pFile, &handle, &pMeta->blockWrapper, &nWrite);
  TSDB_CHECK_CODE(code, lino, _error);

  // location of the index block that was just written
  SBlkHandle metaHandle = {.offset = pMeta->offset, .size = nWrite, .range = range};
  pMeta->offset += nWrite;

  memcpy(pMeta->footer.metaHandle, &metaHandle, sizeof(SBlkHandle));
  // NOTE(review): footer.indexHandle receives the same handle as
  // footer.metaHandle. A separate, never-used "indexHandle" local used to be
  // computed here, which suggests a copy-paste slip, but the stored bytes are
  // kept unchanged to stay compatible with existing files — confirm intent.
  memcpy(pMeta->footer.indexHandle, &metaHandle, sizeof(SBlkHandle));
_error:
  if (code != 0) {
    bseError("failed to build table meta index at line %d since %s", lino, tstrerror(code));
  }
  return code;
}
1981

1982
/*
 * Encode the writer's footer into a kEncodeLen-byte buffer and write it at
 * the current file position.
 *
 * The writer's offset is advanced only after the write has gone through, so
 * it never counts bytes that were not written. Returns 0 on success or an
 * error code; the writer is not closed here.
 */
int32_t tableMetaWriterFlushFooter(SBtableMetaWriter *p) {
  char buf[kEncodeLen] = {0};

  int32_t code = 0;
  int32_t lino = 0;

  code = footerEncode(&p->footer, buf);
  TSDB_CHECK_CODE(code, lino, _error);

  // keep the full width of the write result for the comparison
  int64_t nwrite = taosWriteFile(p->pFile, buf, sizeof(buf));
  if (nwrite != (int64_t)sizeof(buf)) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _error);
  }

  p->offset += sizeof(buf);

_error:
  if (code != 0) {
    bseError("failed to add footer to table builder at line %d since %s", lino, tstrerror(code));
  }
  return code;
}
2005
/*
 * Finish a meta file: flush the pending meta blocks, then the index block,
 * then the footer.
 *
 * Returns 0 on success. On ANY failure the writer has been closed (freed)
 * exactly once by the time this function returns, so the caller must not use
 * or close pMeta afterwards.
 */
int32_t tableMetaWriterCommit(SBtableMetaWriter *pMeta) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tableMetaWriterFlushBlock(pMeta);
  if (code != 0) {
    // tableMetaWriterFlushBlock already closed the writer on failure; pMeta is
    // dangling here, so neither read pMeta->name nor close it a second time.
    bseError("failed to commit table meta at line %d since %s", __LINE__, tstrerror(code));
    return code;
  }

  code = tableMetaWriterFlushIndex(pMeta);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaWriterFlushFooter(pMeta);
  TSDB_CHECK_CODE(code, lino, _error);
_error:
  if (code != 0) {
    // index/footer flush leave the writer open, so it is released here
    bseError("failed to commit table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
    tableMetaWriterClose(pMeta);
  }
  return code;
}
2024
/*
 * Write an already-built block to the writer's file and record a handle for
 * it (file offset before the write, bytes written, and the seq range taken
 * from pBlkHandle). The writer's offset is advanced by the bytes written.
 *
 * Returns 0 on success. On failure the writer is closed (freed) here, so the
 * caller must not use or close pMeta afterwards.
 */
int32_t tableMetaWriteAppendRawBlock(SBtableMetaWriter *pMeta, SBlockWrapper *pBlock, SBlkHandle *pBlkHandle) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t written = 0;

  code = tableFlushBlock(pMeta->pFile, pBlkHandle, pBlock, &written);
  TSDB_CHECK_CODE(code, lino, _error);

  SBlkHandle recorded = {.offset = pMeta->offset, .size = written, .range = pBlkHandle->range};
  if (NULL == taosArrayPush(pMeta->pBlkHandle, &recorded)) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }
  pMeta->offset += written;

_error:
  if (code != 0) {
    bseError("failed to append block to table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
    tableMetaWriterClose(pMeta);
  }
  return code;
}
2044

2045
/*
 * Read the last kEncodeLen bytes of the reader's file and decode them into
 * pMeta->footer. A reader without an open file is treated as empty and the
 * function returns 0 without doing anything.
 */
int32_t tableMetaReaderLoadFooter(SBtableMetaReader *pMeta) {
  int32_t code = 0;
  int32_t lino = 0;
  char    raw[kEncodeLen] = {0};

  if (pMeta->pFile == NULL) {
    return 0;
  }

  // the footer sits at the very end of the file
  int64_t pos = taosLSeekFile(pMeta->pFile, -kEncodeLen, SEEK_END);
  if (pos < 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  if (taosReadFile(pMeta->pFile, raw, kEncodeLen) != kEncodeLen) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  code = footerDecode(&pMeta->footer, raw);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  if (code != 0) {
    bseError("failed to load table meta footer %s at line %d since %s", pMeta->name, lino, tstrerror(code));
  }
  return code;
}
2071

2072
/*
 * Open a table file for reading (read != 0) or for read/write/create/append.
 *
 * - file missing, read mode : returns 0 and leaves *pFile and *size untouched
 * - file missing, write mode: the file is opened (created); *size untouched
 * - file present            : *size receives the size from taosStatFile; a size
 *                             <= 0 is reported as TSDB_CODE_NOT_FOUND,
 *                             otherwise the file is opened
 *
 * On success with an opened file, *pFile receives the handle.
 */
int32_t tableOpenFile(char *name, int8_t read, TdFilePtr *pFile, int64_t *size) {
  int32_t   code = 0;
  int32_t   lino = 0;
  TdFilePtr fp = NULL;
  int32_t   flags = read ? TD_FILE_READ : (TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_APPEND);

  if (taosCheckExistFile(name)) {
    code = taosStatFile(name, size, NULL, NULL);
    TSDB_CHECK_CODE(code, lino, _error);
    if (*size <= 0) {
      TSDB_CHECK_CODE(code = TSDB_CODE_NOT_FOUND, lino, _error);
    }
  } else if (read) {
    // nothing to read yet; the caller sees a success with no file handle
    return 0;
  }

  fp = taosOpenFile(name, flags);
  if (fp == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }
  *pFile = fp;

_error:
  if (code != 0) {
    bseError("failed to open table meta %s at line %d since %s", name, lino, tstrerror(code));
  }
  return code;
}
2116
/*
 * Open the file `name` through tableOpenFile and store the handle in
 * pMeta->pFile. The file size reported by tableOpenFile is discarded.
 * Returns the code from tableOpenFile, logging on failure.
 */
int32_t tableMetaOpenFile(SBtableMetaWriter *pMeta, int8_t read, char *name) {
  int64_t fileSize = 0;
  int32_t lino = 0;
  int32_t code = tableOpenFile(name, read, &pMeta->pFile, &fileSize);

  if (code != 0) {
    lino = __LINE__;
    bseError("failed to open table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
  }

  return code;
}
2131

2132
/*
 * Load a meta file into the reader: open the file named pMeta->name for
 * reading, decode its footer, then load the block handle index. Stops at the
 * first failing step and returns its code.
 *
 * NOTE(review): tableMetaOpenFile is declared for SBtableMetaWriter* but is
 * handed a SBtableMetaReader* here; this only works if both structs place
 * pFile and name identically — confirm against the struct definitions.
 */
int32_t tableMetaReaderLoad(SBtableMetaReader *pMeta) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tableMetaOpenFile(pMeta, 1, pMeta->name);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaReaderLoadFooter(pMeta);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableMetaReaderLoadIndex(pMeta);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  if (code == 0) {
    return 0;
  }
  bseError("failed to load table meta %s at line %d since %s", pMeta->name, lino, tstrerror(code));
  return code;
}
2151

2152
/* Free a table meta descriptor created by tableMetaOpen; NULL is ignored. */
void tableMetaClose(SBTableMeta *p) {
  if (p != NULL) {
    taosMemoryFree(p);
  }
}
2156

2157
/*
 * Create a meta writer for the file `name` (resolved to a full path through
 * bseBuildFullName).
 *
 * The writer gets empty handle and pending-block arrays, a block wrapper
 * bound to the owning pBse (needed for encryption/decryption), and a file
 * opened in write mode.
 *
 * Returns 0 and stores the writer in *ppWriter on success. On failure the
 * partially built writer is closed and *ppWriter is left untouched.
 */
int32_t tableMetaWriterInit(SBTableMeta *pMeta, char *name, SBtableMetaWriter **ppWriter) {
  int32_t code = 0;
  int32_t lino = 0;

  char path[TSDB_FILENAME_LEN] = {0};
  bseBuildFullName(pMeta->pBse, name, path);

  SBtableMetaWriter *p = taosMemCalloc(1, sizeof(SBtableMetaWriter));
  if (p == NULL) {
    return terrno;
  }
  p->pTableMeta = pMeta;

  p->blockCap = pMeta->blockCap;

  p->pBlkHandle = taosArrayInit(128, sizeof(SBlkHandle));
  if (p->pBlkHandle == NULL) {
    // take the error from terrno; checking a still-zero code would not jump
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  p->pBlock = taosArrayInit(128, sizeof(SMetaBlock));
  if (p->pBlock == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  code = blockWrapperInit(&p->blockWrapper, 1024);
  TSDB_CHECK_CODE(code, lino, _error);

  // Set pBse pointer for encryption/decryption
  p->blockWrapper.pBse = pMeta->pBse;

  code = tableMetaOpenFile(p, 0, path);
  TSDB_CHECK_CODE(code, lino, _error);

  *ppWriter = p;
_error:
  if (code != 0) {
    bseError("failed to init table meta writer %s at line %d since %s", pMeta->name, lino, tstrerror(code));
    tableMetaWriterClose(p);
  }
  return code;
}
2199

2200
/*
 * Close a meta writer: closes its file (logging a failure), destroys the
 * handle and pending-block arrays, cleans up the block wrapper and frees the
 * writer. NULL is ignored.
 */
void tableMetaWriterClose(SBtableMetaWriter *p) {
  if (p == NULL) {
    return;
  }

  int32_t rc = taosCloseFile(&p->pFile);
  if (rc != 0) {
    bseError("failed to close table meta writer file since %s", tstrerror(terrno));
  }

  taosArrayDestroy(p->pBlkHandle);
  taosArrayDestroy(p->pBlock);
  blockWrapperCleanup(&p->blockWrapper);
  taosMemoryFree(p);
}
2210

2211
/*
 * Create a meta reader for the file `name` (resolved to a full path through
 * bseBuildFullName, which is also stored as the reader's name).
 *
 * The reader gets an empty handle array and a block wrapper bound to the
 * owning pBse (needed for encryption/decryption); the file's footer and
 * block handle index are then loaded through tableMetaReaderLoad.
 *
 * Returns 0 and stores the reader in *ppReader on success. On failure the
 * partially built reader is closed and *ppReader is left untouched.
 */
int32_t tableMetaReaderInit(SBTableMeta *pMeta, char *name, SBtableMetaReader **ppReader) {
  int32_t code = 0;
  int32_t lino = 0;
  char    path[TSDB_FILENAME_LEN] = {0};
  bseBuildFullName(pMeta->pBse, name, path);

  SBtableMetaReader *p = taosMemCalloc(1, sizeof(SBtableMetaReader));
  if (p == NULL) {
    return terrno;
  }
  memcpy(p->name, path, sizeof(path));
  p->pTableMeta = pMeta;

  p->pBlkHandle = taosArrayInit(128, sizeof(SBlkHandle));
  if (p->pBlkHandle == NULL) {
    // take the error from terrno; checking a still-zero code would not jump
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  code = blockWrapperInit(&p->blockWrapper, 1024);
  TSDB_CHECK_CODE(code, lino, _error);

  // Set pBse pointer for encryption/decryption
  p->blockWrapper.pBse = pMeta->pBse;

  code = tableMetaReaderLoad(p);
  TSDB_CHECK_CODE(code, lino, _error);

  *ppReader = p;
_error:
  if (code != 0) {
    bseError("failed to init table meta reader %s at line %d since %s", pMeta->name, lino, tstrerror(code));
    tableMetaReaderClose(p);
  }
  return code;
}
2246

2247
/*
 * Close a meta reader: closes its file (logging a failure), destroys the
 * handle array, cleans up the block wrapper and frees the reader.
 * NULL is ignored.
 */
void tableMetaReaderClose(SBtableMetaReader *p) {
  if (p == NULL) {
    return;
  }

  int32_t rc = taosCloseFile(&p->pFile);
  if (rc != 0) {
    bseError("failed to close table meta reader file since %s", tstrerror(terrno));
  }

  taosArrayDestroy(p->pBlkHandle);
  blockWrapperCleanup(&p->blockWrapper);
  taosMemoryFree(p);
}
2256
/*
 * Find the meta entry that describes the data block holding `seq`.
 *
 * The handle array is searched (TD_LE) for the meta block that may cover
 * `seq`; that block is loaded into the reader's block wrapper and then
 * searched for the entry, which is copied into *pMetaBlock.
 *
 * Returns 0 on success, TSDB_CODE_NOT_FOUND when no handle qualifies, or the
 * error from loading/searching the block.
 */
int32_t tableMetaReaderLoadBlockMeta(SBtableMetaReader *p, int64_t seq, SMetaBlock *pMetaBlock) {
  int32_t   code = 0;
  int32_t   lino = 0;
  SSeqRange range = {.sseq = seq, .eseq = seq};

  SBlkHandle handle = {.range = range};
  int32_t    index = taosArraySearchIdx(p->pBlkHandle, &handle, compareFunc, TD_LE);
  if (index < 0) {
    // no candidate block; do not hand a negative index to taosArrayGet
    return TSDB_CODE_NOT_FOUND;
  }
  SBlkHandle *pHandle = taosArrayGet(p->pBlkHandle, index);
  if (pHandle == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  code = tableLoadBlock(p->pFile, pHandle, &p->blockWrapper);
  TSDB_CHECK_CODE(code, lino, _error);

  code = blockSeekMeta(p->blockWrapper.data, seq, pMetaBlock);
  TSDB_CHECK_CODE(code, lino, _error);

_error:
  return code;
}
2278
/*
 * Collect a handle (offset, size, range) for every data block described by
 * the meta file and append them to dataHandle.
 *
 * Each meta block listed in p->pBlkHandle is loaded, its SMetaBlock entries
 * are extracted with blockGetAllMeta, and one SBlkHandle per entry is pushed.
 * A meta block that yields no entries is reported as TSDB_CODE_FILE_CORRUPTED.
 *
 * NOTE(review): the scratch array is shared by all iterations and is not
 * cleared here; unless blockGetAllMeta resets it, entries of earlier meta
 * blocks would be pushed again — verify against blockGetAllMeta.
 */
int32_t tableMetaReaderLoadAllDataHandle(SBtableMetaReader *p, SArray *dataHandle) {
  int32_t code = 0;
  int32_t lino = 0;

  SArray *pEntries = taosArrayInit(8, sizeof(SMetaBlock));
  if (pEntries == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  for (int32_t blk = 0; blk < taosArrayGetSize(p->pBlkHandle); blk++) {
    SBlkHandle *pMetaHandle = taosArrayGet(p->pBlkHandle, blk);
    if (pMetaHandle == NULL) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
    }

    code = tableLoadBlock(p->pFile, pMetaHandle, &p->blockWrapper);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = blockGetAllMeta(p->blockWrapper.data, pEntries);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (taosArrayGetSize(pEntries) == 0) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
    }

    for (int32_t ent = 0; ent < taosArrayGetSize(pEntries); ent++) {
      SMetaBlock *pEntry = taosArrayGet(pEntries, ent);
      SBlkHandle  dataBlk = {.offset = pEntry->offset, .size = pEntry->size, .range = pEntry->range};
      if (taosArrayPush(dataHandle, &dataBlk) == NULL) {
        TSDB_CHECK_CODE(code = terrno, lino, _exit);
      }
    }
  }

_exit:
  taosArrayDestroy(pEntries);
  return code;
}
2315

2316
/*
 * Append a copy of every block handle held by the reader to pMetaHandle.
 * Returns TSDB_CODE_NOT_FOUND when the reader has no handles,
 * TSDB_CODE_FILE_CORRUPTED when a handle cannot be fetched, terrno when a
 * push fails, and 0 otherwise.
 */
int32_t tableMetaReaderLoadMetaHandle(SBtableMetaReader *p, SArray *pMetaHandle) {
  int32_t code = 0;
  int32_t lino = 0;

  if (taosArrayGetSize(p->pBlkHandle) == 0) {
    return TSDB_CODE_NOT_FOUND;
  }

  int32_t i = 0;
  while (i < taosArrayGetSize(p->pBlkHandle)) {
    SBlkHandle *pSrc = taosArrayGet(p->pBlkHandle, i);
    if (pSrc == NULL) {
      return TSDB_CODE_FILE_CORRUPTED;
    }
    if (taosArrayPush(pMetaHandle, pSrc) == NULL) {
      TSDB_CHECK_CODE(code = terrno, lino, _error);
    }
    i++;
  }

_error:
  return code;
}
2336

2337
/*
 * Load the index block referenced by the footer and decode its entries into
 * the reader's block handle array.
 *
 * A reader without an open file is treated as empty (returns 0). The loaded
 * block must be of type BSE_TABLE_META_INDEX_TYPE, otherwise the file is
 * reported as corrupted. Entries are decoded back to back until the block's
 * length is consumed; an empty index block yields no handles.
 */
int32_t tableMetaReaderLoadIndex(SBtableMetaReader *p) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t offset = 0;
  SBtableMetaReader *pMeta = p;

  if (pMeta->pFile == NULL) {
    return 0;
  }

  pMeta->blockWrapper.type = BSE_TABLE_META_TYPE;

  code = tableLoadBlock(pMeta->pFile, pMeta->footer.metaHandle, &pMeta->blockWrapper);
  TSDB_CHECK_CODE(code, lino, _error);

  if (blockGetType(p->blockWrapper.data) != BSE_TABLE_META_INDEX_TYPE) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
  }

  SBlock  *pBlk = (SBlock *)pMeta->blockWrapper.data;
  uint8_t *data = (uint8_t *)pBlk->data;

  // pre-checked loop: nothing is decoded from an empty index block
  while (offset < pBlk->len) {
    SBlkHandle handle = {0};
    int32_t    consumed = blkHandleDecode(&handle, (char *)data + offset);
    if (consumed <= 0) {
      // a decode that does not advance would never terminate the loop
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _error);
    }
    offset += consumed;

    if (taosArrayPush(pMeta->pBlkHandle, &handle) == NULL) {
      // store the error in code so the failure is actually returned
      TSDB_CHECK_CODE(code = terrno, lino, _error);
    }
  }

_error:
  if (code != 0) {
    bseError("failed to load table meta blk handle %s at line %d since %s", pMeta->name, lino, tstrerror(code));
  }
  return code;
}
2373

2374
/*
 * Create an iterator over the meta blocks of pReader.
 *
 * The iterator owns its own block wrapper (bound to the table meta's pBse for
 * encryption/decryption) and only references the reader. When the reader has
 * no block handles the iterator starts out finished (isOver = 1).
 *
 * Returns 0 and stores the iterator in *pIter on success; release it with
 * tableMetaReaderIterClose. On failure nothing is stored and nothing leaks.
 */
int32_t tableMetaReaderOpenIter(SBtableMetaReader *pReader, SBtableMetaReaderIter **pIter) {
  int32_t code = 0;

  SBtableMetaReaderIter *p = taosMemCalloc(1, sizeof(SBtableMetaReaderIter));
  if (p == NULL) {
    return terrno;
  }
  p->pReader = pReader;

  code = blockWrapperInit(&p->pBlockWrapper, 1024);
  if (code != 0) {
    // release the iterator instead of leaking it
    tableMetaReaderIterClose(p);
    return code;
  }

  // Set pBse pointer for encryption/decryption
  p->pBlockWrapper.pBse = ((SBTableMeta *)pReader->pTableMeta)->pBse;

  *pIter = p;
  if (taosArrayGetSize(pReader->pBlkHandle) == 0) {
    p->isOver = 1;
  }

  return 0;
}
2400

2401
/*
 * Advance the iterator to the next meta block.
 *
 * When all handles are consumed, or the loaded block is not of type
 * BSE_TABLE_META_TYPE, the iterator is marked finished (isOver = 1) and 0 is
 * returned without touching the outputs. Otherwise *pDataWrapper receives a
 * shallow copy of the iterator's block wrapper (the buffer stays owned by the
 * iterator) and *dstHandle a copy of the block's handle.
 *
 * On a resize/load failure the error is logged, pIter->pReader is reset to
 * NULL and the error code is returned.
 */
int32_t tableMetaReaderIterNext(SBtableMetaReaderIter *pIter, SBlockWrapper *pDataWrapper, SBlkHandle *dstHandle) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pIter->blkIdx >= taosArrayGetSize(pIter->pReader->pBlkHandle)) {
    pIter->isOver = 1;
    return 0;
  }

  SBlkHandle *pCur = taosArrayGet(pIter->pReader->pBlkHandle, pIter->blkIdx);
  if (pCur == NULL) {
    return TSDB_CODE_FILE_CORRUPTED;
  }

  SBlockWrapper *pBuf = &pIter->pBlockWrapper;

  // make room for the block, then read it
  code = blockWrapperResize(pBuf, pCur->size);
  TSDB_CHECK_CODE(code, lino, _error);

  code = tableLoadBlock(pIter->pReader->pFile, pCur, pBuf);
  TSDB_CHECK_CODE(code, lino, _error);

  pIter->blkIdx++;

  if (blockGetType(pBuf->data) == BSE_TABLE_META_TYPE) {
    *pDataWrapper = *pBuf;
    *dstHandle = *pCur;
  } else {
    // anything other than a meta block ends the iteration
    pIter->isOver = 1;
    return 0;
  }

_error:
  if (code != 0) {
    bseError("failed to load table meta blk handle %s at line %d since %s", pIter->pReader->name, lino,
             tstrerror(code));
    pIter->pReader = NULL;
  }
  return code;
}
2440

2441
/*
 * Release a meta reader iterator: cleans up its block wrapper and frees it.
 * The reader it points to is not closed. NULL is ignored.
 */
void tableMetaReaderIterClose(SBtableMetaReaderIter *p) {
  if (p != NULL) {
    blockWrapperCleanup(&p->pBlockWrapper);
    taosMemoryFree(p);
  }
}
2446

2447
/*
 * Create a mem table with an empty meta handle array, a block wrapper of
 * capacity `cap`, an initialised RW latch, reset seq ranges and a reference
 * count of 1.
 *
 * Returns 0 and stores the table in *pMemTable on success. On failure the
 * partially built table is destroyed and *pMemTable is set to NULL, so the
 * caller never receives a pointer to freed memory.
 */
int32_t bseMemTableCreate(STableMemTable **pMemTable, int32_t cap) {
  int32_t code = 0;
  int32_t lino = 0;

  STableMemTable *p = taosMemoryCalloc(1, sizeof(STableMemTable));
  if (p == NULL) {
    return terrno;
  }

  p->pMetaHandle = taosArrayInit(8, sizeof(SBlkHandle));
  if (p->pMetaHandle == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _error);
  }

  code = blockWrapperInit(&p->pBlockWrapper, cap);
  TAOS_CHECK_GOTO(code, &lino, _error);

  taosInitRWLatch(&p->latch);

  seqRangeReset(&p->range);
  seqRangeReset(&p->tableRange);
  p->ref = 1;
  bseTrace("create mem table %p", p);

_error:
  if (code != 0) {
    bseMemTableDestroy(p);
    p = NULL;  // do not publish the freed table
  }
  *pMemTable = p;

  return code;
}
2479

2480
/*
 * Take a reference on a mem table by atomically incrementing its ref count.
 * Returns TSDB_CODE_INVALID_CFG for a NULL table or when the count observed
 * before the increment was not positive; 0 otherwise.
 */
int32_t bseMemTableRef(STableMemTable *pMemTable) {
  if (pMemTable == NULL) {
    return TSDB_CODE_INVALID_CFG;
  }

  SBse *pBse = (SBse *)pMemTable->pBse;
  bseTrace("ref mem table %p", pMemTable);

  // value of the counter before this increment
  int32_t prevRef = atomic_fetch_add_32(&pMemTable->ref, 1);
  if (prevRef > 0) {
    return 0;
  }

  bseError("vgId:%d, memtable ref count is invalid, ref:%d", BSE_VGID(pBse), prevRef);
  return TSDB_CODE_INVALID_CFG;
}
2496

2497
/*
 * Drop a reference on a mem table; the table is destroyed when the count
 * reaches zero. NULL is ignored (after being traced).
 */
void bseMemTableUnRef(STableMemTable *pMemTable) {
  bseTrace("unref mem table %p", pMemTable);
  if (pMemTable == NULL) {
    return;
  }

  int32_t remaining = atomic_sub_fetch_32(&pMemTable->ref, 1);
  if (remaining != 0) {
    return;
  }

  // last reference gone
  bseMemTableDestroy(pMemTable);
  bseTrace("destroy mem table %p", pMemTable);
}
2509
/*
 * Free a mem table regardless of its reference count: destroys the meta
 * handle array, cleans up the block wrapper and frees the table.
 * NULL is ignored.
 */
void bseMemTableDestroy(STableMemTable *pMemTable) {
  if (pMemTable != NULL) {
    taosArrayDestroy(pMemTable->pMetaHandle);
    blockWrapperCleanup(&pMemTable->pBlockWrapper);
    taosMemoryFree(pMemTable);
  }
}
2515
/*
 * Append the element pointed to by pHandle to the mem table's meta handle
 * array. Returns TSDB_CODE_INVALID_PARA when either argument is NULL, terrno
 * (after logging) when the push fails, and 0 on success.
 */
int32_t bseMemTablePush(STableMemTable *pMemTable, void *pHandle) {
  if (pMemTable == NULL || pHandle == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (taosArrayPush(pMemTable->pMetaHandle, pHandle) != NULL) {
    return 0;
  }

  int32_t code = terrno;
  bseError("Failed to push handle to memtable since %s", tstrerror(code));
  return code;
}
2530
/*
 * Snapshot the mem table's meta handles as an array of SMetaBlock.
 *
 * A reference is held on the table for the duration of the call and the
 * handle array is read under the table's read latch. Every handle becomes one
 * SMetaBlock of type BSE_TABLE_META_TYPE / version BSE_DATA_VER carrying the
 * handle's range, offset and size.
 *
 * On success *pMetaBlock receives the new array (owned by the caller). On
 * failure after the reference was taken, *pMetaBlock is set to NULL. If the
 * reference itself cannot be taken, the error is returned and *pMetaBlock is
 * left untouched.
 */
int32_t bseMemTablGetMetaBlock(STableMemTable *p, SArray **pMetaBlock) {
  int32_t lino = 0;
  int32_t latched = 0;

  int32_t code = bseMemTableRef(p);
  if (code != 0) {
    bseError("Failed to ref memtable since %s", tstrerror(code));
    return code;
  }

  SArray *pResult = taosArrayInit(8, sizeof(SMetaBlock));
  if (pResult == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _error);
  }

  taosRLockLatch(&p->latch);
  latched = 1;

  for (int32_t i = 0; i < taosArrayGetSize(p->pMetaHandle); i++) {
    SBlkHandle *pSrc = taosArrayGet(p->pMetaHandle, i);
    SMetaBlock  converted = {.type = BSE_TABLE_META_TYPE,
                             .version = BSE_DATA_VER,
                             .range = pSrc->range,
                             .offset = pSrc->offset,
                             .size = pSrc->size};
    if (taosArrayPush(pResult, &converted) == NULL) {
      TSDB_CHECK_CODE(code = terrno, lino, _error);
    }
  }

_error:
  if (latched) {
    taosRUnLockLatch(&p->latch);
  }
  if (code != 0) {
    bseError("failed to get meta block from memtable since %s", tstrerror(code));
    taosArrayDestroy(pResult);
    pResult = NULL;
  }
  bseMemTableUnRef(p);

  *pMetaBlock = pResult;

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc