• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3545

02 Dec 2024 06:22AM UTC coverage: 60.839% (-0.04%) from 60.88%
#3545

push

travis-ci

web-flow
Merge pull request #28961 from taosdata/fix/refactor-vnode-management-open-vnode

fix/refactor-vnode-management-open-vnode

120592 of 253473 branches covered (47.58%)

Branch coverage included in aggregate %.

102 of 145 new or added lines in 3 files covered. (70.34%)

477 existing lines in 108 files now uncovered.

201840 of 276506 relevant lines covered (73.0%)

19392204.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.67
/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdbDataFileRW.h"
17
#include "meta.h"
18

19
// SDataFileReader =============================================
20
/*
 * State of one open data-file reader.
 *
 * Allocated zero-filled by tsdbDataFileReaderOpen() and released by
 * tsdbDataFileReaderClose().
 */
struct SDataFileReader {
21
  SDataFileReaderConfig config[1];  // by-value copy of the config passed to tsdbDataFileReaderOpen()
22

23
  SBuffer  local[10];  // reader-owned buffers: initialised on open, destroyed on close
24
  SBuffer *buffers;    // working buffers: config->buffers when provided, otherwise `local`
25

26
  struct {
27
    bool headFooterLoaded;  // set once tsdbDataFileReadHeadFooter() has completed successfully
28
    bool tombFooterLoaded;  // set once tsdbDataFileReadTombFooter() has completed successfully
29
    bool brinBlkLoaded;     // set once brinBlkArray has been populated
30
    bool tombBlkLoaded;     // set once tombBlkArray has been populated
31
  } ctx[1];
32

33
  STsdbFD *fd[TSDB_FTYPE_MAX];  // one handle per file type; NULL when that file was not opened
34

35
  SHeadFooter   headFooter[1];    // footer read from the tail of the .head file
36
  STombFooter   tombFooter[1];    // footer read from the tail of the .tomb file
37
  TBrinBlkArray brinBlkArray[1];  // filled by tsdbDataFileReadBrinBlk(); destroyed on close
38
  TTombBlkArray tombBlkArray[1];  // filled by tsdbDataFileReadTombBlk(); destroyed on close
39
};
40

41
/*
 * Load the footer stored at the tail of the .head file into reader->headFooter.
 *
 * The footer is read at most once per reader; later calls return immediately.
 * When no .head file is open nothing is read, but the footer is still marked
 * as loaded (reader->headFooter keeps the zero fill from the reader's calloc).
 *
 * Returns 0 on success, TSDB_CODE_FILE_CORRUPTED when the recorded file size
 * is too small to contain a footer, or the error reported by tsdbReadFile().
 */
static int32_t tsdbDataFileReadHeadFooter(SDataFileReader *reader) {
  if (reader->ctx->headFooterLoaded) {
    return 0;
  }

  int32_t code = 0;
  int32_t lino = 0;

  int32_t ftype = TSDB_FTYPE_HEAD;
  if (reader->fd[ftype]) {
    int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
    char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
    int64_t fileSize = reader->config->files[ftype].file.size;

    // Without this guard `fileSize - sizeof(SHeadFooter)` is evaluated in
    // unsigned arithmetic and wraps to a bogus offset for a truncated file.
    if (fileSize < (int64_t)sizeof(SHeadFooter)) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
    }

    TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[ftype], fileSize - (int64_t)sizeof(SHeadFooter),
                                 (uint8_t *)reader->headFooter, sizeof(SHeadFooter), 0, encryptAlgorithm, encryptKey),
                    &lino, _exit);
  }

  reader->ctx->headFooterLoaded = true;

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
82

83
/*
 * Load the footer stored at the tail of the .tomb file into reader->tombFooter.
 *
 * The footer is read at most once per reader; later calls return immediately.
 * When no .tomb file is open nothing is read, but the footer is still marked
 * as loaded (reader->tombFooter keeps the zero fill from the reader's calloc).
 *
 * Returns 0 on success, TSDB_CODE_FILE_CORRUPTED when the recorded file size
 * is too small to contain a footer, or the error reported by tsdbReadFile().
 */
static int32_t tsdbDataFileReadTombFooter(SDataFileReader *reader) {
  if (reader->ctx->tombFooterLoaded) {
    return 0;
  }

  int32_t code = 0;
  int32_t lino = 0;

  int32_t ftype = TSDB_FTYPE_TOMB;
  if (reader->fd[ftype]) {
    int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
    char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
    int64_t fileSize = reader->config->files[ftype].file.size;

    // Without this guard `fileSize - sizeof(STombFooter)` is evaluated in
    // unsigned arithmetic and wraps to a bogus offset for a truncated file.
    if (fileSize < (int64_t)sizeof(STombFooter)) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
    }

    TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[ftype], fileSize - (int64_t)sizeof(STombFooter),
                                 (uint8_t *)reader->tombFooter, sizeof(STombFooter), 0, encryptAlgorithm, encryptKey),
                    &lino, _exit);
  }
  reader->ctx->tombFooterLoaded = true;

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
108

109
/*
 * Allocate a data-file reader and open the files of one file set for reading.
 *
 * fname   Optional array of TSDB_FTYPE_MAX file names; a NULL entry skips that
 *         file type. When fname itself is NULL, the names are built with
 *         tsdbTFileName() for every config->files[] entry flagged `exist`.
 * config  Reader configuration; copied by value into the reader. When
 *         config->buffers is NULL the reader uses its own local buffers.
 * reader  Out: the new reader on success, NULL on failure.
 *
 * Returns 0 on success, otherwise the failing error code. On failure the
 * partially built reader (including every file already opened) is released,
 * so nothing is leaked and the caller has nothing to clean up.
 */
int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig *config, SDataFileReader **reader) {
  int32_t code = 0;
  int32_t lino = 0;

  if ((*reader = taosMemoryCalloc(1, sizeof(**reader))) == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->local); i++) {
    tBufferInit(reader[0]->local + i);
  }

  reader[0]->config[0] = config[0];
  reader[0]->buffers = config->buffers;
  if (reader[0]->buffers == NULL) {
    reader[0]->buffers = reader[0]->local;
  }

  if (fname) {
    for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
      if (fname[i]) {
        int32_t lcn = config->files[i].file.lcn;
        TAOS_CHECK_GOTO(tsdbOpenFile(fname[i], config->tsdb, TD_FILE_READ, &reader[0]->fd[i], lcn), &lino, _exit);
      }
    }
  } else {
    for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
      if (config->files[i].exist) {
        char fname1[TSDB_FILENAME_LEN];
        tsdbTFileName(config->tsdb, &config->files[i].file, fname1);
        int32_t lcn = config->files[i].file.lcn;
        TAOS_CHECK_GOTO(tsdbOpenFile(fname1, config->tsdb, TD_FILE_READ, &reader[0]->fd[i], lcn), &lino, _exit);
      }
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
    // Release the half-built reader: tsdbDataFileReaderClose() tolerates a NULL
    // reader (allocation failure) and resets *reader to NULL afterwards.
    // NOTE(review): relies on the prototype coming from tsdbDataFileRW.h — confirm.
    tsdbDataFileReaderClose(reader);
  }
  return code;
}
152

153
/*
 * Tear down a reader and reset the caller's pointer to NULL.
 *
 * Frees both block-index arrays, closes every open file handle, destroys the
 * reader-owned local buffers and finally frees the reader itself.
 * A NULL *reader is accepted and ignored.
 */
void tsdbDataFileReaderClose(SDataFileReader **reader) {
  SDataFileReader *self = reader[0];
  if (self == NULL) {
    return;
  }

  TARRAY2_DESTROY(self->tombBlkArray, NULL);
  TARRAY2_DESTROY(self->brinBlkArray, NULL);

  // Close whichever file types were actually opened.
  for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
    if (self->fd[ftype] != NULL) {
      tsdbCloseFile(&self->fd[ftype]);
    }
  }

  for (int32_t idx = 0; idx < ARRAY_SIZE(self->local); ++idx) {
    tBufferDestroy(&self->local[idx]);
  }

  taosMemoryFree(self);
  reader[0] = NULL;
}
174

175
/*
 * Hand out the reader's SBrinBlk index array, loading it from the .head file
 * on first use.
 *
 * The location and size of the index come from the head footer. A footer with
 * a non-positive size yields an empty array. The loaded array is cached in the
 * reader, so later calls only return the cached pointer.
 *
 * brinBlkArray  Out: points at the reader-owned array (only set on success).
 * Returns 0 on success or the first error encountered.
 */
int32_t tsdbDataFileReadBrinBlk(SDataFileReader *reader, const TBrinBlkArray **brinBlkArray) {
  int32_t code = 0;
  int32_t lino = 0;
  void   *blkBytes = NULL;

  // Fast path: the index was already loaded by an earlier call.
  if (reader->ctx->brinBlkLoaded) {
    brinBlkArray[0] = reader->brinBlkArray;
    return code;
  }

  TAOS_CHECK_GOTO(tsdbDataFileReadHeadFooter(reader), &lino, _exit);

  if (reader->headFooter->brinBlkPtr->size > 0) {
    int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
    char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;

    blkBytes = taosMemoryMalloc(reader->headFooter->brinBlkPtr->size);
    if (blkBytes == NULL) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }

    TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], reader->headFooter->brinBlkPtr->offset, blkBytes,
                                 reader->headFooter->brinBlkPtr->size, 0, encryptAlgorithm, encryptKey),
                    &lino, _exit);

    // The array takes ownership of the malloc'ed bytes from here on.
    int32_t numBlk = reader->headFooter->brinBlkPtr->size / sizeof(SBrinBlk);
    TARRAY2_INIT_EX(reader->brinBlkArray, numBlk, numBlk, blkBytes);
  } else {
    TARRAY2_INIT(reader->brinBlkArray);
  }

  reader->ctx->brinBlkLoaded = true;
  brinBlkArray[0] = reader->brinBlkArray;

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
    taosMemoryFree(blkBytes);
  }
  return code;
}
214

215
/*
 * Read one brin block from the .head file and decode it into `brinBlock`.
 *
 * On-disk layout as consumed here: 10 compressed int64 columns, 5 compressed
 * int32 columns, then (only when the block has primary keys) the compress
 * infos of the first-key and last-key PK columns followed by their payloads.
 * The decoder must consume the block exactly; leftover or missing bytes are
 * reported as TSDB_CODE_FILE_CORRUPTED.
 *
 * Uses reader->buffers[0] for the raw bytes and reader->buffers[1] as the
 * decompression scratch buffer.
 *
 * Returns 0 on success or the first error encountered.
 */
int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinBlk, SBrinBlock *brinBlock) {
  int32_t code = 0;
  int32_t lino = 0;

  SBuffer *raw = &reader->buffers[0];
  SBuffer *scratch = &reader->buffers[1];

  int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;

  // Pull the whole block into memory.
  tBufferClear(raw);
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, brinBlk->dp->size, raw, 0,
                                       encryptAlgorithm, encryptKey),
                  &lino, _exit);

  SBufferReader br = BUFFER_READER_INITIALIZER(0, raw);
  tBrinBlockClear(brinBlock);
  brinBlock->numOfPKs = brinBlk->numOfPKs;
  brinBlock->numOfRecords = brinBlk->numRec;

  // Fixed-width columns: indexes 0..9 hold int64 values, 10..14 hold int32 values.
  for (int32_t col = 0; col < 15; ++col) {
    bool          isBigint = (col < 10);
    SCompressInfo cinfo = {
        .cmprAlg = brinBlk->cmprAlg,
        .dataType = isBigint ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT,
        .compressedSize = brinBlk->size[col],
        .originalSize = brinBlk->numRec * (isBigint ? sizeof(int64_t) : sizeof(int32_t)),
    };
    TAOS_CHECK_GOTO(tDecompressDataToBuffer(BR_PTR(&br), &cinfo, brinBlock->buffers + col, scratch), &lino, _exit);
    br.offset += brinBlk->size[col];
  }

  // Primary-key columns: all compress infos come first, then the payloads.
  if (brinBlk->numOfPKs > 0) {
    SValueColumnCompressInfo firstPkInfo[TD_MAX_PK_COLS];
    SValueColumnCompressInfo lastPkInfo[TD_MAX_PK_COLS];

    for (int32_t pk = 0; pk < brinBlk->numOfPKs; ++pk) {
      TAOS_CHECK_GOTO(tValueColumnCompressInfoDecode(&br, &firstPkInfo[pk]), &lino, _exit);
    }
    for (int32_t pk = 0; pk < brinBlk->numOfPKs; ++pk) {
      TAOS_CHECK_GOTO(tValueColumnCompressInfoDecode(&br, &lastPkInfo[pk]), &lino, _exit);
    }

    for (int32_t pk = 0; pk < brinBlk->numOfPKs; ++pk) {
      SValueColumnCompressInfo *pkInfo = &firstPkInfo[pk];

      TAOS_CHECK_GOTO(tValueColumnDecompress(BR_PTR(&br), pkInfo, brinBlock->firstKeyPKs + pk, scratch), &lino,
                      _exit);
      br.offset += (pkInfo->offsetCompressedSize + pkInfo->dataCompressedSize);
    }

    for (int32_t pk = 0; pk < brinBlk->numOfPKs; ++pk) {
      SValueColumnCompressInfo *pkInfo = &lastPkInfo[pk];

      TAOS_CHECK_GOTO(tValueColumnDecompress(BR_PTR(&br), pkInfo, brinBlock->lastKeyPKs + pk, scratch), &lino, _exit);
      br.offset += (pkInfo->offsetCompressedSize + pkInfo->dataCompressedSize);
    }
  }

  // Every byte of the block must have been consumed.
  if (br.offset != br.buffer->size) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
296

297
extern int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *assist);
298

299
/*
 * Read one complete data block (all columns) from the .data file and
 * decompress it into `bData`.
 *
 * The block location comes from record->blockOffset / record->blockSize.
 * The decompressor must consume the block exactly, otherwise
 * TSDB_CODE_FILE_CORRUPTED is returned.
 *
 * Uses reader->buffers[0] for the raw bytes and reader->buffers[1] as scratch.
 * Returns 0 on success or the first error encountered.
 */
int32_t tsdbDataFileReadBlockData(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SBuffer *raw = &reader->buffers[0];
  SBuffer *scratch = &reader->buffers[1];

  int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;

  // Fetch the raw block bytes.
  tBufferClear(raw);
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, record->blockSize, raw, 0,
                                       encryptAlgorithm, encryptKey),
                  &lino, _exit);

  // Decode them into the caller's block.
  SBufferReader br = BUFFER_READER_INITIALIZER(0, raw);
  TAOS_CHECK_GOTO(tBlockDataDecompress(&br, bData, scratch), &lino, _exit);

  // Trailing or missing bytes mean the block does not match its record.
  if (br.offset != raw->size) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
329

330
int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData,
3,159,675✔
331
                                          STSchema *pTSchema, int16_t cids[], int32_t ncid) {
332
  int32_t code = 0;
3,159,675✔
333
  int32_t lino = 0;
3,159,675✔
334

335
  SDiskDataHdr hdr;
336
  SBuffer     *buffer0 = reader->buffers + 0;
3,159,675✔
337
  SBuffer     *buffer1 = reader->buffers + 1;
3,159,675✔
338
  SBuffer     *assist = reader->buffers + 2;
3,159,675✔
339

340
  int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
3,159,675✔
341
  char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
3,159,675✔
342
  // load key part
343
  tBufferClear(buffer0);
344
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, record->blockKeySize, buffer0,
3,159,675!
345
                                       0, encryptAlgorithm, encryptKey),
346
                  &lino, _exit);
347

348
  // SDiskDataHdr
349
  SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0);
3,159,693✔
350
  TAOS_CHECK_GOTO(tGetDiskDataHdr(&br, &hdr), &lino, _exit);
3,159,693!
351

352
  if (hdr.delimiter != TSDB_FILE_DLMT) {
3,159,737!
353
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
354
  }
355

356
  tBlockDataReset(bData);
3,159,737✔
357
  bData->suid = hdr.suid;
3,159,530✔
358
  bData->uid = hdr.uid;
3,159,530✔
359
  bData->nRow = hdr.nRow;
3,159,530✔
360

361
  // Key part
362
  TAOS_CHECK_GOTO(tBlockDataDecompressKeyPart(&hdr, &br, bData, assist), &lino, _exit);
3,159,530!
363
  if (br.offset != buffer0->size) {
3,159,876!
364
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
365
  }
366

367
  int extraColIdx = -1;
3,159,876✔
368
  for (int i = 0; i < ncid; i++) {
3,297,276✔
369
    if (tBlockDataGetColData(bData, cids[i]) == NULL) {
2,058,549✔
370
      extraColIdx = i;
1,921,152✔
371
      break;
1,921,152✔
372
    }
373
  }
374

375
  if (extraColIdx < 0) {
3,159,879✔
376
    goto _exit;
1,238,640✔
377
  }
378

379
  // load SBlockCol part
380
  tBufferClear(buffer0);
381
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset + record->blockKeySize,
1,921,239!
382
                                       hdr.szBlkCol, buffer0, 0, encryptAlgorithm, encryptKey),
383
                  &lino, _exit);
384

385
  // calc szHint
386
  int64_t szHint = 0;
1,921,221✔
387
  int     extraCols = 1;
1,921,221✔
388
  for (int i = extraColIdx + 1; i < ncid; ++i) {
1,921,216✔
389
    if (tBlockDataGetColData(bData, cids[i]) == NULL) {
112,222!
390
      ++extraCols;
112,223✔
391
      break;
112,223✔
392
    }
393
  }
394

395
  if (extraCols >= 2) {
1,921,217✔
396
    br = BUFFER_READER_INITIALIZER(0, buffer0);
112,221✔
397

398
    SBlockCol blockCol = {.cid = 0};
112,221✔
399
    for (int32_t i = extraColIdx; i < ncid; ++i) {
112,221✔
400
      int16_t extraColCid = cids[i];
112,179✔
401

402
      while (extraColCid > blockCol.cid) {
224,363✔
403
        if (br.offset >= buffer0->size) {
112,228!
404
          blockCol.cid = INT16_MAX;
×
405
          break;
×
406
        }
407

408
        TAOS_CHECK_GOTO(tGetBlockCol(&br, &blockCol, hdr.fmtVer, hdr.cmprAlg), &lino, _exit);
112,228!
409
      }
410

411
      if (extraColCid == blockCol.cid || blockCol.cid == INT16_MAX) {
112,135!
412
        extraColIdx = i;
112,135✔
413
        break;
112,135✔
414
      }
415
    }
416

417
    if (blockCol.cid > 0 && blockCol.cid < INT16_MAX /*&& blockCol->flag == HAS_VALUE*/) {
112,177!
418
      int64_t   offset = blockCol.offset;
112,182✔
419
      SBlockCol lastNonNoneBlockCol = {.cid = 0};
112,182✔
420

421
      for (int32_t i = extraColIdx; i < ncid; ++i) {
873,160✔
422
        int16_t extraColCid = cids[i];
763,845✔
423

424
        while (extraColCid > blockCol.cid) {
1,418,288✔
425
          if (br.offset >= buffer0->size) {
657,310✔
426
            blockCol.cid = INT16_MAX;
2,580✔
427
            break;
2,580✔
428
          }
429

430
          TAOS_CHECK_GOTO(tGetBlockCol(&br, &blockCol, hdr.fmtVer, hdr.cmprAlg), &lino, _exit);
654,730!
431
        }
432

433
        if (extraColCid == blockCol.cid) {
763,558✔
434
          lastNonNoneBlockCol = blockCol;
760,959✔
435
          continue;
760,959✔
436
        }
437

438
        if (blockCol.cid == INT16_MAX) {
2,599✔
439
          break;
2,580✔
440
        }
441
      }
442

443
      if (lastNonNoneBlockCol.cid > 0) {
111,895!
444
        szHint = lastNonNoneBlockCol.offset + lastNonNoneBlockCol.szBitmap + lastNonNoneBlockCol.szOffset +
112,247✔
445
                 lastNonNoneBlockCol.szValue - offset;
112,247✔
446
      }
447
    }
448
  }
449

450
  // load each column
451
  SBlockCol blockCol = {
1,920,886✔
452
      .cid = 0,
453
  };
454
  bool firstRead = true;
1,920,886✔
455
  br = BUFFER_READER_INITIALIZER(0, buffer0);
1,920,886✔
456
  for (int32_t i = 0; i < ncid; i++) {
4,622,232✔
457
    int16_t cid = cids[i];
2,701,327✔
458

459
    if (tBlockDataGetColData(bData, cid)) {  // already loaded
2,701,327✔
460
      continue;
105,605✔
461
    }
462

463
    while (cid > blockCol.cid) {
5,926,401✔
464
      if (br.offset >= buffer0->size) {
3,333,575✔
465
        blockCol.cid = INT16_MAX;
2,580✔
466
        break;
2,580✔
467
      }
468

469
      TAOS_CHECK_GOTO(tGetBlockCol(&br, &blockCol, hdr.fmtVer, hdr.cmprAlg), &lino, _exit);
3,330,995!
470
    }
471

472
    if (cid < blockCol.cid) {
2,595,406✔
473
      const STColumn *tcol = tTSchemaSearchColumn(pTSchema, cid);
25,798✔
474
      TSDB_CHECK_NULL(tcol, code, lino, _exit, TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER);
25,798!
475
      SBlockCol none = {
25,798✔
476
          .cid = cid,
477
          .type = tcol->type,
25,798✔
478
          .cflag = tcol->flags,
25,798✔
479
          .flag = HAS_NONE,
480
          .szOrigin = 0,
481
          .szBitmap = 0,
482
          .szOffset = 0,
483
          .szValue = 0,
484
          .offset = 0,
485
      };
486
      TAOS_CHECK_GOTO(tBlockDataDecompressColData(&hdr, &none, &br, bData, assist), &lino, _exit);
25,798!
487
    } else if (cid == blockCol.cid) {
2,569,608!
488
      int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
2,569,685✔
489
      char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
2,569,685✔
490
      // load from file
491
      tBufferClear(buffer1);
492
      TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA],
2,569,685!
493
                                           record->blockOffset + record->blockKeySize + hdr.szBlkCol + blockCol.offset,
494
                                           blockCol.szBitmap + blockCol.szOffset + blockCol.szValue, buffer1,
495
                                           firstRead ? szHint : 0, encryptAlgorithm, encryptKey),
496
                      &lino, _exit);
497

498
      firstRead = false;
2,570,166✔
499

500
      // decode the buffer
501
      SBufferReader br1 = BUFFER_READER_INITIALIZER(0, buffer1);
2,570,166✔
502
      TAOS_CHECK_GOTO(tBlockDataDecompressColData(&hdr, &blockCol, &br1, bData, assist), &lino, _exit);
2,570,166!
503
    }
504
  }
505

506
_exit:
1,920,905✔
507
  if (code) {
3,159,545!
508
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
×
509
              tstrerror(code));
510
  }
511
  return code;
3,159,829✔
512
}
513

514
int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader, const SBrinRecord *record,
2,494,397✔
515
                                 TColumnDataAggArray *columnDataAggArray) {
516
  int32_t  code = 0;
2,494,397✔
517
  int32_t  lino = 0;
2,494,397✔
518
  SBuffer *buffer = reader->buffers + 0;
2,494,397✔
519

520
  TARRAY2_CLEAR(columnDataAggArray, NULL);
2,494,397✔
521
  if (record->smaSize > 0) {
2,494,397!
522
    tBufferClear(buffer);
523
    int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
2,494,397✔
524
    char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
2,494,397✔
525
    TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_SMA], record->smaOffset, record->smaSize, buffer, 0,
2,494,397!
526
                                         encryptAlgorithm, encryptKey),
527
                    &lino, _exit);
528

529
    // decode sma data
530
    SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer);
2,494,397✔
531
    while (br.offset < record->smaSize) {
9,977,874✔
532
      SColumnDataAgg sma[1];
533

534
      TAOS_CHECK_GOTO(tGetColumnDataAgg(&br, sma), &lino, _exit);
7,483,482!
535
      TAOS_CHECK_GOTO(TARRAY2_APPEND_PTR(columnDataAggArray, sma), &lino, _exit);
14,966,954!
536
    }
537
    if (br.offset != record->smaSize) {
2,494,392!
538
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
539
    }
540
  }
541

UNCOV
542
_exit:
×
543
  if (code) {
2,494,392!
544
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
×
545
              tstrerror(code));
546
  }
547
  return code;
2,494,396✔
548
}
549

550
/*
 * Hand out the reader's STombBlk index array, loading it from the .tomb file
 * on first use.
 *
 * The location and size of the index come from the tomb footer. A footer with
 * a non-positive size yields an empty array. The loaded array is cached in the
 * reader, so later calls only return the cached pointer.
 *
 * tombBlkArray  Out: points at the reader-owned array (only set on success).
 * Returns 0 on success or the first error encountered.
 */
int32_t tsdbDataFileReadTombBlk(SDataFileReader *reader, const TTombBlkArray **tombBlkArray) {
  int32_t code = 0;
  int32_t lino = 0;
  void   *blkBytes = NULL;

  // Fast path: the index was already loaded by an earlier call.
  if (reader->ctx->tombBlkLoaded) {
    tombBlkArray[0] = reader->tombBlkArray;
    return code;
  }

  TAOS_CHECK_GOTO(tsdbDataFileReadTombFooter(reader), &lino, _exit);

  if (reader->tombFooter->tombBlkPtr->size > 0) {
    blkBytes = taosMemoryMalloc(reader->tombFooter->tombBlkPtr->size);
    if (blkBytes == NULL) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }

    int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
    char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
    TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], reader->tombFooter->tombBlkPtr->offset, blkBytes,
                                 reader->tombFooter->tombBlkPtr->size, 0, encryptAlgorithm, encryptKey),
                    &lino, _exit);

    // The array takes ownership of the malloc'ed bytes from here on.
    int32_t numBlk = reader->tombFooter->tombBlkPtr->size / sizeof(STombBlk);
    TARRAY2_INIT_EX(reader->tombBlkArray, numBlk, numBlk, blkBytes);
  } else {
    TARRAY2_INIT(reader->tombBlkArray);
  }

  reader->ctx->tombBlkLoaded = true;
  tombBlkArray[0] = reader->tombBlkArray;

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
    taosMemoryFree(blkBytes);
  }
  return code;
}
587

588
/*
 * Read one tomb block from the .tomb file and decode it into `tData`.
 *
 * The block is a sequence of compressed int64 columns, one per entry of
 * tData->buffers, whose compressed sizes are given by tombBlk->size[].
 *
 * Uses reader->buffers[0] for the raw bytes and reader->buffers[1] as the
 * decompression scratch buffer.
 *
 * Returns 0 on success or the first error encountered.
 */
int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombBlk, STombBlock *tData) {
  int32_t code = 0;
  int32_t lino = 0;

  SBuffer *buffer0 = reader->buffers + 0;
  SBuffer *assist = reader->buffers + 1;

  tBufferClear(buffer0);
  int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_TOMB], tombBlk->dp->offset, tombBlk->dp->size, buffer0, 0,
                                       encryptAlgorithm, encryptKey),
                  &lino, _exit);

  SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0);
  tTombBlockClear(tData);
  tData->numOfRecords = tombBlk->numRec;

  // Columns are stored back to back; advance by each column's compressed size.
  for (int32_t i = 0; i < ARRAY_SIZE(tData->buffers); ++i) {
    SCompressInfo cinfo = {
        .cmprAlg = tombBlk->cmprAlg,
        .dataType = TSDB_DATA_TYPE_BIGINT,
        .originalSize = tombBlk->numRec * sizeof(int64_t),
        .compressedSize = tombBlk->size[i],
    };
    TAOS_CHECK_GOTO(tDecompressDataToBuffer(BR_PTR(&br), &cinfo, tData->buffers + i, assist), &lino, _exit);
    br.offset += tombBlk->size[i];
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
624

625
// SDataFileWriter =============================================
626
/*
 * State of one data-file writer.
 *
 * NOTE(review): most of the code that drives these fields lies outside the
 * reviewed section; comments marked "presumably" are assumptions to confirm.
 */
struct SDataFileWriter {
627
  SDataFileWriterConfig config[1];  // writer configuration; skmTb/skmRow/buffers fall back to the members below
628

629
  SSkmInfo skmTb[1];   // used when config->skmTb is NULL; its pTSchema is destroyed on close
630
  SSkmInfo skmRow[1];  // used when config->skmRow is NULL; its pTSchema is destroyed on close
631
  SBuffer  local[10];  // writer-owned buffers, destroyed on close
632
  SBuffer *buffers;    // working buffers: config->buffers when provided, otherwise `local`
633

634
  struct {
635
    bool             opened;  // presumably set once the writer has been opened — TODO confirm
636
    SDataFileReader *reader;  // reader over the existing file set; NULL when no file exists yet
637

638
    // for ts data
639
    TABLEID tbid[1];
640
    bool    tbHasOldData;
641

642
    const TBrinBlkArray *brinBlkArray;
643
    int32_t              brinBlkArrayIdx;
644
    SBrinBlock           brinBlock[1];  // destroyed on close
645
    int32_t              brinBlockIdx;
646
    SBlockData           blockData[1];  // destroyed on close
647
    int32_t              blockDataIdx;
648
    // for tomb data
649
    bool                 hasOldTomb;
650
    const TTombBlkArray *tombBlkArray;
651
    int32_t              tombBlkArrayIdx;
652
    STombBlock           tombBlock[1];  // destroyed on close
653
    int32_t              tombBlockIdx;
654
    // range
655
    SVersionRange range;
656
    SVersionRange tombRange;
657
  } ctx[1];
658

659
  STFile   files[TSDB_FTYPE_MAX];  // per-type file descriptors, filled in by tsdbDataFileWriterDoOpen()
660
  STsdbFD *fd[TSDB_FTYPE_MAX];     // presumably one open handle per file type — TODO confirm
661

662
  SHeadFooter headFooter[1];
663
  STombFooter tombFooter[1];
664

665
  TBrinBlkArray brinBlkArray[1];  // destroyed on close
666
  SBrinBlock    brinBlock[1];     // destroyed on close
667
  SBlockData    blockData[1];     // destroyed on close
668

669
  TTombBlkArray tombBlkArray[1];  // destroyed on close
670
  STombBlock    tombBlock[1];     // destroyed on close
671
};
672

673
/*
 * Abort path of closing a writer.
 *
 * Not implemented: it only logs that fact as an error and reports success (0).
 */
static int32_t tsdbDataFileWriterCloseAbort(SDataFileWriter *writer) {
  const char *reason = "not implemented";

  tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, __LINE__,
            reason);
  return 0;
}
678

679
/*
 * Release everything a writer owns: the optional reader over the old file
 * set, the output-side and context-side block containers, the writer-local
 * buffers and both locally cached schemas.
 */
static void tsdbDataFileWriterDoClose(SDataFileWriter *writer) {
  // Reader over the pre-existing files, if one was opened.
  if (writer->ctx->reader != NULL) {
    tsdbDataFileReaderClose(&writer->ctx->reader);
  }

  // Output-side containers.
  tTombBlockDestroy(writer->tombBlock);
  TARRAY2_DESTROY(writer->tombBlkArray, NULL);
  tBlockDataDestroy(writer->blockData);
  tBrinBlockDestroy(writer->brinBlock);
  TARRAY2_DESTROY(writer->brinBlkArray, NULL);

  // Context-side containers.
  tTombBlockDestroy(writer->ctx->tombBlock);
  tBlockDataDestroy(writer->ctx->blockData);
  tBrinBlockDestroy(writer->ctx->brinBlock);

  for (int32_t idx = 0; idx < ARRAY_SIZE(writer->local); ++idx) {
    tBufferDestroy(&writer->local[idx]);
  }

  tDestroyTSchema(writer->skmRow->pTSchema);
  tDestroyTSchema(writer->skmTb->pTSchema);
}
4,394✔
701

702
/*
 * Open a reader over the writer's existing file set, if there is one.
 *
 * When at least one entry of writer->config->files[] is flagged `exist`, a
 * reader config is built from the writer's tsdb, page size and buffers plus
 * the existing files, and the reader is stored in writer->ctx->reader.
 * When no file exists nothing is opened and writer->ctx->reader is untouched.
 *
 * Returns 0 on success or the error from tsdbDataFileReaderOpen().
 */
static int32_t tsdbDataFileWriterDoOpenReader(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  // A reader is only needed when some file of the set already exists.
  bool hasExisting = false;
  for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
    if (writer->config->files[ftype].exist) {
      hasExisting = true;
      break;
    }
  }

  if (hasExisting) {
    SDataFileReaderConfig config[1] = {{
        .tsdb = writer->config->tsdb,
        .szPage = writer->config->szPage,
        .buffers = writer->buffers,
    }};

    // Mirror the existence flags; copy file metadata only for existing files.
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
      config->files[ftype].exist = writer->config->files[ftype].exist;
      if (config->files[ftype].exist) {
        config->files[ftype].file = writer->config->files[ftype].file;
      }
    }

    TAOS_CHECK_GOTO(tsdbDataFileReaderOpen(NULL, config, &writer->ctx->reader), &lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
733

734
// Lazily prepare a writer for its first write.
//
// - fills in writer-owned schema holders / buffers when the config supplies none;
// - opens a reader over the already existing files (if any);
// - initialises writer->files[] for .head/.data/.sma/.tomb: .head and .tomb always
//   start as new, empty descriptors, while .data and .sma reuse the descriptor
//   from the config when the file already exists;
// - resets both version ranges to "empty" and marks the writer as opened.
//
// Returns 0 on success, otherwise the code from tsdbDataFileWriterDoOpenReader().
static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  if (writer->config->skmTb == NULL) writer->config->skmTb = writer->skmTb;
  if (writer->config->skmRow == NULL) writer->config->skmRow = writer->skmRow;
  writer->buffers = (writer->config->buffers != NULL) ? writer->config->buffers : writer->local;

  // descriptor shared by every file that is created from scratch; fields not
  // listed here are zero-initialised, the type is patched in per file below
  STFile fresh = {
      .did = writer->config->did,
      .fid = writer->config->fid,
      .cid = writer->config->cid,
      .size = 0,
      .minVer = VERSION_MAX,
      .maxVer = VERSION_MIN,
  };

  // open reader
  TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpenReader(writer), &lino, _exit);

  // .head
  writer->files[TSDB_FTYPE_HEAD] = fresh;
  writer->files[TSDB_FTYPE_HEAD].type = TSDB_FTYPE_HEAD;

  // .data
  if (writer->config->files[TSDB_FTYPE_DATA].exist) {
    writer->files[TSDB_FTYPE_DATA] = writer->config->files[TSDB_FTYPE_DATA].file;
  } else {
    writer->files[TSDB_FTYPE_DATA] = fresh;
    writer->files[TSDB_FTYPE_DATA].type = TSDB_FTYPE_DATA;
    writer->files[TSDB_FTYPE_DATA].lcn = (writer->config->lcn == -1) ? 0 : -1;
  }

  // .sma
  if (writer->config->files[TSDB_FTYPE_SMA].exist) {
    writer->files[TSDB_FTYPE_SMA] = writer->config->files[TSDB_FTYPE_SMA].file;
  } else {
    writer->files[TSDB_FTYPE_SMA] = fresh;
    writer->files[TSDB_FTYPE_SMA].type = TSDB_FTYPE_SMA;
  }

  // .tomb
  writer->files[TSDB_FTYPE_TOMB] = fresh;
  writer->files[TSDB_FTYPE_TOMB].type = TSDB_FTYPE_TOMB;

  // an "empty" range has min > max, so the first update always wins
  writer->ctx->range = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
  writer->ctx->tombRange = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN};

  writer->ctx->opened = true;

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
819

820
// Widen *range so that it also covers [minVer, maxVer].
void tsdbWriterUpdVerRange(SVersionRange *range, int64_t minVer, int64_t maxVer) {
  if (minVer < range->minVer) {
    range->minVer = minVer;
  }
  if (maxVer > range->maxVer) {
    range->maxVer = maxVer;
  }
}
627,577✔
824

825
int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, uint32_t cmprAlg, int64_t *fileSize,
48,961✔
826
                               TBrinBlkArray *brinBlkArray, SBuffer *buffers, SVersionRange *range,
827
                               int32_t encryptAlgorithm, char *encryptKey) {
828
  if (brinBlock->numOfRecords == 0) {
48,961!
829
    return 0;
×
830
  }
831

832
  int32_t  code;
833
  SBuffer *buffer0 = buffers + 0;
48,961✔
834
  SBuffer *buffer1 = buffers + 1;
48,961✔
835
  SBuffer *assist = buffers + 2;
48,961✔
836

837
  SBrinBlk brinBlk = {
48,961✔
838
      .dp[0] =
839
          {
840
              .offset = *fileSize,
48,961✔
841
              .size = 0,
842
          },
843
      .numRec = brinBlock->numOfRecords,
48,961✔
844
      .numOfPKs = brinBlock->numOfPKs,
48,961✔
845
      .cmprAlg = cmprAlg,
846
  };
847
  for (int i = 0; i < brinBlock->numOfRecords; i++) {
11,849,765✔
848
    SBrinRecord record;
849

850
    TAOS_CHECK_RETURN(tBrinBlockGet(brinBlock, i, &record));
11,800,804!
851
    if (i == 0) {
11,800,804✔
852
      brinBlk.minTbid.suid = record.suid;
48,961✔
853
      brinBlk.minTbid.uid = record.uid;
48,961✔
854
      brinBlk.minVer = record.minVer;
48,961✔
855
      brinBlk.maxVer = record.maxVer;
48,961✔
856
    }
857
    if (i == brinBlock->numOfRecords - 1) {
11,800,804✔
858
      brinBlk.maxTbid.suid = record.suid;
48,961✔
859
      brinBlk.maxTbid.uid = record.uid;
48,961✔
860
    }
861
    if (record.minVer < brinBlk.minVer) {
11,800,804✔
862
      brinBlk.minVer = record.minVer;
5,842✔
863
    }
864
    if (record.maxVer > brinBlk.maxVer) {
11,800,804✔
865
      brinBlk.maxVer = record.maxVer;
4,629,802✔
866
    }
867
  }
868

869
  tsdbWriterUpdVerRange(range, brinBlk.minVer, brinBlk.maxVer);
48,961✔
870

871
  // write to file
872
  for (int32_t i = 0; i < 10; ++i) {
538,566✔
873
    SCompressInfo info = {
489,606✔
874
        .cmprAlg = cmprAlg,
875
        .dataType = TSDB_DATA_TYPE_BIGINT,
876
        .originalSize = brinBlock->buffers[i].size,
489,606✔
877
    };
878

879
    tBufferClear(buffer0);
880
    TAOS_CHECK_RETURN(tCompressDataToBuffer(brinBlock->buffers[i].data, &info, buffer0, assist));
489,606!
881
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptAlgorithm, encryptKey));
489,603!
882
    brinBlk.size[i] = info.compressedSize;
489,605✔
883
    brinBlk.dp->size += info.compressedSize;
489,605✔
884
    *fileSize += info.compressedSize;
489,605✔
885
  }
886
  for (int32_t i = 10; i < 15; ++i) {
293,764✔
887
    SCompressInfo info = {
244,804✔
888
        .cmprAlg = cmprAlg,
889
        .dataType = TSDB_DATA_TYPE_INT,
890
        .originalSize = brinBlock->buffers[i].size,
244,804✔
891
    };
892

893
    tBufferClear(buffer0);
894
    TAOS_CHECK_RETURN(tCompressDataToBuffer(brinBlock->buffers[i].data, &info, buffer0, assist));
244,804!
895
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptAlgorithm, encryptKey));
244,804!
896
    brinBlk.size[i] = info.compressedSize;
244,804✔
897
    brinBlk.dp->size += info.compressedSize;
244,804✔
898
    *fileSize += info.compressedSize;
244,804✔
899
  }
900

901
  // write primary keys to file
902
  if (brinBlock->numOfPKs > 0) {
48,960✔
903
    tBufferClear(buffer0);
904
    tBufferClear(buffer1);
905

906
    // encode
907
    for (int i = 0; i < brinBlock->numOfPKs; i++) {
3,092✔
908
      SValueColumnCompressInfo info = {.cmprAlg = cmprAlg};
1,546✔
909
      TAOS_CHECK_RETURN(tValueColumnCompress(&brinBlock->firstKeyPKs[i], &info, buffer1, assist));
1,546!
910
      TAOS_CHECK_RETURN(tValueColumnCompressInfoEncode(&info, buffer0));
1,546!
911
    }
912
    for (int i = 0; i < brinBlock->numOfPKs; i++) {
3,092✔
913
      SValueColumnCompressInfo info = {.cmprAlg = cmprAlg};
1,546✔
914
      TAOS_CHECK_RETURN(tValueColumnCompress(&brinBlock->lastKeyPKs[i], &info, buffer1, assist));
1,546!
915
      TAOS_CHECK_RETURN(tValueColumnCompressInfoEncode(&info, buffer0));
1,546!
916
    }
917

918
    // write to file
919
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptAlgorithm, encryptKey));
1,546!
920
    *fileSize += buffer0->size;
1,546✔
921
    brinBlk.dp->size += buffer0->size;
1,546✔
922
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer1->data, buffer1->size, encryptAlgorithm, encryptKey));
1,546!
923
    *fileSize += buffer1->size;
1,546✔
924
    brinBlk.dp->size += buffer1->size;
1,546✔
925
  }
926

927
  // append to brinBlkArray
928
  TAOS_CHECK_RETURN(TARRAY2_APPEND_PTR(brinBlkArray, &brinBlk));
97,920!
929

930
  tBrinBlockClear(brinBlock);
48,960✔
931

932
  return 0;
48,961✔
933
}
934

935
// Flush writer->brinBlock to the .head file if it holds any records.
//
// Delegates to tsdbFileWriteBrinBlock(), passing the tracked .head size,
// writer->brinBlkArray, writer->ctx->range and the vnode's encryption settings.
// Returns 0 when the block is empty or the flush succeeded.
static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  if (writer->brinBlock->numOfRecords > 0) {
    int32_t  algo = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
    char    *key = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
    int64_t *headSize = &writer->files[TSDB_FTYPE_HEAD].size;

    TAOS_CHECK_GOTO(tsdbFileWriteBrinBlock(writer->fd[TSDB_FTYPE_HEAD], writer->brinBlock, writer->config->cmprAlg,
                                           headSize, writer->brinBlkArray, writer->buffers, &writer->ctx->range, algo,
                                           key),
                    &lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
958

959
// Append one brin record to writer->brinBlock, flushing the block as needed.
//
// tBrinBlockPut() answering TSDB_CODE_INVALID_PARA is treated as "this record
// does not fit the current block" (the original comment: different records with
// different primary keys): the block is flushed and the put is retried. Once the
// block holds 256 or more records it is flushed as well.
static int32_t tsdbDataFileWriteBrinRecord(SDataFileWriter *writer, const SBrinRecord *record) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tBrinBlockPut(writer->brinBlock, record);
  while (code == TSDB_CODE_INVALID_PARA) {
    // flush what we have, then try again
    TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlock(writer), &lino, _exit);
    code = tBrinBlockPut(writer->brinBlock, record);
  }
  TSDB_CHECK_CODE(code, lino, _exit);

  if (writer->brinBlock->numOfRecords >= 256) {
    TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlock(writer), &lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
986

987
// Compress one block of rows and persist it together with its index record.
//
// Steps, in order:
//   1. build an SBrinRecord for the block (table ids, first/last key, version
//      span, row count, number of distinct keys, current .data/.sma offsets);
//   2. compress bData into buffers[0..3] and append those four buffers to .data;
//   3. serialise the SMA aggregate of every eligible column and append to .sma;
//   4. pass the record to tsdbDataFileWriteBrinRecord() and clear bData.
//
// Returns 0 when bData is empty or everything succeeded, TSDB_CODE_INVALID_PARA
// when bData->uid is 0, otherwise the first failing callee's code.
static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData *bData) {
  if (bData->nRow == 0) return 0;
  if (bData->uid == 0) return TSDB_CODE_INVALID_PARA;

  int32_t  code = 0;
  int32_t  lino = 0;
  SBuffer *buffers = writer->buffers;
  SBuffer *assist = &writer->buffers[4];
  int32_t  encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char    *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;

  SColCompressInfo cmprInfo = {.pColCmpr = NULL, .defaultCmprAlg = writer->config->cmprAlg};

  SBrinRecord rec = {
      .suid = bData->suid,
      .uid = bData->uid,
      .minVer = bData->aVersion[0],
      .maxVer = bData->aVersion[0],
      .blockOffset = writer->files[TSDB_FTYPE_DATA].size,
      .smaOffset = writer->files[TSDB_FTYPE_SMA].size,
      .blockSize = 0,
      .blockKeySize = 0,
      .smaSize = 0,
      .numRow = bData->nRow,
      .count = 1,
  };

  tsdbRowGetKey(&tsdbRowFromBlockData(bData, 0), &rec.firstKey);
  tsdbRowGetKey(&tsdbRowFromBlockData(bData, bData->nRow - 1), &rec.lastKey);

  // count key changes between neighbouring rows and track the version span
  for (int32_t iRow = 1; iRow < bData->nRow; ++iRow) {
    if (tsdbRowCompareWithoutVersion(&tsdbRowFromBlockData(bData, iRow - 1), &tsdbRowFromBlockData(bData, iRow)) !=
        0) {
      rec.count++;
    }
    if (bData->aVersion[iRow] < rec.minVer) {
      rec.minVer = bData->aVersion[iRow];
    }
    if (bData->aVersion[iRow] > rec.maxVer) {
      rec.maxVer = bData->aVersion[iRow];
    }
  }

  tsdbWriterUpdVerRange(&writer->ctx->range, rec.minVer, rec.maxVer);

  // a lookup failure is only logged; compression then proceeds with whatever
  // cmprInfo holds at that point
  code = metaGetColCmpr(writer->config->tsdb->pVnode->pMeta, bData->suid != 0 ? bData->suid : bData->uid,
                        &cmprInfo.pColCmpr);
  if (code) {
    tsdbWarn("vgId:%d failed to get column compress algrithm", TD_VID(writer->config->tsdb->pVnode));
  }

  TAOS_CHECK_GOTO(tBlockDataCompress(bData, &cmprInfo, buffers, assist), &lino, _exit);

  rec.blockKeySize = buffers[0].size + buffers[1].size;
  rec.blockSize = rec.blockKeySize + buffers[2].size + buffers[3].size;

  // to .data file
  for (int iBuf = 0; iBuf < 4; iBuf++) {
    TAOS_CHECK_GOTO(tsdbWriteFile(writer->fd[TSDB_FTYPE_DATA], writer->files[TSDB_FTYPE_DATA].size,
                                  buffers[iBuf].data, buffers[iBuf].size, encryptAlgorithm, encryptKey),
                    &lino, _exit);
    writer->files[TSDB_FTYPE_DATA].size += buffers[iBuf].size;
  }

  // to .sma file
  tBufferClear(&buffers[0]);
  for (int32_t iCol = 0; iCol < bData->nColData; ++iCol) {
    SColData *colData = bData->aColData + iCol;

    // only columns with SMA enabled that actually hold values contribute
    if ((colData->cflag & COL_SMA_ON) != 0 && (colData->flag & HAS_VALUE) != 0) {
      SColumnDataAgg agg = {.colId = colData->cid};
      tColDataCalcSMA[colData->type](colData, &agg.sum, &agg.max, &agg.min, &agg.numOfNull);

      TAOS_CHECK_GOTO(tPutColumnDataAgg(&buffers[0], &agg), &lino, _exit);
    }
  }
  rec.smaSize = buffers[0].size;

  if (rec.smaSize > 0) {
    TAOS_CHECK_GOTO(tsdbWriteFile(writer->fd[TSDB_FTYPE_SMA], rec.smaOffset, buffers[0].data, rec.smaSize,
                                  encryptAlgorithm, encryptKey),
                    &lino, _exit);
    writer->files[TSDB_FTYPE_SMA].size += rec.smaSize;
  }

  // append SBrinRecord
  TAOS_CHECK_GOTO(tsdbDataFileWriteBrinRecord(writer, &rec), &lino, _exit);

  tBlockDataClear(bData);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  taosHashCleanup(cmprInfo.pColCmpr);
  return code;
}
1087

1088
// Add one row to writer->blockData.
//
// A row whose version is <= config->compactVersion and whose key (ignoring the
// version) equals the last buffered row is merged into that row via
// tBlockDataUpdateRow(). Any other row is appended; if the buffer already holds
// config->maxRow rows it is flushed to disk first.
static int32_t tsdbDataFileDoWriteTSRow(SDataFileWriter *writer, TSDBROW *row) {
  int32_t code = 0;
  int32_t lino = 0;
  bool    mergeIntoLast = false;

  // row-format rows need an up-to-date row schema before they can be decoded
  if (row->type == TSDBROW_ROW_FMT) {
    TAOS_CHECK_GOTO(
        tsdbUpdateSkmRow(writer->config->tsdb, writer->ctx->tbid, TSDBROW_SVERSION(row), writer->config->skmRow), &lino,
        _exit);
  }

  mergeIntoLast =
      TSDBROW_VERSION(row) <= writer->config->compactVersion && writer->blockData->nRow > 0 &&
      tsdbRowCompareWithoutVersion(row, &tsdbRowFromBlockData(writer->blockData, writer->blockData->nRow - 1)) == 0;

  if (mergeIntoLast) {
    TAOS_CHECK_GOTO(tBlockDataUpdateRow(writer->blockData, row, writer->config->skmRow->pTSchema), &lino, _exit);
    goto _exit;
  }

  if (writer->blockData->nRow >= writer->config->maxRow) {
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, writer->blockData), &lino, _exit);
  }

  TAOS_CHECK_GOTO(
      tBlockDataAppendRow(writer->blockData, row, writer->config->skmRow->pTSchema, writer->ctx->tbid->uid), &lino,
      _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1122

1123
// Compare two row keys, treating a NULL key as larger than any real key.
//
// Returns 1 when key1 is NULL (this includes the case where both are NULL),
// -1 when only key2 is NULL, and tsdbRowKeyCmpr(key1, key2) otherwise.
static FORCE_INLINE int32_t tsdbRowKeyCmprNullAsLargest(const STsdbRowKey *key1, const STsdbRowKey *key2) {
  if (key1 == NULL) return 1;
  if (key2 == NULL) return -1;
  return tsdbRowKeyCmpr(key1, key2);
}
1132

1133
// Copy the current table's pre-existing data that sorts before `key` into the
// new file set.
//
// writer  - writer whose ctx tracks the read position in the old files
//           (brinBlkArrayIdx -> brinBlockIdx -> blockDataIdx)
// key     - upper bound (exclusive for loaded rows); NULL is treated as larger
//           than every key, i.e. "copy everything left for this table"
//
// The function walks three nested levels of the old data:
//   * rows of the currently loaded data block (ctx->blockData);
//   * records of the currently loaded brin block (ctx->brinBlock);
//   * entries of the brin block index (ctx->brinBlkArray).
// It stops as soon as it reaches data that is not before `key`, and clears
// ctx->tbHasOldData once the old data of this table is exhausted.
//
// Returns 0 on success (including when tbHasOldData is already false),
// otherwise the first failing callee's code.
static int32_t tsdbDataFileDoWriteTableOldData(SDataFileWriter *writer, const STsdbRowKey *key) {
  if (writer->ctx->tbHasOldData == false) {
    return 0;
  }

  int32_t     code = 0;
  int32_t     lino = 0;
  STsdbRowKey rowKey;

  for (;;) {
    for (;;) {
      // SBlockData: emit loaded rows while rowKey < key
      for (; writer->ctx->blockDataIdx < writer->ctx->blockData->nRow; writer->ctx->blockDataIdx++) {
        TSDBROW row = tsdbRowFromBlockData(writer->ctx->blockData, writer->ctx->blockDataIdx);

        tsdbRowGetKey(&row, &rowKey);
        if (tsdbRowKeyCmprNullAsLargest(&rowKey, key) < 0) {  // rowKey < key
          TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSRow(writer, &row), &lino, _exit);
        } else {
          goto _exit;
        }
      }

      // SBrinBlock
      if (writer->ctx->brinBlockIdx >= writer->ctx->brinBlock->numOfRecords) {
        break;
      }

      for (; writer->ctx->brinBlockIdx < writer->ctx->brinBlock->numOfRecords; writer->ctx->brinBlockIdx++) {
        SBrinRecord record;
        code = tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, &record);
        TSDB_CHECK_CODE(code, lino, _exit);

        // reached the next table: nothing old is left for the current one
        if (record.uid != writer->ctx->tbid->uid) {
          writer->ctx->tbHasOldData = false;
          goto _exit;
        }

        if (tsdbRowKeyCmprNullAsLargest(key, &record.firstKey) < 0) {  // key < record.firstKey
          goto _exit;
        }

        if (tsdbRowKeyCmprNullAsLargest(key, &record.lastKey) > 0) {  // key > record.lastKey
          // the whole old block precedes key: carry its index record over as is,
          // after flushing buffered rows so that block order is kept
          if (writer->blockData->nRow > 0) {
            TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, writer->blockData), &lino, _exit);
          }

          TAOS_CHECK_GOTO(tsdbDataFileWriteBrinRecord(writer, &record), &lino, _exit);
        } else {
          // key falls inside this block: load it and merge row by row
          TAOS_CHECK_GOTO(tsdbDataFileReadBlockData(writer->ctx->reader, &record, writer->ctx->blockData), &lino,
                          _exit);

          writer->ctx->blockDataIdx = 0;
          writer->ctx->brinBlockIdx++;
          break;
        }
      }
    }

    // SBrinBlk
    if (writer->ctx->brinBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->brinBlkArray)) {
      writer->ctx->brinBlkArray = NULL;
      writer->ctx->tbHasOldData = false;
      goto _exit;
    } else {
      const SBrinBlk *brinBlk = TARRAY2_GET_PTR(writer->ctx->brinBlkArray, writer->ctx->brinBlkArrayIdx);

      if (brinBlk->minTbid.uid != writer->ctx->tbid->uid) {
        writer->ctx->tbHasOldData = false;
        goto _exit;
      }

      TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(writer->ctx->reader, brinBlk, writer->ctx->brinBlock), &lino, _exit);

      writer->ctx->brinBlockIdx = 0;
      writer->ctx->brinBlkArrayIdx++;
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1221

1222
// Write one incoming row for the current table.
//
// While the table still has old data pending (ctx->tbHasOldData), the old data
// that precedes the row's key is copied first; the row itself is written after.
static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row) {
  int32_t code = 0;
  int32_t lino = 0;

  if (writer->ctx->tbHasOldData) {
    STsdbRowKey rowKey;

    tsdbRowGetKey(row, &rowKey);
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTableOldData(writer, &rowKey), &lino, _exit);
  }

  TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSRow(writer, row), &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1241

1242
// Finish the table currently being written.
//
// Does nothing when no table is active (ctx->tbid->uid == 0). Otherwise copies
// all remaining old data of the table (NULL key = largest key) and flushes the
// buffered rows in writer->blockData.
static int32_t tsdbDataFileWriteTableDataEnd(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  if (writer->ctx->tbid->uid != 0) {
    if (writer->ctx->tbHasOldData) {
      TAOS_CHECK_GOTO(tsdbDataFileDoWriteTableOldData(writer, NULL /* as the largest key */), &lino, _exit);
    }

    TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, writer->blockData), &lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1263

1264
// Start writing data for table `tbid`.
//
// First, old brin records that sort before `tbid` are carried over to the new
// .head file. Records of a table for which metaGetInfo() reports failure are
// skipped instead of being carried over. The scan stops at the first old record
// that belongs to `tbid` (ctx->tbHasOldData is then set) or that sorts after it.
//
// Afterwards `tbid` becomes the current table. Unless tbid->uid is INT64_MAX
// (in which case nothing more is done), the table schema is refreshed and
// writer->blockData is re-initialised for the table.
static int32_t tsdbDataFileWriteTableDataBegin(SDataFileWriter *writer, const TABLEID *tbid) {
  int32_t code = 0;
  int32_t lino = 0;

  SMetaInfo info;
  bool      skipping = false;  // true while `skipped` names the table being dropped
  TABLEID   skipped;           // only meaningful while `skipping` is true

  writer->ctx->tbHasOldData = false;

  while (writer->ctx->brinBlkArray) {  // skip data of previous table
    for (; writer->ctx->brinBlockIdx < writer->ctx->brinBlock->numOfRecords; writer->ctx->brinBlockIdx++) {
      SBrinRecord record;
      TAOS_CHECK_GOTO(tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, &record), &lino, _exit);

      if (record.uid == tbid->uid) {
        writer->ctx->tbHasOldData = true;
        goto _begin;
      }
      if (record.suid > tbid->suid || (record.suid == tbid->suid && record.uid > tbid->uid)) {
        goto _begin;
      }

      // the record belongs to a table that sorts before tbid
      if (record.uid != writer->ctx->tbid->uid) {
        if (skipping && skipped.uid == record.uid) {
          continue;
        }
        if (metaGetInfo(writer->config->tsdb->pVnode->pMeta, record.uid, &info, NULL) != 0) {
          skipping = true;
          skipped.suid = record.suid;
          skipped.uid = record.uid;
          continue;
        }

        skipping = false;
        writer->ctx->tbid->suid = record.suid;
        writer->ctx->tbid->uid = record.uid;
      }

      TAOS_CHECK_GOTO(tsdbDataFileWriteBrinRecord(writer, &record), &lino, _exit);
    }

    // current brin block exhausted: move on to the next one, if any
    if (writer->ctx->brinBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->brinBlkArray)) {
      writer->ctx->brinBlkArray = NULL;
      break;
    }

    const SBrinBlk *brinBlk = TARRAY2_GET_PTR(writer->ctx->brinBlkArray, writer->ctx->brinBlkArrayIdx);

    TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(writer->ctx->reader, brinBlk, writer->ctx->brinBlock), &lino, _exit);

    writer->ctx->brinBlockIdx = 0;
    writer->ctx->brinBlkArrayIdx++;
  }

_begin:
  writer->ctx->tbid[0] = *tbid;

  if (tbid->uid != INT64_MAX) {
    TAOS_CHECK_GOTO(tsdbUpdateSkmTb(writer->config->tsdb, tbid, writer->config->skmTb), &lino, _exit);
    TAOS_CHECK_GOTO(tBlockDataInit(writer->blockData, writer->ctx->tbid, writer->config->skmTb->pTSchema, NULL, 0),
                    &lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1333

1334
// Append the raw bytes of `footer` to `fd` at offset *fileSize and advance
// *fileSize by sizeof(*footer). Returns 0 on success, otherwise the write error.
int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer, int32_t encryptAlgorithm,
                                char *encryptKey) {
  const uint8_t *bytes = (const uint8_t *)footer;

  TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, bytes, sizeof(*footer), encryptAlgorithm, encryptKey));

  *fileSize += sizeof(*footer);
  return 0;
}
1341

1342
int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
11,308✔
1343
                               TTombBlkArray *tombBlkArray, SBuffer *buffers, SVersionRange *range,
1344
                               int32_t encryptAlgorithm, char *encryptKey) {
1345
  int32_t code;
1346

1347
  if (TOMB_BLOCK_SIZE(tombBlock) == 0) {
11,308!
1348
    return 0;
×
1349
  }
1350

1351
  SBuffer *buffer0 = buffers + 0;
11,308✔
1352
  SBuffer *assist = buffers + 1;
11,308✔
1353

1354
  STombBlk tombBlk = {
11,308✔
1355
      .dp[0] =
1356
          {
1357
              .offset = *fileSize,
11,308✔
1358
              .size = 0,
1359
          },
1360
      .numRec = TOMB_BLOCK_SIZE(tombBlock),
11,308✔
1361
      .cmprAlg = cmprAlg,
1362
  };
1363
  for (int i = 0; i < TOMB_BLOCK_SIZE(tombBlock); i++) {
383,438✔
1364
    STombRecord record;
1365
    TAOS_CHECK_RETURN(tTombBlockGet(tombBlock, i, &record));
372,130!
1366

1367
    if (i == 0) {
372,130✔
1368
      tombBlk.minTbid.suid = record.suid;
11,308✔
1369
      tombBlk.minTbid.uid = record.uid;
11,308✔
1370
      tombBlk.minVer = record.version;
11,308✔
1371
      tombBlk.maxVer = record.version;
11,308✔
1372
    }
1373
    if (i == TOMB_BLOCK_SIZE(tombBlock) - 1) {
372,130✔
1374
      tombBlk.maxTbid.suid = record.suid;
11,308✔
1375
      tombBlk.maxTbid.uid = record.uid;
11,308✔
1376
    }
1377
    if (record.version < tombBlk.minVer) {
372,130✔
1378
      tombBlk.minVer = record.version;
2,390✔
1379
    }
1380
    if (record.version > tombBlk.maxVer) {
372,130✔
1381
      tombBlk.maxVer = record.version;
185,958✔
1382
    }
1383
  }
1384

1385
  tsdbWriterUpdVerRange(range, tombBlk.minVer, tombBlk.maxVer);
11,308✔
1386

1387
  for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->buffers); i++) {
67,848✔
1388
    tBufferClear(buffer0);
1389

1390
    SCompressInfo cinfo = {
56,540✔
1391
        .cmprAlg = cmprAlg,
1392
        .dataType = TSDB_DATA_TYPE_BIGINT,
1393
        .originalSize = tombBlock->buffers[i].size,
56,540✔
1394
    };
1395
    TAOS_CHECK_RETURN(tCompressDataToBuffer(tombBlock->buffers[i].data, &cinfo, buffer0, assist));
56,540!
1396
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptAlgorithm, encryptKey));
56,540!
1397

1398
    tombBlk.size[i] = cinfo.compressedSize;
56,540✔
1399
    tombBlk.dp->size += tombBlk.size[i];
56,540✔
1400
    *fileSize += tombBlk.size[i];
56,540✔
1401
  }
1402

1403
  TAOS_CHECK_RETURN(TARRAY2_APPEND_PTR(tombBlkArray, &tombBlk));
22,616!
1404

1405
  tTombBlockClear(tombBlock);
11,308✔
1406
  return 0;
11,308✔
1407
}
1408

1409
// Append writer->headFooter to the .head file, using the vnode's encryption
// settings and advancing the tracked .head file size.
static int32_t tsdbDataFileWriteHeadFooter(SDataFileWriter *writer) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  algo = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char    *key = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
  int64_t *headSize = &writer->files[TSDB_FTYPE_HEAD].size;

  TAOS_CHECK_GOTO(tsdbFileWriteHeadFooter(writer->fd[TSDB_FTYPE_HEAD], headSize, writer->headFooter, algo, key), &lino,
                  _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1427

1428
// Flush writer->tombBlock to the .tomb file if it holds any records.
//
// Delegates to tsdbFileWriteTombBlock(), passing the tracked .tomb size,
// writer->tombBlkArray, writer->ctx->tombRange and the vnode's encryption
// settings. Returns 0 when the block is empty or the flush succeeded.
static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  if (TOMB_BLOCK_SIZE(writer->tombBlock) != 0) {
    int32_t  algo = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
    char    *key = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
    int64_t *tombSize = &writer->files[TSDB_FTYPE_TOMB].size;

    TAOS_CHECK_GOTO(tsdbFileWriteTombBlock(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlock, writer->config->cmprAlg,
                                           tombSize, writer->tombBlkArray, writer->buffers, &writer->ctx->tombRange,
                                           algo, key),
                    &lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1449

1450
// Append the raw tomb block index array to `fd` and describe it in *ptr.
//
// ptr->size is always set to the array's data length. When that length is
// positive, ptr->offset is set to the current *fileSize, the array bytes are
// written there and *fileSize is advanced; otherwise nothing is written and
// ptr->offset is left untouched.
int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize,
                             int32_t encryptAlgorithm, char *encryptKey) {
  ptr->size = TARRAY2_DATA_LEN(tombBlkArray);
  if (ptr->size <= 0) {
    return 0;
  }

  ptr->offset = *fileSize;

  TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, (const uint8_t *)TARRAY2_DATA(tombBlkArray), ptr->size,
                                  encryptAlgorithm, encryptKey));

  *fileSize += ptr->size;
  return 0;
}
1463

1464
// Write writer->tombBlkArray to the .tomb file and record its location in
// writer->tombFooter->tombBlkPtr.
//
// Returns TSDB_CODE_INVALID_PARA (without logging) when the array is empty.
static int32_t tsdbDataFileDoWriteTombBlk(SDataFileWriter *writer) {
  if (TARRAY2_SIZE(writer->tombBlkArray) <= 0) return TSDB_CODE_INVALID_PARA;

  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  algo = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char    *key = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
  int64_t *tombSize = &writer->files[TSDB_FTYPE_TOMB].size;

  TAOS_CHECK_GOTO(tsdbFileWriteTombBlk(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlkArray,
                                       writer->tombFooter->tombBlkPtr, tombSize, algo, key),
                  &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1487

1488
// Append the raw bytes of `footer` to `fd` at offset *fileSize and advance
// *fileSize by sizeof(*footer). Returns 0 on success, otherwise the write error.
int32_t tsdbFileWriteTombFooter(STsdbFD *fd, const STombFooter *footer, int64_t *fileSize, int32_t encryptAlgorithm,
                                char *encryptKey) {
  const uint8_t *bytes = (const uint8_t *)footer;

  TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, bytes, sizeof(*footer), encryptAlgorithm, encryptKey));

  *fileSize += sizeof(*footer);
  return 0;
}
1495

1496
// Appends the writer's tomb footer to the tomb file through
// tsdbFileWriteTombFooter(), growing the tracked tomb file size.
//
// Returns 0 on success or the error code of the write, which is also logged.
static int32_t tsdbDataFileWriteTombFooter(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  // Encryption settings come from the owning vnode's tsdb configuration.
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;

  TAOS_CHECK_GOTO(tsdbFileWriteTombFooter(writer->fd[TSDB_FTYPE_TOMB], writer->tombFooter,
                                          &writer->files[TSDB_FTYPE_TOMB].size, encryptAlgorithm, encryptKey),
                  &lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1514

1515
// Merges `record` into the output tomb block, first copying over any old tomb
// records that compare lower than it (per tTombRecordCompare).
//
// While old tomb data remains (ctx->hasOldTomb):
//   - old records that compare lower than `record` are put into the output
//     block, which is flushed once it reaches config->maxRow rows;
//   - an old record comparing equal to `record` is reported as a duplicate
//     and skipped;
//   - the first old record that compares higher than `record` stops the scan;
//   - when the current old block is exhausted the next one is loaded, or
//     hasOldTomb is cleared if there is none.
//
// A `record` whose suid is INT64_MAX is not written itself; callers in this
// file pass such a record at close time so that the remaining old records
// are copied over.
//
// Returns 0 on success or the first error encountered, which is also logged.
static int32_t tsdbDataFileDoWriteTombRecord(SDataFileWriter *writer, const STombRecord *record) {
  int32_t code = 0;
  int32_t lino = 0;

  while (writer->ctx->hasOldTomb) {
    // Walk the currently loaded old tomb block from where we left off.
    while (writer->ctx->tombBlockIdx < TOMB_BLOCK_SIZE(writer->ctx->tombBlock)) {
      STombRecord oldRecord[1];
      TAOS_CHECK_GOTO(tTombBlockGet(writer->ctx->tombBlock, writer->ctx->tombBlockIdx, oldRecord), &lino, _exit);

      int32_t cmp = tTombRecordCompare(record, oldRecord);
      if (cmp < 0) {
        // The new record goes before this old one; keep the old one for later.
        goto _write;
      }

      if (cmp == 0) {
        tsdbError("vgId:%d duplicate tomb record, cid:%" PRId64 ", suid:%" PRId64 ", uid:%" PRId64 ", version:%" PRId64,
                  TD_VID(writer->config->tsdb->pVnode), writer->config->cid, record->suid, record->uid,
                  record->version);
      } else {
        // The old record goes first: carry it over to the output block.
        TAOS_CHECK_GOTO(tTombBlockPut(writer->tombBlock, oldRecord), &lino, _exit);

        tsdbTrace("vgId:%d write tomb record to tomb file:%s, cid:%" PRId64 ", suid:%" PRId64 ", uid:%" PRId64
                  ", version:%" PRId64,
                  TD_VID(writer->config->tsdb->pVnode), writer->fd[TSDB_FTYPE_TOMB]->path, writer->config->cid,
                  oldRecord->suid, oldRecord->uid, oldRecord->version);

        if (TOMB_BLOCK_SIZE(writer->tombBlock) >= writer->config->maxRow) {
          TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlock(writer), &lino, _exit);
        }
      }

      writer->ctx->tombBlockIdx++;
    }

    // Current old block exhausted: load the next one, or finish the old data.
    if (writer->ctx->tombBlkArrayIdx < TARRAY2_SIZE(writer->ctx->tombBlkArray)) {
      const STombBlk *nextBlk = TARRAY2_GET_PTR(writer->ctx->tombBlkArray, writer->ctx->tombBlkArrayIdx);

      TAOS_CHECK_GOTO(tsdbDataFileReadTombBlock(writer->ctx->reader, nextBlk, writer->ctx->tombBlock), &lino, _exit);

      writer->ctx->tombBlockIdx = 0;
      writer->ctx->tombBlkArrayIdx++;
    } else {
      writer->ctx->hasOldTomb = false;
      break;
    }
  }

_write:
  // A record with suid == INT64_MAX is never stored.
  if (record->suid == INT64_MAX) {
    goto _exit;
  }

  TAOS_CHECK_GOTO(tTombBlockPut(writer->tombBlock, record), &lino, _exit);

  tsdbTrace("vgId:%d write tomb record to tomb file:%s, cid:%" PRId64 ", suid:%" PRId64 ", uid:%" PRId64
            ", version:%" PRId64,
            TD_VID(writer->config->tsdb->pVnode), writer->fd[TSDB_FTYPE_TOMB]->path, writer->config->cid, record->suid,
            record->uid, record->version);

  if (TOMB_BLOCK_SIZE(writer->tombBlock) >= writer->config->maxRow) {
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlock(writer), &lino, _exit);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1581

1582
// Appends the raw contents of `brinBlkArray` to `fd` at offset `*fileSize`.
//
// On success `ptr` describes the written region (offset = previous
// `*fileSize`, size = TARRAY2_DATA_LEN(brinBlkArray)) and `*fileSize` is
// advanced by that size.
//
// Returns TSDB_CODE_INVALID_PARA when the array is empty, 0 on success; a
// tsdbWriteFile() failure is propagated through TAOS_CHECK_RETURN.
int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize,
                             int32_t encryptAlgorithm, char *encryptKey) {
  if (TARRAY2_SIZE(brinBlkArray) <= 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  // Describe the region before writing it; the write uses the same values.
  ptr->size = TARRAY2_DATA_LEN(brinBlkArray);
  ptr->offset = *fileSize;

  TAOS_CHECK_RETURN(
      tsdbWriteFile(fd, ptr->offset, (uint8_t *)TARRAY2_DATA(brinBlkArray), ptr->size, encryptAlgorithm, encryptKey));

  *fileSize += ptr->size;
  return 0;
}
1596

1597
// Writes the writer's brin block index array to the head file via
// tsdbFileWriteBrinBlk(), recording its location in the head footer and
// growing the tracked head file size.
//
// Returns 0 on success or the error code of the write, which is also logged.
static int32_t tsdbDataFileWriteBrinBlk(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  // Encryption settings come from the owning vnode's tsdb configuration.
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;

  TAOS_CHECK_GOTO(
      tsdbFileWriteBrinBlk(writer->fd[TSDB_FTYPE_HEAD], writer->brinBlkArray, writer->headFooter->brinBlkPtr,
                           &writer->files[TSDB_FTYPE_HEAD].size, encryptAlgorithm, encryptKey),
      &lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1616

1617
// Widens the version range stored in `f` so that it also covers `range`:
// f->minVer can only decrease and f->maxVer can only increase.
void tsdbTFileUpdVerRange(STFile *f, SVersionRange range) {
  if (range.minVer < f->minVer) {
    f->minVer = range.minVer;
  }
  if (range.maxVer > f->maxVer) {
    f->maxVer = range.maxVer;
  }
}
171,054✔
1621

1622
// Finalizes all files the writer has opened and appends the resulting file
// operations to `opArr`.
//
// If the head file is open: pending table data is finished, the brin block,
// brin block index and head footer are written, then operations are queued:
//   .head       - REMOVE of the old file (if it existed) followed by CREATE;
//   .data/.sma  - CREATE if the file did not exist, MODIFY if its size changed.
// If the tomb file is open: remaining old tomb records are carried over, the
// tomb block, tomb block index and tomb footer are written, then REMOVE (if
// the old file existed) and CREATE operations are queued.
// Finally every open file descriptor is fsync'ed and closed.
//
// Returns 0 on success or the first error encountered, which is also logged.
static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArray *opArr) {
  int32_t code = 0;
  int32_t lino = 0;

  int32_t  ftype;
  STFileOp op;

  if (writer->fd[TSDB_FTYPE_HEAD]) {
    // Sentinel table id passed to tsdbDataFileWriteTableDataBegin() after the
    // current table's data has been ended.
    TABLEID tbid[1] = {{.suid = INT64_MAX, .uid = INT64_MAX}};

    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataEnd(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataBegin(writer, tbid), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlock(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlk(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteHeadFooter(writer), &lino, _exit);

    // Starts inverted so that merging it into a file's range is a no-op
    // unless an old head file supplies a real range.
    SVersionRange ofRange = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};

    // .head: the old file (if any) is replaced by the newly written one.
    ftype = TSDB_FTYPE_HEAD;
    if (writer->config->files[ftype].exist) {
      op = (STFileOp){
          .optype = TSDB_FOP_REMOVE,
          .fid = writer->config->fid,
          .of = writer->config->files[ftype].file,
      };
      ofRange = (SVersionRange){.minVer = op.of.minVer, .maxVer = op.of.maxVer};
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
    }
    op = (STFileOp){
        .optype = TSDB_FOP_CREATE,
        .fid = writer->config->fid,
        .nf = writer->files[ftype],
    };
    tsdbTFileUpdVerRange(&op.nf, ofRange);
    tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
    TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);

    // .data then .sma: both follow the same create-or-modify rule.
    const int32_t dataLikeTypes[] = {TSDB_FTYPE_DATA, TSDB_FTYPE_SMA};
    for (int32_t i = 0; i < ARRAY_SIZE(dataLikeTypes); ++i) {
      ftype = dataLikeTypes[i];

      if (!writer->config->files[ftype].exist) {
        op = (STFileOp){
            .optype = TSDB_FOP_CREATE,
            .fid = writer->config->fid,
            .nf = writer->files[ftype],
        };
        tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
        TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
      } else if (writer->config->files[ftype].file.size != writer->files[ftype].size) {
        op = (STFileOp){
            .optype = TSDB_FOP_MODIFY,
            .fid = writer->config->fid,
            .of = writer->config->files[ftype].file,
            .nf = writer->files[ftype],
        };
        tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
        TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
      }
    }
  }

  if (writer->fd[TSDB_FTYPE_TOMB]) {
    // Sentinel record: tsdbDataFileDoWriteTombRecord() does not store a
    // record whose suid is INT64_MAX.
    STombRecord record[1] = {{.suid = INT64_MAX, .uid = INT64_MAX, .version = INT64_MAX}};

    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombRecord(writer, record), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlock(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlk(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteTombFooter(writer), &lino, _exit);

    SVersionRange ofRange = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN};

    // .tomb: the old file (if any) is replaced by the newly written one.
    ftype = TSDB_FTYPE_TOMB;
    if (writer->config->files[ftype].exist) {
      op = (STFileOp){
          .optype = TSDB_FOP_REMOVE,
          .fid = writer->config->fid,
          .of = writer->config->files[ftype].file,
      };
      ofRange = (SVersionRange){.minVer = op.of.minVer, .maxVer = op.of.maxVer};
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
    }
    op = (STFileOp){
        .optype = TSDB_FOP_CREATE,
        .fid = writer->config->fid,
        .nf = writer->files[ftype],
    };
    tsdbTFileUpdVerRange(&op.nf, ofRange);
    tsdbTFileUpdVerRange(&op.nf, writer->ctx->tombRange);
    TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
  }

  // Sync and close every file descriptor that is still open.
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
  for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
    if (!writer->fd[i]) {
      continue;
    }
    TAOS_CHECK_GOTO(tsdbFsyncFile(writer->fd[i], encryptAlgorithm, encryptKey), &lino, _exit);
    tsdbCloseFile(&writer->fd[i]);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1755

1756
// Opens the head, data and sma files of the writer for read/write.
//
// A file whose tracked size is 0 is created/truncated and a zero-filled
// header of TSDB_FHDR_SIZE bytes is written to it, after which its tracked
// size is advanced by TSDB_FHDR_SIZE. If the writer has a reader attached,
// the existing brin block index is loaded into ctx->brinBlkArray.
//
// Returns 0 on success or the first error encountered, which is also logged.
static int32_t tsdbDataFileWriterOpenDataFD(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  int32_t ftypes[] = {TSDB_FTYPE_HEAD, TSDB_FTYPE_DATA, TSDB_FTYPE_SMA};

  // Encryption settings come from the owning vnode's tsdb configuration.
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;

  for (int32_t i = 0; i < ARRAY_SIZE(ftypes); ++i) {
    int32_t ftype = ftypes[i];
    STFile *file = &writer->files[ftype];
    char    fname[TSDB_FILENAME_LEN];

    // A file with no content yet is created from scratch.
    int32_t flag = (file->size == 0) ? (TD_FILE_READ | TD_FILE_WRITE | (TD_FILE_CREATE | TD_FILE_TRUNC))
                                     : (TD_FILE_READ | TD_FILE_WRITE);

    int32_t lcn = file->lcn;
    tsdbTFileName(writer->config->tsdb, file, fname);
    TAOS_CHECK_GOTO(tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd[ftype], lcn), &lino, _exit);

    if (file->size == 0) {
      // Reserve the file header region with zeros.
      uint8_t hdr[TSDB_FHDR_SIZE] = {0};

      TAOS_CHECK_GOTO(tsdbWriteFile(writer->fd[ftype], 0, hdr, TSDB_FHDR_SIZE, encryptAlgorithm, encryptKey), &lino,
                      _exit);

      file->size += TSDB_FHDR_SIZE;
    }
  }

  if (writer->ctx->reader) {
    TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(writer->ctx->reader, &writer->ctx->brinBlkArray), &lino, _exit);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1800

1801
// Allocates a zero-initialized writer and stores a copy of `config` in it.
//
// On success `*writer` points to the new writer and 0 is returned. When the
// allocation fails `*writer` is NULL and `terrno` is returned.
int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer) {
  SDataFileWriter *w = taosMemoryCalloc(1, sizeof(*w));

  *writer = w;
  if (w == NULL) {
    return terrno;
  }

  w->config[0] = *config;
  return 0;
}
1810

1811
// Closes the writer referenced by `*writer`.
//
// A NULL `*writer` is a no-op. If the writer was opened, it is finished with
// tsdbDataFileWriterCloseAbort() when `abort` is true, otherwise with
// tsdbDataFileWriterCloseCommit() (which appends to `opArr`), and then
// tsdbDataFileWriterDoClose() is called. On success the writer is freed and
// `*writer` is set to NULL.
//
// If the abort/commit step fails, the error is logged and returned, and the
// writer is neither closed further nor freed (`*writer` is left unchanged).
int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, TFileOpArray *opArr) {
  SDataFileWriter *w = *writer;
  if (w == NULL) {
    return 0;
  }

  int32_t code = 0;
  int32_t lino = 0;

  if (w->ctx->opened) {
    if (!abort) {
      TAOS_CHECK_GOTO(tsdbDataFileWriterCloseCommit(w, opArr), &lino, _exit);
    } else {
      TAOS_CHECK_GOTO(tsdbDataFileWriterCloseAbort(w), &lino, _exit);
    }
    tsdbDataFileWriterDoClose(w);
  }

  taosMemoryFree(w);
  *writer = NULL;

_exit:
  // Only reached with code != 0 before the writer is freed, so `w` is valid.
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(w->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1835

1836
// Writes a single row through the writer.
//
// The writer is opened on first use, and the head/data/sma files are opened
// if the head file descriptor is not yet set. When the row's uid differs
// from the table currently being written, the current table is ended and a
// new one is begun with the row's table id before the row is written.
//
// Returns 0 on success or the first error encountered, which is also logged.
int32_t tsdbDataFileWriteRow(SDataFileWriter *writer, SRowInfo *row) {
  int32_t code = 0;
  int32_t lino = 0;

  // Lazy initialization of the writer and its data files.
  if (writer->ctx->opened == false) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpen(writer), &lino, _exit);
  }
  if (NULL == writer->fd[TSDB_FTYPE_HEAD]) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterOpenDataFD(writer), &lino, _exit);
  }

  // Switch tables when the row belongs to a different one.
  if (writer->ctx->tbid->uid != row->uid) {
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataEnd(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataBegin(writer, (TABLEID *)row), &lino, _exit);
  }

  TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSData(writer, &row->row), &lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1862

1863
// Writes a whole block of rows for one table through the writer.
//
// An empty block (nRow == 0) is accepted and ignored; a non-empty block with
// uid == 0 is rejected with TSDB_CODE_INVALID_PARA. The writer and its
// head/data/sma files are opened on demand (keyed on the data file
// descriptor), and the current table is switched when the block's uid
// differs from it.
//
// If the table has old data, that old data is first processed up to the key
// of the block's first row. The block is then written in one piece when no
// old data remains and no rows are buffered in writer->blockData; otherwise
// its rows are written one at a time.
//
// Returns 0 on success or the first error encountered, which is also logged.
int32_t tsdbDataFileWriteBlockData(SDataFileWriter *writer, SBlockData *bData) {
  if (bData->nRow == 0) {
    return 0;
  }

  int32_t code = 0;
  int32_t lino = 0;

  if (!bData->uid) {
    return TSDB_CODE_INVALID_PARA;
  }

  // Lazy initialization of the writer and its data files.
  if (writer->ctx->opened == false) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpen(writer), &lino, _exit);
  }
  if (NULL == writer->fd[TSDB_FTYPE_DATA]) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterOpenDataFD(writer), &lino, _exit);
  }

  // Switch tables when the block belongs to a different one.
  if (writer->ctx->tbid->uid != bData->uid) {
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataEnd(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataBegin(writer, (TABLEID *)bData), &lino, _exit);
  }

  if (writer->ctx->tbHasOldData) {
    STsdbRowKey key;

    tsdbRowGetKey(&tsdbRowFromBlockData(bData, 0), &key);
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTableOldData(writer, &key), &lino, _exit);
  }

  if (writer->ctx->tbHasOldData || writer->blockData->nRow != 0) {
    // Old data or buffered rows remain: write the block row by row.
    for (int32_t i = 0; i < bData->nRow; ++i) {
      TSDBROW row[1] = {tsdbRowFromBlockData(bData, i)};
      TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSData(writer, row), &lino, _exit);
    }
  } else {
    // Nothing pending: write the block as a whole.
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, bData), &lino, _exit);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1914

1915
// Flushes the rows buffered in writer->blockData as a data block.
//
// Returns TSDB_CODE_INVALID_PARA if the writer has not been opened. Returns 0
// without writing when the buffer is empty or when the current table still
// has old data. Otherwise returns the result of
// tsdbDataFileDoWriteBlockData().
int32_t tsdbDataFileFlush(SDataFileWriter *writer) {
  if (writer->ctx->opened == false) {
    return TSDB_CODE_INVALID_PARA;
  }

  // Nothing to flush, or flushing is skipped while old data remains.
  if (writer->blockData->nRow == 0 || writer->ctx->tbHasOldData) {
    return 0;
  }

  return tsdbDataFileDoWriteBlockData(writer, writer->blockData);
}
1925

1926
// Creates (or truncates) the writer's tomb file, writes a zero-filled header
// of TSDB_FHDR_SIZE bytes and advances the tracked tomb file size by
// TSDB_FHDR_SIZE.
//
// If the writer has a reader attached, the existing tomb block index is
// loaded into ctx->tombBlkArray, ctx->hasOldTomb is set when that index is
// non-empty, and the old-tomb iteration state is reset.
//
// Returns 0 on success or the first error encountered, which is also logged.
static int32_t tsdbDataFileWriterOpenTombFD(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  const int32_t ftype = TSDB_FTYPE_TOMB;
  const int32_t flag = (TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);

  char    fname[TSDB_FILENAME_LEN];
  uint8_t hdr[TSDB_FHDR_SIZE] = {0};

  // Encryption settings come from the owning vnode's tsdb configuration.
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;

  int32_t lcn = writer->files[ftype].lcn;
  tsdbTFileName(writer->config->tsdb, &writer->files[ftype], fname);

  TAOS_CHECK_GOTO(tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd[ftype], lcn), &lino, _exit);

  // Reserve the file header region with zeros.
  TAOS_CHECK_GOTO(tsdbWriteFile(writer->fd[ftype], 0, hdr, TSDB_FHDR_SIZE, encryptAlgorithm, encryptKey), &lino, _exit);
  writer->files[ftype].size += TSDB_FHDR_SIZE;

  if (writer->ctx->reader) {
    TAOS_CHECK_GOTO(tsdbDataFileReadTombBlk(writer->ctx->reader, &writer->ctx->tombBlkArray), &lino, _exit);

    if (TARRAY2_SIZE(writer->ctx->tombBlkArray) > 0) {
      writer->ctx->hasOldTomb = true;
    }

    // Start iterating old tomb data from the beginning.
    writer->ctx->tombBlkArrayIdx = 0;
    tTombBlockClear(writer->ctx->tombBlock);
    writer->ctx->tombBlockIdx = 0;
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1966

1967
// Writes one tomb record through the writer.
//
// The writer is opened on first use and the tomb file is opened if its file
// descriptor is not yet set; the record is then handed to
// tsdbDataFileDoWriteTombRecord().
//
// Returns 0 on success or the first error encountered, which is also logged.
int32_t tsdbDataFileWriteTombRecord(SDataFileWriter *writer, const STombRecord *record) {
  int32_t code = 0;
  int32_t lino = 0;

  // Lazy initialization of the writer and its tomb file.
  if (writer->ctx->opened == false) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpen(writer), &lino, _exit);
  }
  if (NULL == writer->fd[TSDB_FTYPE_TOMB]) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterOpenTombFD(writer), &lino, _exit);
  }

  TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombRecord(writer, record), &lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc