• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3597

05 Feb 2025 01:41AM UTC coverage: 63.546% (-0.02%) from 63.562%
#3597

push

travis-ci

web-flow
Merge pull request #29639 from taosdata/feat/3.0/TS-5795

Enh(tsdb):print fid while data file corrupted.

141230 of 285630 branches covered (49.45%)

Branch coverage included in aggregate %.

2 of 4 new or added lines in 1 file covered. (50.0%)

398 existing lines in 102 files now uncovered.

220015 of 282846 relevant lines covered (77.79%)

18937864.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.09
/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "meta.h"
17
#include "tsdbDataFileRW.h"
18

19
// SDataFileReader =============================================
20
struct SDataFileReader {
21
  SDataFileReaderConfig config[1];
22

23
  SBuffer  local[10];
24
  SBuffer *buffers;
25

26
  struct {
27
    bool headFooterLoaded;
28
    bool tombFooterLoaded;
29
    bool brinBlkLoaded;
30
    bool tombBlkLoaded;
31
  } ctx[1];
32

33
  STsdbFD *fd[TSDB_FTYPE_MAX];
34

35
  SHeadFooter   headFooter[1];
36
  STombFooter   tombFooter[1];
37
  TBrinBlkArray brinBlkArray[1];
38
  TTombBlkArray tombBlkArray[1];
39
};
40

41
static int32_t tsdbDataFileReadHeadFooter(SDataFileReader *reader) {
142,559✔
42
  if (reader->ctx->headFooterLoaded) {
142,559!
43
    return 0;
×
44
  }
45

46
  int32_t code = 0;
142,559✔
47
  int32_t lino = 0;
142,559✔
48

49
  int32_t ftype = TSDB_FTYPE_HEAD;
142,559✔
50
  if (reader->fd[ftype]) {
142,559✔
51
    int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
135,285✔
52
    char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
135,285✔
53
#if 1
54
    TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[ftype], reader->config->files[ftype].file.size - sizeof(SHeadFooter),
135,285!
55
                                 (uint8_t *)reader->headFooter, sizeof(SHeadFooter), 0, encryptAlgorithm, encryptKey),
56
                    &lino, _exit);
57
#else
58
    int64_t size = reader->config->files[ftype].file.size;
59
    for (; size > TSDB_FHDR_SIZE; size--) {
60
      code = tsdbReadFile(reader->fd[ftype], size - sizeof(SHeadFooter), (uint8_t *)reader->headFooter,
61
                          sizeof(SHeadFooter), 0, encryptAlgorithm, encryptKey);
62
      if (code) continue;
63
      if (reader->headFooter->brinBlkPtr->offset + reader->headFooter->brinBlkPtr->size + sizeof(SHeadFooter) == size) {
64
        break;
65
      }
66
    }
67
    if (size <= TSDB_FHDR_SIZE) {
68
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
69
    }
70
#endif
71
  }
72

73
  reader->ctx->headFooterLoaded = true;
142,448✔
74

75
_exit:
142,448✔
76
  if (code) {
142,448!
77
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
×
78
              tstrerror(code));
79
  }
80
  return code;
142,480✔
81
}
82

83
static int32_t tsdbDataFileReadTombFooter(SDataFileReader *reader) {
21,453✔
84
  if (reader->ctx->tombFooterLoaded) {
21,453!
85
    return 0;
×
86
  }
87

88
  int32_t code = 0;
21,453✔
89
  int32_t lino = 0;
21,453✔
90

91
  int32_t ftype = TSDB_FTYPE_TOMB;
21,453✔
92
  if (reader->fd[ftype]) {
21,453✔
93
    int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
21,051✔
94
    char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
21,051✔
95
    TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[ftype], reader->config->files[ftype].file.size - sizeof(STombFooter),
21,051!
96
                                 (uint8_t *)reader->tombFooter, sizeof(STombFooter), 0, encryptAlgorithm, encryptKey),
97
                    &lino, _exit);
98
  }
99
  reader->ctx->tombFooterLoaded = true;
21,453✔
100

101
_exit:
21,453✔
102
  if (code) {
21,453!
103
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
×
104
              tstrerror(code));
105
  }
106
  return code;
21,453✔
107
}
108

109
int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig *config, SDataFileReader **reader) {
153,751✔
110
  int32_t code = 0;
153,751✔
111
  int32_t lino = 0;
153,751✔
112

113
  if ((*reader = taosMemoryCalloc(1, sizeof(**reader))) == NULL) {
153,751!
114
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
115
  }
116

117
  for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->local); i++) {
1,690,574✔
118
    tBufferInit(reader[0]->local + i);
1,536,857✔
119
  }
120

121
  reader[0]->config[0] = config[0];
153,717✔
122
  reader[0]->buffers = config->buffers;
153,717✔
123
  if (reader[0]->buffers == NULL) {
153,717✔
124
    reader[0]->buffers = reader[0]->local;
152,040✔
125
  }
126

127
  if (fname) {
153,717✔
128
    for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
759,487✔
129
      if (fname[i]) {
607,554✔
130
        int32_t lcn = config->files[i].file.lcn;
456,117✔
131
        TAOS_CHECK_GOTO(tsdbOpenFile(fname[i], config->tsdb, TD_FILE_READ, &reader[0]->fd[i], lcn), &lino, _exit);
456,117!
132
      }
133
    }
134
  } else {
135
    for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
9,035✔
136
      if (config->files[i].exist) {
7,260✔
137
        char fname1[TSDB_FILENAME_LEN];
138
        tsdbTFileName(config->tsdb, &config->files[i].file, fname1);
4,694✔
139
        int32_t lcn = config->files[i].file.lcn;
4,693✔
140
        TAOS_CHECK_GOTO(tsdbOpenFile(fname1, config->tsdb, TD_FILE_READ, &reader[0]->fd[i], lcn), &lino, _exit);
4,693!
141
      }
142
    }
143
  }
144

145
_exit:
1,775✔
146
  if (code) {
153,708!
147
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(config->tsdb->pVnode), __func__, __FILE__, lino,
×
148
              tstrerror(code));
149
  }
150
  return code;
153,687✔
151
}
152

153
void tsdbDataFileReaderClose(SDataFileReader **reader) {
173,691✔
154
  if (reader[0] == NULL) {
173,691✔
155
    return;
20,089✔
156
  }
157

158
  TARRAY2_DESTROY(reader[0]->tombBlkArray, NULL);
153,602!
159
  TARRAY2_DESTROY(reader[0]->brinBlkArray, NULL);
153,602✔
160

161
  for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
768,625✔
162
    if (reader[0]->fd[i]) {
614,764✔
163
      tsdbCloseFile(&reader[0]->fd[i]);
460,818✔
164
    }
165
  }
166

167
  for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->local); ++i) {
1,691,178✔
168
    tBufferDestroy(reader[0]->local + i);
1,537,288✔
169
  }
170

171
  taosMemoryFree(reader[0]);
153,890✔
172
  reader[0] = NULL;
153,756✔
173
}
174

175
int32_t tsdbDataFileReadBrinBlk(SDataFileReader *reader, const TBrinBlkArray **brinBlkArray) {
142,584✔
176
  int32_t code = 0;
142,584✔
177
  int32_t lino = 0;
142,584✔
178
  void   *data = NULL;
142,584✔
179

180
  if (!reader->ctx->brinBlkLoaded) {
142,584!
181
    TAOS_CHECK_GOTO(tsdbDataFileReadHeadFooter(reader), &lino, _exit);
142,606!
182

183
    if (reader->headFooter->brinBlkPtr->size > 0) {
142,489✔
184
      data = taosMemoryMalloc(reader->headFooter->brinBlkPtr->size);
135,202!
185
      if (data == NULL) {
135,186!
186
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
187
      }
188

189
      int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
135,186✔
190
      char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
135,186✔
191

192
      TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], reader->headFooter->brinBlkPtr->offset, data,
135,186!
193
                                   reader->headFooter->brinBlkPtr->size, 0, encryptAlgorithm, encryptKey),
194
                      &lino, _exit);
195

196
      int32_t size = reader->headFooter->brinBlkPtr->size / sizeof(SBrinBlk);
135,220✔
197
      TARRAY2_INIT_EX(reader->brinBlkArray, size, size, data);
135,220✔
198
    } else {
199
      TARRAY2_INIT(reader->brinBlkArray);
7,287✔
200
    }
201

202
    reader->ctx->brinBlkLoaded = true;
142,507✔
203
  }
204
  brinBlkArray[0] = reader->brinBlkArray;
142,485✔
205

206
_exit:
142,485✔
207
  if (code) {
142,485!
208
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
×
209
              tstrerror(code));
210
    taosMemoryFree(data);
×
211
  }
212
  return code;
142,428✔
213
}
214

215
int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinBlk, SBrinBlock *brinBlock) {
198,515✔
216
  int32_t code = 0;
198,515✔
217
  int32_t lino = 0;
198,515✔
218

219
  SBuffer *buffer = reader->buffers + 0;
198,515✔
220
  SBuffer *assist = reader->buffers + 1;
198,515✔
221

222
  int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
198,515✔
223
  char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
198,515✔
224
  // load data
225
  tBufferClear(buffer);
226
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, brinBlk->dp->size, buffer, 0,
198,515!
227
                                       encryptAlgorithm, encryptKey),
228
                  &lino, _exit);
229

230
  // decode brin block
231
  SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer);
198,517✔
232
  tBrinBlockClear(brinBlock);
198,517✔
233
  brinBlock->numOfPKs = brinBlk->numOfPKs;
198,668✔
234
  brinBlock->numOfRecords = brinBlk->numRec;
198,668✔
235
  for (int32_t i = 0; i < 10; i++) {  // int64_t
2,183,244✔
236

237
    SCompressInfo cinfo = {
1,984,671✔
238
        .cmprAlg = brinBlk->cmprAlg,
1,984,671✔
239
        .dataType = TSDB_DATA_TYPE_BIGINT,
240
        .compressedSize = brinBlk->size[i],
1,984,671✔
241
        .originalSize = brinBlk->numRec * sizeof(int64_t),
1,984,671✔
242
    };
243
    TAOS_CHECK_GOTO(tDecompressDataToBuffer(BR_PTR(&br), &cinfo, brinBlock->buffers + i, assist), &lino, _exit);
1,984,671!
244
    br.offset += brinBlk->size[i];
1,984,576✔
245
  }
246

247
  for (int32_t i = 10; i < 15; i++) {  // int32_t
1,191,473✔
248
    SCompressInfo cinfo = {
992,909✔
249
        .cmprAlg = brinBlk->cmprAlg,
992,909✔
250
        .dataType = TSDB_DATA_TYPE_INT,
251
        .compressedSize = brinBlk->size[i],
992,909✔
252
        .originalSize = brinBlk->numRec * sizeof(int32_t),
992,909✔
253
    };
254
    TAOS_CHECK_GOTO(tDecompressDataToBuffer(BR_PTR(&br), &cinfo, brinBlock->buffers + i, assist), &lino, _exit);
992,909!
255
    br.offset += brinBlk->size[i];
992,900✔
256
  }
257

258
  // primary keys
259
  if (brinBlk->numOfPKs > 0) {  // decode the primary keys
198,564✔
260
    SValueColumnCompressInfo firstInfos[TD_MAX_PK_COLS];
261
    SValueColumnCompressInfo lastInfos[TD_MAX_PK_COLS];
262

263
    for (int32_t i = 0; i < brinBlk->numOfPKs; i++) {
11,574✔
264
      TAOS_CHECK_GOTO(tValueColumnCompressInfoDecode(&br, firstInfos + i), &lino, _exit);
5,787!
265
    }
266
    for (int32_t i = 0; i < brinBlk->numOfPKs; i++) {
11,574✔
267
      TAOS_CHECK_GOTO(tValueColumnCompressInfoDecode(&br, lastInfos + i), &lino, _exit);
5,787!
268
    }
269

270
    for (int32_t i = 0; i < brinBlk->numOfPKs; i++) {
11,574✔
271
      SValueColumnCompressInfo *info = firstInfos + i;
5,787✔
272

273
      TAOS_CHECK_GOTO(tValueColumnDecompress(BR_PTR(&br), info, brinBlock->firstKeyPKs + i, assist), &lino, _exit);
5,787!
274
      br.offset += (info->offsetCompressedSize + info->dataCompressedSize);
5,787✔
275
    }
276

277
    for (int32_t i = 0; i < brinBlk->numOfPKs; i++) {
11,574✔
278
      SValueColumnCompressInfo *info = lastInfos + i;
5,787✔
279

280
      TAOS_CHECK_GOTO(tValueColumnDecompress(BR_PTR(&br), info, brinBlock->lastKeyPKs + i, assist), &lino, _exit);
5,787!
281
      br.offset += (info->offsetCompressedSize + info->dataCompressedSize);
5,787✔
282
    }
283
  }
284

285
  if (br.offset != br.buffer->size) {
198,564!
286
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
287
  }
288

289
_exit:
198,564✔
290
  if (code) {
198,564!
291
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
×
292
              tstrerror(code));
293
  }
294
  return code;
198,609✔
295
}
296

297
extern int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *assist);
298

299
int32_t tsdbDataFileReadBlockData(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData) {
6,763✔
300
  int32_t code = 0;
6,763✔
301
  int32_t lino = 0;
6,763✔
302
  int32_t fid = reader->config->files[TSDB_FTYPE_DATA].file.fid;
6,763✔
303

304
  SBuffer *buffer = reader->buffers + 0;
6,763✔
305
  SBuffer *assist = reader->buffers + 1;
6,763✔
306

307
  int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
6,763✔
308
  char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
6,763✔
309
  // load data
310
  tBufferClear(buffer);
311
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, record->blockSize, buffer, 0,
6,763!
312
                                       encryptAlgorithm, encryptKey),
313
                  &lino, _exit);
314

315
  // decompress
316
  SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer);
6,763✔
317
  TAOS_CHECK_GOTO(tBlockDataDecompress(&br, bData, assist), &lino, _exit);
6,763!
318

319
  if (br.offset != buffer->size) {
6,763!
320
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
321
  }
322

323
_exit:
6,763✔
324
  if (code) {
6,763!
NEW
325
    tsdbError("vgId:%d %s fid %d failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, fid,
×
326
              __FILE__, lino, tstrerror(code));
327
  }
328
  return code;
6,763✔
329
}
330

331
int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData,
3,218,920✔
332
                                          STSchema *pTSchema, int16_t cids[], int32_t ncid) {
333
  int32_t code = 0;
3,218,920✔
334
  int32_t lino = 0;
3,218,920✔
335
  int32_t fid = reader->config->files[TSDB_FTYPE_DATA].file.fid;
3,218,920✔
336

337
  SDiskDataHdr hdr;
338
  SBuffer     *buffer0 = reader->buffers + 0;
3,218,920✔
339
  SBuffer     *buffer1 = reader->buffers + 1;
3,218,920✔
340
  SBuffer     *assist = reader->buffers + 2;
3,218,920✔
341

342
  int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
3,218,920✔
343
  char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
3,218,920✔
344
  // load key part
345
  tBufferClear(buffer0);
346
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, record->blockKeySize, buffer0,
3,218,920!
347
                                       0, encryptAlgorithm, encryptKey),
348
                  &lino, _exit);
349

350
  // SDiskDataHdr
351
  SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0);
3,219,144✔
352
  TAOS_CHECK_GOTO(tGetDiskDataHdr(&br, &hdr), &lino, _exit);
3,219,144!
353

354
  if (hdr.delimiter != TSDB_FILE_DLMT) {
3,219,164!
355
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
356
  }
357

358
  tBlockDataReset(bData);
3,219,164✔
359
  bData->suid = hdr.suid;
3,218,928✔
360
  bData->uid = hdr.uid;
3,218,928✔
361
  bData->nRow = hdr.nRow;
3,218,928✔
362

363
  // Key part
364
  TAOS_CHECK_GOTO(tBlockDataDecompressKeyPart(&hdr, &br, bData, assist), &lino, _exit);
3,218,928!
365
  if (br.offset != buffer0->size) {
3,219,289!
366
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
367
  }
368

369
  int extraColIdx = -1;
3,219,289✔
370
  for (int i = 0; i < ncid; i++) {
3,408,732✔
371
    if (tBlockDataGetColData(bData, cids[i]) == NULL) {
2,099,885✔
372
      extraColIdx = i;
1,910,465✔
373
      break;
1,910,465✔
374
    }
375
  }
376

377
  if (extraColIdx < 0) {
3,219,312✔
378
    goto _exit;
1,308,709✔
379
  }
380

381
  // load SBlockCol part
382
  tBufferClear(buffer0);
383
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset + record->blockKeySize,
1,910,603!
384
                                       hdr.szBlkCol, buffer0, 0, encryptAlgorithm, encryptKey),
385
                  &lino, _exit);
386

387
  // calc szHint
388
  int64_t szHint = 0;
1,910,551✔
389
  int     extraCols = 1;
1,910,551✔
390
  for (int i = extraColIdx + 1; i < ncid; ++i) {
1,910,546✔
391
    if (tBlockDataGetColData(bData, cids[i]) == NULL) {
104,521!
392
      ++extraCols;
104,532✔
393
      break;
104,532✔
394
    }
395
  }
396

397
  if (extraCols >= 2) {
1,910,557✔
398
    br = BUFFER_READER_INITIALIZER(0, buffer0);
104,539✔
399

400
    SBlockCol blockCol = {.cid = 0};
104,539✔
401
    for (int32_t i = extraColIdx; i < ncid; ++i) {
104,539✔
402
      int16_t extraColCid = cids[i];
104,403✔
403

404
      while (extraColCid > blockCol.cid) {
209,355✔
405
        if (br.offset >= buffer0->size) {
104,944!
406
          blockCol.cid = INT16_MAX;
×
407
          break;
×
408
        }
409

410
        TAOS_CHECK_GOTO(tGetBlockCol(&br, &blockCol, hdr.fmtVer, hdr.cmprAlg), &lino, _exit);
104,944!
411
      }
412

413
      if (extraColCid == blockCol.cid || blockCol.cid == INT16_MAX) {
104,411!
414
        extraColIdx = i;
104,411✔
415
        break;
104,411✔
416
      }
417
    }
418

419
    if (blockCol.cid > 0 && blockCol.cid < INT16_MAX /*&& blockCol->flag == HAS_VALUE*/) {
104,547✔
420
      int64_t   offset = blockCol.offset;
104,495✔
421
      SBlockCol lastNonNoneBlockCol = {.cid = 0};
104,495✔
422

423
      for (int32_t i = extraColIdx; i < ncid; ++i) {
850,473✔
424
        int16_t extraColCid = cids[i];
748,708✔
425

426
        while (extraColCid > blockCol.cid) {
1,395,830✔
427
          if (br.offset >= buffer0->size) {
649,852✔
428
            blockCol.cid = INT16_MAX;
2,524✔
429
            break;
2,524✔
430
          }
431

432
          TAOS_CHECK_GOTO(tGetBlockCol(&br, &blockCol, hdr.fmtVer, hdr.cmprAlg), &lino, _exit);
647,328!
433
        }
434

435
        if (extraColCid == blockCol.cid) {
748,502✔
436
          lastNonNoneBlockCol = blockCol;
745,973✔
437
          continue;
745,973✔
438
        }
439

440
        if (blockCol.cid == INT16_MAX) {
2,529✔
441
          break;
2,524✔
442
        }
443
      }
444

445
      if (lastNonNoneBlockCol.cid > 0) {
104,289!
446
        szHint = lastNonNoneBlockCol.offset + lastNonNoneBlockCol.szBitmap + lastNonNoneBlockCol.szOffset +
104,535✔
447
                 lastNonNoneBlockCol.szValue - offset;
104,535✔
448
      }
449
    }
450
  }
451

452
  // load each column
453
  SBlockCol blockCol = {
1,910,359✔
454
      .cid = 0,
455
  };
456
  bool firstRead = true;
1,910,359✔
457
  br = BUFFER_READER_INITIALIZER(0, buffer0);
1,910,359✔
458
  for (int32_t i = 0; i < ncid; i++) {
4,645,016✔
459
    int16_t cid = cids[i];
2,734,491✔
460

461
    if (tBlockDataGetColData(bData, cid)) {  // already loaded
2,734,491✔
462
      continue;
157,648✔
463
    }
464

465
    while (cid > blockCol.cid) {
5,874,647✔
466
      if (br.offset >= buffer0->size) {
3,300,352✔
467
        blockCol.cid = INT16_MAX;
2,524✔
468
        break;
2,524✔
469
      }
470

471
      TAOS_CHECK_GOTO(tGetBlockCol(&br, &blockCol, hdr.fmtVer, hdr.cmprAlg), &lino, _exit);
3,297,828!
472
    }
473

474
    if (cid < blockCol.cid) {
2,576,819✔
475
      const STColumn *tcol = tTSchemaSearchColumn(pTSchema, cid);
25,240✔
476
      TSDB_CHECK_NULL(tcol, code, lino, _exit, TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER);
25,240!
477
      SBlockCol none = {
25,240✔
478
          .cid = cid,
479
          .type = tcol->type,
25,240✔
480
          .cflag = tcol->flags,
25,240✔
481
          .flag = HAS_NONE,
482
          .szOrigin = 0,
483
          .szBitmap = 0,
484
          .szOffset = 0,
485
          .szValue = 0,
486
          .offset = 0,
487
      };
488
      TAOS_CHECK_GOTO(tBlockDataDecompressColData(&hdr, &none, &br, bData, assist), &lino, _exit);
25,240!
489
    } else if (cid == blockCol.cid) {
2,551,579✔
490
      int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
2,551,571✔
491
      char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
2,551,571✔
492
      // load from file
493
      tBufferClear(buffer1);
494
      TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA],
2,551,571!
495
                                           record->blockOffset + record->blockKeySize + hdr.szBlkCol + blockCol.offset,
496
                                           blockCol.szBitmap + blockCol.szOffset + blockCol.szValue, buffer1,
497
                                           firstRead ? szHint : 0, encryptAlgorithm, encryptKey),
498
                      &lino, _exit);
499

500
      firstRead = false;
2,552,077✔
501

502
      // decode the buffer
503
      SBufferReader br1 = BUFFER_READER_INITIALIZER(0, buffer1);
2,552,077✔
504
      TAOS_CHECK_GOTO(tBlockDataDecompressColData(&hdr, &blockCol, &br1, bData, assist), &lino, _exit);
2,552,077!
505
    }
506
  }
507

508
_exit:
1,910,525✔
509
  if (code) {
3,219,234!
NEW
510
    tsdbError("vgId:%d %s fid:%d failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, fid,
×
511
              __FILE__, lino, tstrerror(code));
512
  }
513
  return code;
3,219,150✔
514
}
515

516
int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader, const SBrinRecord *record,
2,455,343✔
517
                                 TColumnDataAggArray *columnDataAggArray) {
518
  int32_t  code = 0;
2,455,343✔
519
  int32_t  lino = 0;
2,455,343✔
520
  SBuffer *buffer = reader->buffers + 0;
2,455,343✔
521

522
  TARRAY2_CLEAR(columnDataAggArray, NULL);
2,455,343✔
523
  if (record->smaSize > 0) {
2,455,343!
524
    tBufferClear(buffer);
525
    int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
2,455,348✔
526
    char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
2,455,348✔
527
    TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_SMA], record->smaOffset, record->smaSize, buffer, 0,
2,455,348!
528
                                         encryptAlgorithm, encryptKey),
529
                    &lino, _exit);
530

531
    // decode sma data
532
    SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer);
2,455,346✔
533
    while (br.offset < record->smaSize) {
9,837,967✔
534
      SColumnDataAgg sma[1];
535

536
      TAOS_CHECK_GOTO(tGetColumnDataAgg(&br, sma), &lino, _exit);
7,382,629!
537
      TAOS_CHECK_GOTO(TARRAY2_APPEND_PTR(columnDataAggArray, sma), &lino, _exit);
14,765,242!
538
    }
539
    if (br.offset != record->smaSize) {
2,455,338!
540
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
541
    }
542
  }
543

544
_exit:
×
545
  if (code) {
2,455,333!
546
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
×
547
              tstrerror(code));
548
  }
549
  return code;
2,455,344✔
550
}
551

552
int32_t tsdbDataFileReadTombBlk(SDataFileReader *reader, const TTombBlkArray **tombBlkArray) {
33,814✔
553
  int32_t code = 0;
33,814✔
554
  int32_t lino = 0;
33,814✔
555
  void   *data = NULL;
33,814✔
556

557
  if (!reader->ctx->tombBlkLoaded) {
33,814✔
558
    TAOS_CHECK_GOTO(tsdbDataFileReadTombFooter(reader), &lino, _exit);
21,453!
559

560
    if (reader->tombFooter->tombBlkPtr->size > 0) {
21,453✔
561
      if ((data = taosMemoryMalloc(reader->tombFooter->tombBlkPtr->size)) == NULL) {
21,050!
562
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
563
      }
564

565
      int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
21,051✔
566
      char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
21,051✔
567
      TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], reader->tombFooter->tombBlkPtr->offset, data,
21,051!
568
                                   reader->tombFooter->tombBlkPtr->size, 0, encryptAlgorithm, encryptKey),
569
                      &lino, _exit);
570

571
      int32_t size = reader->tombFooter->tombBlkPtr->size / sizeof(STombBlk);
21,051✔
572
      TARRAY2_INIT_EX(reader->tombBlkArray, size, size, data);
21,051✔
573
    } else {
574
      TARRAY2_INIT(reader->tombBlkArray);
403✔
575
    }
576

577
    reader->ctx->tombBlkLoaded = true;
21,454✔
578
  }
579
  tombBlkArray[0] = reader->tombBlkArray;
33,815✔
580

581
_exit:
33,815✔
582
  if (code) {
33,815!
583
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
×
584
              tstrerror(code));
585
    taosMemoryFree(data);
×
586
  }
587
  return code;
33,813✔
588
}
589

590
int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombBlk, STombBlock *tData) {
25,369✔
591
  int32_t code = 0;
25,369✔
592
  int32_t lino = 0;
25,369✔
593

594
  SBuffer *buffer0 = reader->buffers + 0;
25,369✔
595
  SBuffer *assist = reader->buffers + 1;
25,369✔
596

597
  tBufferClear(buffer0);
598
  int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
25,369✔
599
  char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
25,369✔
600
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_TOMB], tombBlk->dp->offset, tombBlk->dp->size, buffer0, 0,
25,369!
601
                                       encryptAlgorithm, encryptKey),
602
                  &lino, _exit);
603

604
  int32_t       size = 0;
25,370✔
605
  SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0);
25,370✔
606
  tTombBlockClear(tData);
25,370✔
607
  tData->numOfRecords = tombBlk->numRec;
25,369✔
608
  for (int32_t i = 0; i < ARRAY_SIZE(tData->buffers); ++i) {
152,219✔
609
    SCompressInfo cinfo = {
126,849✔
610
        .cmprAlg = tombBlk->cmprAlg,
126,849✔
611
        .dataType = TSDB_DATA_TYPE_BIGINT,
612
        .originalSize = tombBlk->numRec * sizeof(int64_t),
126,849✔
613
        .compressedSize = tombBlk->size[i],
126,849✔
614
    };
615
    TAOS_CHECK_GOTO(tDecompressDataToBuffer(BR_PTR(&br), &cinfo, tData->buffers + i, assist), &lino, _exit);
126,849!
616
    br.offset += tombBlk->size[i];
126,850✔
617
  }
618

619
_exit:
25,370✔
620
  if (code) {
25,370!
621
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
×
622
              tstrerror(code));
623
  }
624
  return code;
25,370✔
625
}
626

627
// SDataFileWriter =============================================
628
struct SDataFileWriter {
629
  SDataFileWriterConfig config[1];
630

631
  SSkmInfo skmTb[1];
632
  SSkmInfo skmRow[1];
633
  SBuffer  local[10];
634
  SBuffer *buffers;
635

636
  struct {
637
    bool             opened;
638
    SDataFileReader *reader;
639

640
    // for ts data
641
    TABLEID tbid[1];
642
    bool    tbHasOldData;
643

644
    const TBrinBlkArray *brinBlkArray;
645
    int32_t              brinBlkArrayIdx;
646
    SBrinBlock           brinBlock[1];
647
    int32_t              brinBlockIdx;
648
    SBlockData           blockData[1];
649
    int32_t              blockDataIdx;
650
    // for tomb data
651
    bool                 hasOldTomb;
652
    const TTombBlkArray *tombBlkArray;
653
    int32_t              tombBlkArrayIdx;
654
    STombBlock           tombBlock[1];
655
    int32_t              tombBlockIdx;
656
    // range
657
    SVersionRange range;
658
    SVersionRange tombRange;
659
  } ctx[1];
660

661
  STFile   files[TSDB_FTYPE_MAX];
662
  STsdbFD *fd[TSDB_FTYPE_MAX];
663

664
  SHeadFooter headFooter[1];
665
  STombFooter tombFooter[1];
666

667
  TBrinBlkArray brinBlkArray[1];
668
  SBrinBlock    brinBlock[1];
669
  SBlockData    blockData[1];
670

671
  TTombBlkArray tombBlkArray[1];
672
  STombBlock    tombBlock[1];
673
};
674

675
static int32_t tsdbDataFileWriterCloseAbort(SDataFileWriter *writer) {
×
676
  tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, __LINE__,
×
677
            "not implemented");
678
  return 0;
×
679
}
680

681
static void tsdbDataFileWriterDoClose(SDataFileWriter *writer) {
4,038✔
682
  if (writer->ctx->reader) {
4,038✔
683
    tsdbDataFileReaderClose(&writer->ctx->reader);
1,711✔
684
  }
685

686
  tTombBlockDestroy(writer->tombBlock);
4,038✔
687
  TARRAY2_DESTROY(writer->tombBlkArray, NULL);
4,038!
688
  tBlockDataDestroy(writer->blockData);
4,038✔
689
  tBrinBlockDestroy(writer->brinBlock);
4,038✔
690
  TARRAY2_DESTROY(writer->brinBlkArray, NULL);
4,038!
691

692
  tTombBlockDestroy(writer->ctx->tombBlock);
4,038✔
693
  tBlockDataDestroy(writer->ctx->blockData);
4,038✔
694
  tBrinBlockDestroy(writer->ctx->brinBlock);
4,038✔
695

696
  for (int32_t i = 0; i < ARRAY_SIZE(writer->local); ++i) {
44,418✔
697
    tBufferDestroy(writer->local + i);
40,380!
698
  }
699

700
  tDestroyTSchema(writer->skmRow->pTSchema);
4,038!
701
  tDestroyTSchema(writer->skmTb->pTSchema);
4,038!
702
}
4,038✔
703

704
static int32_t tsdbDataFileWriterDoOpenReader(SDataFileWriter *writer) {
4,037✔
705
  int32_t code = 0;
4,037✔
706
  int32_t lino = 0;
4,037✔
707

708
  for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
14,923✔
709
    if (writer->config->files[i].exist) {
12,596✔
710
      SDataFileReaderConfig config[1] = {{
1,710✔
711
          .tsdb = writer->config->tsdb,
1,710✔
712
          .szPage = writer->config->szPage,
1,710✔
713
          .buffers = writer->buffers,
1,710✔
714
      }};
715

716
      for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
8,553✔
717
        config->files[i].exist = writer->config->files[i].exist;
6,843✔
718
        if (config->files[i].exist) {
6,843✔
719
          config->files[i].file = writer->config->files[i].file;
4,669✔
720
        }
721
      }
722

723
      TAOS_CHECK_GOTO(tsdbDataFileReaderOpen(NULL, config, &writer->ctx->reader), &lino, _exit);
1,710!
724
      break;
1,711✔
725
    }
726
  }
727

728
_exit:
2,327✔
729
  if (code) {
4,038!
730
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
731
              tstrerror(code));
732
  }
733
  return code;
4,037✔
734
}
735

736
static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
4,037✔
737
  int32_t code = 0;
4,037✔
738
  int32_t lino = 0;
4,037✔
739
  int32_t ftype;
740

741
  if (!writer->config->skmTb) writer->config->skmTb = writer->skmTb;
4,037!
742
  if (!writer->config->skmRow) writer->config->skmRow = writer->skmRow;
4,037!
743
  writer->buffers = writer->config->buffers;
4,037✔
744
  if (writer->buffers == NULL) {
4,037!
745
    writer->buffers = writer->local;
×
746
  }
747

748
  // open reader
749
  TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpenReader(writer), &lino, _exit);
4,037!
750

751
  // .head
752
  ftype = TSDB_FTYPE_HEAD;
4,037✔
753
  writer->files[ftype] = (STFile){
4,037✔
754
      .type = ftype,
755
      .did = writer->config->did,
4,037✔
756
      .fid = writer->config->fid,
4,037✔
757
      .cid = writer->config->cid,
4,037✔
758
      .size = 0,
759
      .minVer = VERSION_MAX,
760
      .maxVer = VERSION_MIN,
761
  };
762

763
  // .data
764
  ftype = TSDB_FTYPE_DATA;
4,037✔
765
  if (writer->config->files[ftype].exist) {
4,037✔
766
    writer->files[ftype] = writer->config->files[ftype].file;
1,185✔
767
  } else {
768
    writer->files[ftype] = (STFile){
5,704✔
769
        .type = ftype,
770
        .did = writer->config->did,
2,852✔
771
        .fid = writer->config->fid,
2,852✔
772
        .cid = writer->config->cid,
2,852✔
773
        .size = 0,
774
        .lcn = writer->config->lcn == -1 ? 0 : -1,
2,852✔
775
        .minVer = VERSION_MAX,
776
        .maxVer = VERSION_MIN,
777
    };
778
  }
779

780
  // .sma
781
  ftype = TSDB_FTYPE_SMA;
4,037✔
782
  if (writer->config->files[ftype].exist) {
4,037✔
783
    writer->files[ftype] = writer->config->files[ftype].file;
1,184✔
784
  } else {
785
    writer->files[ftype] = (STFile){
2,853✔
786
        .type = ftype,
787
        .did = writer->config->did,
2,853✔
788
        .fid = writer->config->fid,
2,853✔
789
        .cid = writer->config->cid,
2,853✔
790
        .size = 0,
791
        .minVer = VERSION_MAX,
792
        .maxVer = VERSION_MIN,
793
    };
794
  }
795

796
  // .tomb
797
  ftype = TSDB_FTYPE_TOMB;
4,037✔
798
  writer->files[ftype] = (STFile){
4,037✔
799
      .type = ftype,
800
      .did = writer->config->did,
4,037✔
801
      .fid = writer->config->fid,
4,037✔
802
      .cid = writer->config->cid,
4,037✔
803
      .size = 0,
804
      .minVer = VERSION_MAX,
805
      .maxVer = VERSION_MIN,
806
  };
807

808
  // range
809
  writer->ctx->range = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
4,037✔
810
  writer->ctx->tombRange = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
4,037✔
811

812
  writer->ctx->opened = true;
4,037✔
813

814
_exit:
4,037✔
815
  if (code) {
4,037!
816
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
817
              tstrerror(code));
818
  }
819
  return code;
4,036✔
820
}
821

822
void tsdbWriterUpdVerRange(SVersionRange *range, int64_t minVer, int64_t maxVer) {
589,378✔
823
  range->minVer = TMIN(range->minVer, minVer);
589,378✔
824
  range->maxVer = TMAX(range->maxVer, maxVer);
589,378✔
825
}
589,378✔
826

827
int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, uint32_t cmprAlg, int64_t *fileSize,
46,179✔
828
                               TBrinBlkArray *brinBlkArray, SBuffer *buffers, SVersionRange *range,
829
                               int32_t encryptAlgorithm, char *encryptKey) {
830
  if (brinBlock->numOfRecords == 0) {
46,179!
831
    return 0;
×
832
  }
833

834
  int32_t  code;
835
  SBuffer *buffer0 = buffers + 0;
46,179✔
836
  SBuffer *buffer1 = buffers + 1;
46,179✔
837
  SBuffer *assist = buffers + 2;
46,179✔
838

839
  SBrinBlk brinBlk = {
46,179✔
840
      .dp[0] =
841
          {
842
              .offset = *fileSize,
46,179✔
843
              .size = 0,
844
          },
845
      .numRec = brinBlock->numOfRecords,
46,179✔
846
      .numOfPKs = brinBlock->numOfPKs,
46,179✔
847
      .cmprAlg = cmprAlg,
848
  };
849
  for (int i = 0; i < brinBlock->numOfRecords; i++) {
11,204,334✔
850
    SBrinRecord record;
851

852
    TAOS_CHECK_RETURN(tBrinBlockGet(brinBlock, i, &record));
11,158,155!
853
    if (i == 0) {
11,158,155✔
854
      brinBlk.minTbid.suid = record.suid;
46,179✔
855
      brinBlk.minTbid.uid = record.uid;
46,179✔
856
      brinBlk.minVer = record.minVer;
46,179✔
857
      brinBlk.maxVer = record.maxVer;
46,179✔
858
    }
859
    if (i == brinBlock->numOfRecords - 1) {
11,158,155✔
860
      brinBlk.maxTbid.suid = record.suid;
46,179✔
861
      brinBlk.maxTbid.uid = record.uid;
46,179✔
862
    }
863
    if (record.minVer < brinBlk.minVer) {
11,158,155✔
864
      brinBlk.minVer = record.minVer;
6,132✔
865
    }
866
    if (record.maxVer > brinBlk.maxVer) {
11,158,155✔
867
      brinBlk.maxVer = record.maxVer;
4,362,106✔
868
    }
869
  }
870

871
  tsdbWriterUpdVerRange(range, brinBlk.minVer, brinBlk.maxVer);
46,179✔
872

873
  // write to file
874
  for (int32_t i = 0; i < 10; ++i) {
507,975✔
875
    SCompressInfo info = {
461,795✔
876
        .cmprAlg = cmprAlg,
877
        .dataType = TSDB_DATA_TYPE_BIGINT,
878
        .originalSize = brinBlock->buffers[i].size,
461,795✔
879
    };
880

881
    tBufferClear(buffer0);
882
    TAOS_CHECK_RETURN(tCompressDataToBuffer(brinBlock->buffers[i].data, &info, buffer0, assist));
461,795!
883
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptAlgorithm, encryptKey));
461,794!
884
    brinBlk.size[i] = info.compressedSize;
461,795✔
885
    brinBlk.dp->size += info.compressedSize;
461,795✔
886
    *fileSize += info.compressedSize;
461,795✔
887
  }
888
  for (int32_t i = 10; i < 15; ++i) {
277,080✔
889
    SCompressInfo info = {
230,900✔
890
        .cmprAlg = cmprAlg,
891
        .dataType = TSDB_DATA_TYPE_INT,
892
        .originalSize = brinBlock->buffers[i].size,
230,900✔
893
    };
894

895
    tBufferClear(buffer0);
896
    TAOS_CHECK_RETURN(tCompressDataToBuffer(brinBlock->buffers[i].data, &info, buffer0, assist));
230,900!
897
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptAlgorithm, encryptKey));
230,900!
898
    brinBlk.size[i] = info.compressedSize;
230,900✔
899
    brinBlk.dp->size += info.compressedSize;
230,900✔
900
    *fileSize += info.compressedSize;
230,900✔
901
  }
902

903
  // write primary keys to file
904
  if (brinBlock->numOfPKs > 0) {
46,180✔
905
    tBufferClear(buffer0);
906
    tBufferClear(buffer1);
907

908
    // encode
909
    for (int i = 0; i < brinBlock->numOfPKs; i++) {
3,156✔
910
      SValueColumnCompressInfo info = {.cmprAlg = cmprAlg};
1,578✔
911
      TAOS_CHECK_RETURN(tValueColumnCompress(&brinBlock->firstKeyPKs[i], &info, buffer1, assist));
1,578!
912
      TAOS_CHECK_RETURN(tValueColumnCompressInfoEncode(&info, buffer0));
1,578!
913
    }
914
    for (int i = 0; i < brinBlock->numOfPKs; i++) {
3,156✔
915
      SValueColumnCompressInfo info = {.cmprAlg = cmprAlg};
1,578✔
916
      TAOS_CHECK_RETURN(tValueColumnCompress(&brinBlock->lastKeyPKs[i], &info, buffer1, assist));
1,578!
917
      TAOS_CHECK_RETURN(tValueColumnCompressInfoEncode(&info, buffer0));
1,578!
918
    }
919

920
    // write to file
921
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptAlgorithm, encryptKey));
1,578!
922
    *fileSize += buffer0->size;
1,578✔
923
    brinBlk.dp->size += buffer0->size;
1,578✔
924
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer1->data, buffer1->size, encryptAlgorithm, encryptKey));
1,578!
925
    *fileSize += buffer1->size;
1,578✔
926
    brinBlk.dp->size += buffer1->size;
1,578✔
927
  }
928

929
  // append to brinBlkArray
930
  TAOS_CHECK_RETURN(TARRAY2_APPEND_PTR(brinBlkArray, &brinBlk));
92,361!
931

932
  tBrinBlockClear(brinBlock);
46,181✔
933

934
  return 0;
46,180✔
935
}
936

937
static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) {
46,206✔
938
  if (writer->brinBlock->numOfRecords == 0) {
46,206✔
939
    return 0;
26✔
940
  }
941

942
  int32_t code = 0;
46,180✔
943
  int32_t lino = 0;
46,180✔
944

945
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
46,180✔
946
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
46,180✔
947

948
  TAOS_CHECK_GOTO(tsdbFileWriteBrinBlock(writer->fd[TSDB_FTYPE_HEAD], writer->brinBlock, writer->config->cmprAlg,
46,180!
949
                                         &writer->files[TSDB_FTYPE_HEAD].size, writer->brinBlkArray, writer->buffers,
950
                                         &writer->ctx->range, encryptAlgorithm, encryptKey),
951
                  &lino, _exit);
952

953
_exit:
46,180✔
954
  if (code) {
46,180!
955
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
956
              tstrerror(code));
957
  }
958
  return code;
46,180✔
959
}
960

961
static int32_t tsdbDataFileWriteBrinRecord(SDataFileWriter *writer, const SBrinRecord *record) {
11,158,153✔
962
  int32_t code = 0;
11,158,153✔
963
  int32_t lino = 0;
11,158,153✔
964

965
  for (;;) {
966
    code = tBrinBlockPut(writer->brinBlock, record);
11,158,159✔
967
    if (code == TSDB_CODE_INVALID_PARA) {
11,158,159✔
968
      // different records with different primary keys
969
      TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlock(writer), &lino, _exit);
6!
970
      continue;
6✔
971
    } else {
972
      TSDB_CHECK_CODE(code, lino, _exit);
11,158,153!
973
    }
974
    break;
11,158,153✔
975
  }
976

977
  if ((writer->brinBlock->numOfRecords) >= 256) {
11,158,153✔
978
    TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlock(writer), &lino, _exit);
43,154!
979
  }
980

981
_exit:
11,158,153✔
982
  if (code) {
11,158,153!
983
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
984
              tstrerror(code));
985
  }
986
  return code;
11,158,153✔
987
}
988

989
static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData *bData) {
236,286✔
990
  if (bData->nRow == 0) {
236,286✔
991
    return 0;
35,812✔
992
  }
993

994
  if (!bData->uid) {
200,474!
995
    return TSDB_CODE_INVALID_PARA;
×
996
  }
997

998
  int32_t  code = 0;
200,474✔
999
  int32_t  lino = 0;
200,474✔
1000
  SBuffer *buffers = writer->buffers;
200,474✔
1001
  SBuffer *assist = writer->buffers + 4;
200,474✔
1002

1003
  SColCompressInfo cmprInfo = {.pColCmpr = NULL, .defaultCmprAlg = writer->config->cmprAlg};
200,474✔
1004

1005
  SBrinRecord record[1] = {{
200,474✔
1006
      .suid = bData->suid,
200,474✔
1007
      .uid = bData->uid,
200,474✔
1008
      .minVer = bData->aVersion[0],
200,474✔
1009
      .maxVer = bData->aVersion[0],
200,474✔
1010
      .blockOffset = writer->files[TSDB_FTYPE_DATA].size,
200,474✔
1011
      .smaOffset = writer->files[TSDB_FTYPE_SMA].size,
200,474✔
1012
      .blockSize = 0,
1013
      .blockKeySize = 0,
1014
      .smaSize = 0,
1015
      .numRow = bData->nRow,
200,474✔
1016
      .count = 1,
1017
  }};
1018

1019
  tsdbRowGetKey(&tsdbRowFromBlockData(bData, 0), &record->firstKey);
200,474✔
1020
  tsdbRowGetKey(&tsdbRowFromBlockData(bData, bData->nRow - 1), &record->lastKey);
200,477✔
1021

1022
  for (int32_t i = 1; i < bData->nRow; ++i) {
656,598,863✔
1023
    if (tsdbRowCompareWithoutVersion(&tsdbRowFromBlockData(bData, i - 1), &tsdbRowFromBlockData(bData, i)) != 0) {
656,398,386✔
1024
      record->count++;
656,389,394✔
1025
    }
1026
    if (bData->aVersion[i] < record->minVer) {
656,396,457✔
1027
      record->minVer = bData->aVersion[i];
6,748✔
1028
    }
1029
    if (bData->aVersion[i] > record->maxVer) {
656,396,457✔
1030
      record->maxVer = bData->aVersion[i];
8,441,192✔
1031
    }
1032
  }
1033

1034
  tsdbWriterUpdVerRange(&writer->ctx->range, record->minVer, record->maxVer);
200,477✔
1035

1036
  code = metaGetColCmpr(writer->config->tsdb->pVnode->pMeta, bData->suid != 0 ? bData->suid : bData->uid,
200,477✔
1037
                        &cmprInfo.pColCmpr);
1038
  if (code) {
200,477!
1039
    tsdbWarn("vgId:%d failed to get column compress algrithm", TD_VID(writer->config->tsdb->pVnode));
×
1040
  }
1041

1042
  TAOS_CHECK_GOTO(tBlockDataCompress(bData, &cmprInfo, buffers, assist), &lino, _exit);
200,477!
1043

1044
  record->blockKeySize = buffers[0].size + buffers[1].size;
200,476✔
1045
  record->blockSize = record->blockKeySize + buffers[2].size + buffers[3].size;
200,476✔
1046

1047
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
200,476✔
1048
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
200,476✔
1049
  for (int i = 0; i < 4; i++) {
1,002,379✔
1050
    TAOS_CHECK_GOTO(tsdbWriteFile(writer->fd[TSDB_FTYPE_DATA], writer->files[TSDB_FTYPE_DATA].size, buffers[i].data,
801,903!
1051
                                  buffers[i].size, encryptAlgorithm, encryptKey),
1052
                    &lino, _exit);
1053
    writer->files[TSDB_FTYPE_DATA].size += buffers[i].size;
801,903✔
1054
  }
1055

1056
  // to .sma file
1057
  tBufferClear(&buffers[0]);
1058
  for (int32_t i = 0; i < bData->nColData; ++i) {
868,303✔
1059
    SColData *colData = bData->aColData + i;
667,833✔
1060
    if ((colData->cflag & COL_SMA_ON) == 0 || ((colData->flag & HAS_VALUE) == 0)) continue;
667,833✔
1061

1062
    SColumnDataAgg sma[1] = {{.colId = colData->cid}};
613,316✔
1063
    tColDataCalcSMA[colData->type](colData, &sma->sum, &sma->max, &sma->min, &sma->numOfNull);
613,316✔
1064

1065
    TAOS_CHECK_GOTO(tPutColumnDataAgg(&buffers[0], sma), &lino, _exit);
613,307!
1066
  }
1067
  record->smaSize = buffers[0].size;
200,470✔
1068

1069
  if (record->smaSize > 0) {
200,470✔
1070
    TAOS_CHECK_GOTO(tsdbWriteFile(writer->fd[TSDB_FTYPE_SMA], record->smaOffset, buffers[0].data, record->smaSize,
183,812!
1071
                                  encryptAlgorithm, encryptKey),
1072
                    &lino, _exit);
1073
    writer->files[TSDB_FTYPE_SMA].size += record->smaSize;
183,811✔
1074
  }
1075

1076
  // append SBrinRecord
1077
  TAOS_CHECK_GOTO(tsdbDataFileWriteBrinRecord(writer, record), &lino, _exit);
200,469!
1078

1079
  tBlockDataClear(bData);
200,475✔
1080

1081
_exit:
200,476✔
1082
  if (code) {
200,476!
1083
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1084
              tstrerror(code));
1085
  }
1086
  taosHashCleanup(cmprInfo.pColCmpr);
200,476✔
1087
  return code;
200,476✔
1088
}
1089

1090
static int32_t tsdbDataFileDoWriteTSRow(SDataFileWriter *writer, TSDBROW *row) {
113,359,803✔
1091
  int32_t code = 0;
113,359,803✔
1092
  int32_t lino = 0;
113,359,803✔
1093

1094
  // update/append
1095
  if (row->type == TSDBROW_ROW_FMT) {
113,359,803!
1096
    TAOS_CHECK_GOTO(
×
1097
        tsdbUpdateSkmRow(writer->config->tsdb, writer->ctx->tbid, TSDBROW_SVERSION(row), writer->config->skmRow), &lino,
1098
        _exit);
1099
  }
1100

1101
  if (TSDBROW_VERSION(row) <= writer->config->compactVersion  //
113,359,803!
1102
      && writer->blockData->nRow > 0                          //
113,362,009✔
1103
      &&
113,265,271✔
1104
      tsdbRowCompareWithoutVersion(row, &tsdbRowFromBlockData(writer->blockData, writer->blockData->nRow - 1)) == 0  //
113,316,876✔
1105
  ) {
1106
    TAOS_CHECK_GOTO(tBlockDataUpdateRow(writer->blockData, row, writer->config->skmRow->pTSchema), &lino, _exit);
668,527!
1107
  } else {
1108
    if (writer->blockData->nRow >= writer->config->maxRow) {
112,639,671✔
1109
      TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, writer->blockData), &lino, _exit);
10!
1110
    }
1111

1112
    TAOS_CHECK_GOTO(
112,639,671!
1113
        tBlockDataAppendRow(writer->blockData, row, writer->config->skmRow->pTSchema, writer->ctx->tbid->uid), &lino,
1114
        _exit);
1115
  }
1116

1117
_exit:
113,376,634✔
1118
  if (code) {
113,376,634!
1119
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1120
              tstrerror(code));
1121
  }
1122
  return code;
113,374,150✔
1123
}
1124

1125
static FORCE_INLINE int32_t tsdbRowKeyCmprNullAsLargest(const STsdbRowKey *key1, const STsdbRowKey *key2) {
1126
  if (key1 == NULL) {
22,480,588!
1127
    return 1;
8✔
1128
  } else if (key2 == NULL) {
22,480,580!
1129
    return -1;
7,098✔
1130
  } else {
1131
    return tsdbRowKeyCmpr(key1, key2);
22,473,482✔
1132
  }
1133
}
1134

1135
static int32_t tsdbDataFileDoWriteTableOldData(SDataFileWriter *writer, const STsdbRowKey *key) {
710,405✔
1136
  if (writer->ctx->tbHasOldData == false) {
710,405!
1137
    return 0;
×
1138
  }
1139

1140
  int32_t     code = 0;
710,405✔
1141
  int32_t     lino = 0;
710,405✔
1142
  STsdbRowKey rowKey;
1143

1144
  for (;;) {
41,006✔
1145
    for (;;) {
1146
      // SBlockData
1147
      for (; writer->ctx->blockDataIdx < writer->ctx->blockData->nRow; writer->ctx->blockDataIdx++) {
1,525,959✔
1148
        TSDBROW row = tsdbRowFromBlockData(writer->ctx->blockData, writer->ctx->blockDataIdx);
1,413,220✔
1149

1150
        tsdbRowGetKey(&row, &rowKey);
1,413,220✔
1151
        if (tsdbRowKeyCmprNullAsLargest(&rowKey, key) < 0) {  // key <= rowKey
1,413,220✔
1152
          TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSRow(writer, &row), &lino, _exit);
1,413,220!
1153
        } else {
1154
          goto _exit;
686,568✔
1155
        }
1156
      }
1157

1158
      // SBrinBlock
1159
      if (writer->ctx->brinBlockIdx >= writer->ctx->brinBlock->numOfRecords) {
112,739✔
1160
        break;
41,244✔
1161
      }
1162

1163
      for (; writer->ctx->brinBlockIdx < writer->ctx->brinBlock->numOfRecords; writer->ctx->brinBlockIdx++) {
10,590,103✔
1164
        SBrinRecord record;
1165
        code = tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, &record);
10,548,889✔
1166
        TSDB_CHECK_CODE(code, lino, _exit);
10,572,488!
1167
        if (record.uid != writer->ctx->tbid->uid) {
10,548,889✔
1168
          writer->ctx->tbHasOldData = false;
6,811✔
1169
          goto _exit;
6,811✔
1170
        }
1171

1172
        if (tsdbRowKeyCmprNullAsLargest(key, &record.firstKey) < 0) {  // key < record->firstKey
10,542,078✔
1173
          goto _exit;
16,788✔
1174
        } else {
1175
          SBrinRecord record[1];
1176
          code = tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, record);
10,525,290✔
1177
          TSDB_CHECK_CODE(code, lino, _exit);
10,525,290!
1178
          if (tsdbRowKeyCmprNullAsLargest(key, &record->lastKey) > 0) {  // key > record->lastKey
10,525,290✔
1179
            if (writer->blockData->nRow > 0) {
10,518,608✔
1180
              TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, writer->blockData), &lino, _exit);
17!
1181
            }
1182

1183
            TAOS_CHECK_GOTO(tsdbDataFileWriteBrinRecord(writer, record), &lino, _exit);
10,518,608!
1184
          } else {
1185
            TAOS_CHECK_GOTO(tsdbDataFileReadBlockData(writer->ctx->reader, record, writer->ctx->blockData), &lino,
6,682!
1186
                            _exit);
1187

1188
            writer->ctx->blockDataIdx = 0;
6,682✔
1189
            writer->ctx->brinBlockIdx++;
6,682✔
1190
            break;
6,682✔
1191
          }
1192
        }
1193
      }
1194
    }
1195

1196
    // SBrinBlk
1197
    if (writer->ctx->brinBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->brinBlkArray)) {
41,244✔
1198
      writer->ctx->brinBlkArray = NULL;
214✔
1199
      writer->ctx->tbHasOldData = false;
214✔
1200
      goto _exit;
214✔
1201
    } else {
1202
      const SBrinBlk *brinBlk = TARRAY2_GET_PTR(writer->ctx->brinBlkArray, writer->ctx->brinBlkArrayIdx);
41,030✔
1203

1204
      if (brinBlk->minTbid.uid != writer->ctx->tbid->uid) {
41,030✔
1205
        writer->ctx->tbHasOldData = false;
24✔
1206
        goto _exit;
24✔
1207
      }
1208

1209
      TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(writer->ctx->reader, brinBlk, writer->ctx->brinBlock), &lino, _exit);
41,006!
1210

1211
      writer->ctx->brinBlockIdx = 0;
41,006✔
1212
      writer->ctx->brinBlkArrayIdx++;
41,006✔
1213
    }
1214
  }
1215

1216
_exit:
710,405✔
1217
  if (code) {
710,405!
1218
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1219
              tstrerror(code));
1220
  }
1221
  return code;
710,405✔
1222
}
1223

1224
static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row) {
112,635,865✔
1225
  int32_t code = 0;
112,635,865✔
1226
  int32_t lino = 0;
112,635,865✔
1227

1228
  if (writer->ctx->tbHasOldData) {
112,635,865✔
1229
    STsdbRowKey key;
1230
    tsdbRowGetKey(row, &key);
703,350✔
1231
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTableOldData(writer, &key), &lino, _exit);
703,350!
1232
  }
1233

1234
  TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSRow(writer, row), &lino, _exit);
112,635,865!
1235

1236
_exit:
112,649,067✔
1237
  if (code) {
112,649,067!
1238
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1239
              tstrerror(code));
1240
  }
1241
  return code;
112,649,462✔
1242
}
1243

1244
static int32_t tsdbDataFileWriteTableDataEnd(SDataFileWriter *writer) {
65,464✔
1245
  if (writer->ctx->tbid->uid == 0) {
65,464✔
1246
    return 0;
3,046✔
1247
  }
1248

1249
  int32_t code = 0;
62,418✔
1250
  int32_t lino = 0;
62,418✔
1251

1252
  if (writer->ctx->tbHasOldData) {
62,418✔
1253
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTableOldData(writer, NULL /* as the largest key */), &lino, _exit);
10!
1254
  }
1255

1256
  TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, writer->blockData), &lino, _exit);
62,418!
1257

1258
_exit:
62,418✔
1259
  if (code) {
62,418!
1260
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1261
              tstrerror(code));
1262
  }
1263
  return code;
62,418✔
1264
}
1265

1266
static int32_t tsdbDataFileWriteTableDataBegin(SDataFileWriter *writer, const TABLEID *tbid) {
65,463✔
1267
  int32_t code = 0;
65,463✔
1268
  int32_t lino = 0;
65,463✔
1269

1270
  SMetaInfo info;
1271
  bool      drop = false;
65,463✔
1272
  TABLEID   tbid1[1];
1273
  writer->ctx->tbHasOldData = false;
65,463✔
1274
  while (writer->ctx->brinBlkArray) {  // skip data of previous table
67,709✔
1275
    for (; writer->ctx->brinBlockIdx < writer->ctx->brinBlock->numOfRecords; writer->ctx->brinBlockIdx++) {
472,016✔
1276
      SBrinRecord record;
1277
      TAOS_CHECK_GOTO(tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, &record), &lino, _exit);
469,194!
1278

1279
      if (record.uid == tbid->uid) {
469,194✔
1280
        writer->ctx->tbHasOldData = true;
7,049✔
1281
        goto _begin;
29,271✔
1282
      } else if (record.suid > tbid->suid || (record.suid == tbid->suid && record.uid > tbid->uid)) {
462,145✔
1283
        goto _begin;
22,222✔
1284
      } else {
1285
        if (record.uid != writer->ctx->tbid->uid) {
439,923✔
1286
          if (drop && tbid1->uid == record.uid) {
235,882✔
1287
            continue;
853✔
1288
          } else if (metaGetInfo(writer->config->tsdb->pVnode->pMeta, record.uid, &info, NULL) != 0) {
235,862✔
1289
            drop = true;
833✔
1290
            tbid1->suid = record.suid;
833✔
1291
            tbid1->uid = record.uid;
833✔
1292
            continue;
833✔
1293
          } else {
1294
            drop = false;
235,029✔
1295
            writer->ctx->tbid->suid = record.suid;
235,029✔
1296
            writer->ctx->tbid->uid = record.uid;
235,029✔
1297
          }
1298
        }
1299

1300
        TAOS_CHECK_GOTO(tsdbDataFileWriteBrinRecord(writer, &record), &lino, _exit);
439,070!
1301
      }
1302
    }
1303

1304
    if (writer->ctx->brinBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->brinBlkArray)) {
2,822✔
1305
      writer->ctx->brinBlkArray = NULL;
576✔
1306
      break;
576✔
1307
    } else {
1308
      const SBrinBlk *brinBlk = TARRAY2_GET_PTR(writer->ctx->brinBlkArray, writer->ctx->brinBlkArrayIdx);
2,246✔
1309

1310
      TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(writer->ctx->reader, brinBlk, writer->ctx->brinBlock), &lino, _exit);
2,246!
1311

1312
      writer->ctx->brinBlockIdx = 0;
2,246✔
1313
      writer->ctx->brinBlkArrayIdx++;
2,246✔
1314
    }
1315
  }
1316

1317
_begin:
35,616✔
1318
  writer->ctx->tbid[0] = *tbid;
65,463✔
1319

1320
  if (tbid->uid == INT64_MAX) {
65,463✔
1321
    goto _exit;
3,046✔
1322
  }
1323

1324
  TAOS_CHECK_GOTO(tsdbUpdateSkmTb(writer->config->tsdb, tbid, writer->config->skmTb), &lino, _exit);
62,417!
1325
  TAOS_CHECK_GOTO(tBlockDataInit(writer->blockData, writer->ctx->tbid, writer->config->skmTb->pTSchema, NULL, 0), &lino,
62,417!
1326
                  _exit);
1327

1328
_exit:
62,417✔
1329
  if (code) {
65,463!
1330
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1331
              tstrerror(code));
1332
  }
1333
  return code;
65,463✔
1334
}
1335

1336
int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer, int32_t encryptAlgorithm,
3,045✔
1337
                                char *encryptKey) {
1338
  TAOS_CHECK_RETURN(
3,045!
1339
      tsdbWriteFile(fd, *fileSize, (const uint8_t *)footer, sizeof(*footer), encryptAlgorithm, encryptKey));
1340
  *fileSize += sizeof(*footer);
3,046✔
1341
  return 0;
3,046✔
1342
}
1343

1344
int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
11,055✔
1345
                               TTombBlkArray *tombBlkArray, SBuffer *buffers, SVersionRange *range,
1346
                               int32_t encryptAlgorithm, char *encryptKey) {
1347
  int32_t code;
1348

1349
  if (TOMB_BLOCK_SIZE(tombBlock) == 0) {
11,055!
1350
    return 0;
×
1351
  }
1352

1353
  SBuffer *buffer0 = buffers + 0;
11,055✔
1354
  SBuffer *assist = buffers + 1;
11,055✔
1355

1356
  STombBlk tombBlk = {
11,055✔
1357
      .dp[0] =
1358
          {
1359
              .offset = *fileSize,
11,055✔
1360
              .size = 0,
1361
          },
1362
      .numRec = TOMB_BLOCK_SIZE(tombBlock),
11,055✔
1363
      .cmprAlg = cmprAlg,
1364
  };
1365
  for (int i = 0; i < TOMB_BLOCK_SIZE(tombBlock); i++) {
378,939✔
1366
    STombRecord record;
1367
    TAOS_CHECK_RETURN(tTombBlockGet(tombBlock, i, &record));
367,884!
1368

1369
    if (i == 0) {
367,884✔
1370
      tombBlk.minTbid.suid = record.suid;
11,055✔
1371
      tombBlk.minTbid.uid = record.uid;
11,055✔
1372
      tombBlk.minVer = record.version;
11,055✔
1373
      tombBlk.maxVer = record.version;
11,055✔
1374
    }
1375
    if (i == TOMB_BLOCK_SIZE(tombBlock) - 1) {
367,884✔
1376
      tombBlk.maxTbid.suid = record.suid;
11,055✔
1377
      tombBlk.maxTbid.uid = record.uid;
11,055✔
1378
    }
1379
    if (record.version < tombBlk.minVer) {
367,884✔
1380
      tombBlk.minVer = record.version;
2,547✔
1381
    }
1382
    if (record.version > tombBlk.maxVer) {
367,884✔
1383
      tombBlk.maxVer = record.version;
184,890✔
1384
    }
1385
  }
1386

1387
  tsdbWriterUpdVerRange(range, tombBlk.minVer, tombBlk.maxVer);
11,055✔
1388

1389
  for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->buffers); i++) {
66,330✔
1390
    tBufferClear(buffer0);
1391

1392
    SCompressInfo cinfo = {
55,275✔
1393
        .cmprAlg = cmprAlg,
1394
        .dataType = TSDB_DATA_TYPE_BIGINT,
1395
        .originalSize = tombBlock->buffers[i].size,
55,275✔
1396
    };
1397
    TAOS_CHECK_RETURN(tCompressDataToBuffer(tombBlock->buffers[i].data, &cinfo, buffer0, assist));
55,275!
1398
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptAlgorithm, encryptKey));
55,275!
1399

1400
    tombBlk.size[i] = cinfo.compressedSize;
55,275✔
1401
    tombBlk.dp->size += tombBlk.size[i];
55,275✔
1402
    *fileSize += tombBlk.size[i];
55,275✔
1403
  }
1404

1405
  TAOS_CHECK_RETURN(TARRAY2_APPEND_PTR(tombBlkArray, &tombBlk));
22,110!
1406

1407
  tTombBlockClear(tombBlock);
11,055✔
1408
  return 0;
11,055✔
1409
}
1410

1411
static int32_t tsdbDataFileWriteHeadFooter(SDataFileWriter *writer) {
3,045✔
1412
  int32_t code = 0;
3,045✔
1413
  int32_t lino = 0;
3,045✔
1414

1415
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
3,045✔
1416
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
3,045✔
1417

1418
  TAOS_CHECK_GOTO(tsdbFileWriteHeadFooter(writer->fd[TSDB_FTYPE_HEAD], &writer->files[TSDB_FTYPE_HEAD].size,
3,045!
1419
                                          writer->headFooter, encryptAlgorithm, encryptKey),
1420
                  &lino, _exit);
1421

1422
_exit:
3,046✔
1423
  if (code) {
3,046!
1424
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1425
              tstrerror(code));
1426
  }
1427
  return code;
3,046✔
1428
}
1429

1430
static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) {
2,043✔
1431
  if (TOMB_BLOCK_SIZE(writer->tombBlock) == 0) return 0;
2,043!
1432

1433
  int32_t code = 0;
2,043✔
1434
  int32_t lino = 0;
2,043✔
1435

1436
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
2,043✔
1437
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
2,043✔
1438

1439
  TAOS_CHECK_GOTO(tsdbFileWriteTombBlock(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlock, writer->config->cmprAlg,
2,043!
1440
                                         &writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->buffers,
1441
                                         &writer->ctx->tombRange, encryptAlgorithm, encryptKey),
1442
                  &lino, _exit);
1443

1444
_exit:
2,043✔
1445
  if (code) {
2,043!
1446
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1447
              tstrerror(code));
1448
  }
1449
  return code;
2,043✔
1450
}
1451

1452
int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize,
141,459✔
1453
                             int32_t encryptAlgorithm, char *encryptKey) {
1454
  ptr->size = TARRAY2_DATA_LEN(tombBlkArray);
141,459✔
1455
  if (ptr->size > 0) {
141,459✔
1456
    ptr->offset = *fileSize;
11,055✔
1457

1458
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, (const uint8_t *)TARRAY2_DATA(tombBlkArray), ptr->size,
11,055!
1459
                                    encryptAlgorithm, encryptKey));
1460

1461
    *fileSize += ptr->size;
11,055✔
1462
  }
1463
  return 0;
141,459✔
1464
}
1465

1466
static int32_t tsdbDataFileDoWriteTombBlk(SDataFileWriter *writer) {
2,043✔
1467
  if (TARRAY2_SIZE(writer->tombBlkArray) <= 0) {
2,043!
1468
    return TSDB_CODE_INVALID_PARA;
×
1469
  }
1470

1471
  int32_t code = 0;
2,043✔
1472
  int32_t lino = 0;
2,043✔
1473

1474
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
2,043✔
1475
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
2,043✔
1476

1477
  TAOS_CHECK_GOTO(
2,043!
1478
      tsdbFileWriteTombBlk(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlkArray, writer->tombFooter->tombBlkPtr,
1479
                           &writer->files[TSDB_FTYPE_TOMB].size, encryptAlgorithm, encryptKey),
1480
      &lino, _exit);
1481

1482
_exit:
2,043✔
1483
  if (code) {
2,043!
1484
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1485
              tstrerror(code));
1486
  }
1487
  return code;
2,043✔
1488
}
1489

1490
int32_t tsdbFileWriteTombFooter(STsdbFD *fd, const STombFooter *footer, int64_t *fileSize, int32_t encryptAlgorithm,
2,043✔
1491
                                char *encryptKey) {
1492
  TAOS_CHECK_RETURN(
2,043!
1493
      tsdbWriteFile(fd, *fileSize, (const uint8_t *)footer, sizeof(*footer), encryptAlgorithm, encryptKey));
1494
  *fileSize += sizeof(*footer);
2,043✔
1495
  return 0;
2,043✔
1496
}
1497

1498
static int32_t tsdbDataFileWriteTombFooter(SDataFileWriter *writer) {
2,043✔
1499
  int32_t code = 0;
2,043✔
1500
  int32_t lino = 0;
2,043✔
1501

1502
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
2,043✔
1503
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
2,043✔
1504

1505
  TAOS_CHECK_GOTO(tsdbFileWriteTombFooter(writer->fd[TSDB_FTYPE_TOMB], writer->tombFooter,
2,043!
1506
                                          &writer->files[TSDB_FTYPE_TOMB].size, encryptAlgorithm, encryptKey),
1507
                  &lino, _exit);
1508

1509
_exit:
2,043✔
1510
  if (code) {
2,043!
1511
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1512
              tstrerror(code));
1513
  }
1514
  return code;
2,043✔
1515
}
1516

1517
static int32_t tsdbDataFileDoWriteTombRecord(SDataFileWriter *writer, const STombRecord *record) {
11,280✔
1518
  int32_t code = 0;
11,280✔
1519
  int32_t lino = 0;
11,280✔
1520

1521
  while (writer->ctx->hasOldTomb) {
12,387✔
1522
    for (; writer->ctx->tombBlockIdx < TOMB_BLOCK_SIZE(writer->ctx->tombBlock); writer->ctx->tombBlockIdx++) {
291,008✔
1523
      STombRecord record1[1];
1524
      TAOS_CHECK_GOTO(tTombBlockGet(writer->ctx->tombBlock, writer->ctx->tombBlockIdx, record1), &lino, _exit);
288,794!
1525

1526
      int32_t c = tTombRecordCompare(record, record1);
288,794✔
1527
      if (c < 0) {
288,794✔
1528
        goto _write;
2,367✔
1529
      } else if (c > 0) {
286,427!
1530
        TAOS_CHECK_GOTO(tTombBlockPut(writer->tombBlock, record1), &lino, _exit);
286,427!
1531

1532
        tsdbTrace("vgId:%d write tomb record to tomb file:%s, cid:%" PRId64 ", suid:%" PRId64 ", uid:%" PRId64
286,427!
1533
                  ", version:%" PRId64,
1534
                  TD_VID(writer->config->tsdb->pVnode), writer->fd[TSDB_FTYPE_TOMB]->path, writer->config->cid,
1535
                  record1->suid, record1->uid, record1->version);
1536

1537
        if (TOMB_BLOCK_SIZE(writer->tombBlock) >= writer->config->maxRow) {
286,427!
1538
          TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlock(writer), &lino, _exit);
×
1539
        }
1540
      } else {
1541
        tsdbError("vgId:%d duplicate tomb record, cid:%" PRId64 ", suid:%" PRId64 ", uid:%" PRId64 ", version:%" PRId64,
×
1542
                  TD_VID(writer->config->tsdb->pVnode), writer->config->cid, record->suid, record->uid,
1543
                  record->version);
1544
      }
1545
    }
1546

1547
    if (writer->ctx->tombBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->tombBlkArray)) {
2,214✔
1548
      writer->ctx->hasOldTomb = false;
1,107✔
1549
      break;
1,107✔
1550
    } else {
1551
      const STombBlk *tombBlk = TARRAY2_GET_PTR(writer->ctx->tombBlkArray, writer->ctx->tombBlkArrayIdx);
1,107✔
1552

1553
      TAOS_CHECK_GOTO(tsdbDataFileReadTombBlock(writer->ctx->reader, tombBlk, writer->ctx->tombBlock), &lino, _exit);
1,107!
1554

1555
      writer->ctx->tombBlockIdx = 0;
1,107✔
1556
      writer->ctx->tombBlkArrayIdx++;
1,107✔
1557
    }
1558
  }
1559

1560
_write:
7,806✔
1561
  if (record->suid == INT64_MAX) {
11,280✔
1562
    goto _exit;
2,043✔
1563
  }
1564

1565
  TAOS_CHECK_GOTO(tTombBlockPut(writer->tombBlock, record), &lino, _exit);
9,237!
1566

1567
  tsdbTrace("vgId:%d write tomb record to tomb file:%s, cid:%" PRId64 ", suid:%" PRId64 ", uid:%" PRId64
9,237!
1568
            ", version:%" PRId64,
1569
            TD_VID(writer->config->tsdb->pVnode), writer->fd[TSDB_FTYPE_TOMB]->path, writer->config->cid, record->suid,
1570
            record->uid, record->version);
1571

1572
  if (TOMB_BLOCK_SIZE(writer->tombBlock) >= writer->config->maxRow) {
9,237!
1573
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlock(writer), &lino, _exit);
×
1574
  }
1575

1576
_exit:
9,237✔
1577
  if (code) {
11,280!
1578
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1579
              tstrerror(code));
1580
  }
1581
  return code;
11,280✔
1582
}
1583

1584
int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize,
3,046✔
1585
                             int32_t encryptAlgorithm, char *encryptKey) {
1586
  if (TARRAY2_SIZE(brinBlkArray) <= 0) {
3,046!
1587
    return TSDB_CODE_INVALID_PARA;
×
1588
  }
1589
  ptr->offset = *fileSize;
3,046✔
1590
  ptr->size = TARRAY2_DATA_LEN(brinBlkArray);
3,046✔
1591

1592
  TAOS_CHECK_RETURN(
3,046!
1593
      tsdbWriteFile(fd, ptr->offset, (uint8_t *)TARRAY2_DATA(brinBlkArray), ptr->size, encryptAlgorithm, encryptKey));
1594

1595
  *fileSize += ptr->size;
3,046✔
1596
  return 0;
3,046✔
1597
}
1598

1599
static int32_t tsdbDataFileWriteBrinBlk(SDataFileWriter *writer) {
3,046✔
1600
  int32_t code = 0;
3,046✔
1601
  int32_t lino = 0;
3,046✔
1602

1603
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
3,046✔
1604
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
3,046✔
1605

1606
  TAOS_CHECK_GOTO(
3,046!
1607
      tsdbFileWriteBrinBlk(writer->fd[TSDB_FTYPE_HEAD], writer->brinBlkArray, writer->headFooter->brinBlkPtr,
1608
                           &writer->files[TSDB_FTYPE_HEAD].size, encryptAlgorithm, encryptKey),
1609
      &lino, _exit);
1610

1611
_exit:
3,046✔
1612
  if (code) {
3,046!
1613
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1614
              tstrerror(code));
1615
  }
1616
  return code;
3,046✔
1617
}
1618

1619
void tsdbTFileUpdVerRange(STFile *f, SVersionRange range) {
155,849✔
1620
  f->minVer = TMIN(f->minVer, range.minVer);
155,849✔
1621
  f->maxVer = TMAX(f->maxVer, range.maxVer);
155,849✔
1622
}
155,849✔
1623

1624
static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArray *opArr) {
4,038✔
1625
  int32_t code = 0;
4,038✔
1626
  int32_t lino = 0;
4,038✔
1627

1628
  int32_t  ftype;
1629
  STFileOp op;
1630

1631
  if (writer->fd[TSDB_FTYPE_HEAD]) {
4,038✔
1632
    TABLEID tbid[1] = {{
3,046✔
1633
        .suid = INT64_MAX,
1634
        .uid = INT64_MAX,
1635
    }};
1636

1637
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataEnd(writer), &lino, _exit);
3,046!
1638
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataBegin(writer, tbid), &lino, _exit);
3,046!
1639
    TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlock(writer), &lino, _exit);
3,046!
1640
    TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlk(writer), &lino, _exit);
3,046!
1641
    TAOS_CHECK_GOTO(tsdbDataFileWriteHeadFooter(writer), &lino, _exit);
3,045!
1642

1643
    SVersionRange ofRange = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
3,045✔
1644

1645
    // .head
1646
    ftype = TSDB_FTYPE_HEAD;
3,045✔
1647
    if (writer->config->files[ftype].exist) {
3,045✔
1648
      op = (STFileOp){
790✔
1649
          .optype = TSDB_FOP_REMOVE,
1650
          .fid = writer->config->fid,
790✔
1651
          .of = writer->config->files[ftype].file,
790✔
1652
      };
1653
      ofRange = (SVersionRange){.minVer = op.of.minVer, .maxVer = op.of.maxVer};
790✔
1654
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
1,580!
1655
    }
1656
    op = (STFileOp){
3,045✔
1657
        .optype = TSDB_FOP_CREATE,
1658
        .fid = writer->config->fid,
3,045✔
1659
        .nf = writer->files[ftype],
3,045✔
1660
    };
1661
    tsdbTFileUpdVerRange(&op.nf, ofRange);
3,045✔
1662
    tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
3,046✔
1663
    TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
6,092!
1664

1665
    // .data
1666
    ftype = TSDB_FTYPE_DATA;
3,046✔
1667
    if (!writer->config->files[ftype].exist) {
3,046✔
1668
      op = (STFileOp){
2,256✔
1669
          .optype = TSDB_FOP_CREATE,
1670
          .fid = writer->config->fid,
2,256✔
1671
          .nf = writer->files[ftype],
2,256✔
1672
      };
1673
      tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
2,256✔
1674
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
4,512!
1675
    } else if (writer->config->files[ftype].file.size != writer->files[ftype].size) {
790!
1676
      op = (STFileOp){
790✔
1677
          .optype = TSDB_FOP_MODIFY,
1678
          .fid = writer->config->fid,
790✔
1679
          .of = writer->config->files[ftype].file,
790✔
1680
          .nf = writer->files[ftype],
790✔
1681
      };
1682
      tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
790✔
1683
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
1,580!
1684
    }
1685

1686
    // .sma
1687
    ftype = TSDB_FTYPE_SMA;
3,046✔
1688
    if (!writer->config->files[ftype].exist) {
3,046✔
1689
      op = (STFileOp){
2,256✔
1690
          .optype = TSDB_FOP_CREATE,
1691
          .fid = writer->config->fid,
2,256✔
1692
          .nf = writer->files[ftype],
2,256✔
1693
      };
1694
      tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
2,256✔
1695
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
4,512!
1696
    } else if (writer->config->files[ftype].file.size != writer->files[ftype].size) {
790✔
1697
      op = (STFileOp){
787✔
1698
          .optype = TSDB_FOP_MODIFY,
1699
          .fid = writer->config->fid,
787✔
1700
          .of = writer->config->files[ftype].file,
787✔
1701
          .nf = writer->files[ftype],
787✔
1702
      };
1703
      tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
787✔
1704
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
1,574!
1705
    }
1706
  }
1707

1708
  if (writer->fd[TSDB_FTYPE_TOMB]) {
4,038✔
1709
    STombRecord record[1] = {{
2,043✔
1710
        .suid = INT64_MAX,
1711
        .uid = INT64_MAX,
1712
        .version = INT64_MAX,
1713
    }};
1714

1715
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombRecord(writer, record), &lino, _exit);
2,043!
1716
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlock(writer), &lino, _exit);
2,043!
1717
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlk(writer), &lino, _exit);
2,043!
1718
    TAOS_CHECK_GOTO(tsdbDataFileWriteTombFooter(writer), &lino, _exit);
2,043!
1719

1720
    SVersionRange ofRange = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
2,043✔
1721

1722
    ftype = TSDB_FTYPE_TOMB;
2,043✔
1723
    if (writer->config->files[ftype].exist) {
2,043✔
1724
      op = (STFileOp){
1,107✔
1725
          .optype = TSDB_FOP_REMOVE,
1726
          .fid = writer->config->fid,
1,107✔
1727
          .of = writer->config->files[ftype].file,
1,107✔
1728
      };
1729
      ofRange = (SVersionRange){.minVer = op.of.minVer, .maxVer = op.of.maxVer};
1,107✔
1730
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
2,214!
1731
    }
1732
    op = (STFileOp){
2,043✔
1733
        .optype = TSDB_FOP_CREATE,
1734
        .fid = writer->config->fid,
2,043✔
1735
        .nf = writer->files[ftype],
2,043✔
1736
    };
1737
    tsdbTFileUpdVerRange(&op.nf, ofRange);
2,043✔
1738
    tsdbTFileUpdVerRange(&op.nf, writer->ctx->tombRange);
2,043✔
1739
    TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
4,086!
1740
  }
1741
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
4,038✔
1742
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
4,038✔
1743
  for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
20,190✔
1744
    if (writer->fd[i]) {
16,151✔
1745
      TAOS_CHECK_GOTO(tsdbFsyncFile(writer->fd[i], encryptAlgorithm, encryptKey), &lino, _exit);
11,181!
1746
      tsdbCloseFile(&writer->fd[i]);
11,181✔
1747
    }
1748
  }
1749

1750
_exit:
4,039✔
1751
  if (code) {
4,039!
1752
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1753
              tstrerror(code));
1754
  }
1755
  return code;
4,038✔
1756
}
1757

1758
static int32_t tsdbDataFileWriterOpenDataFD(SDataFileWriter *writer) {
3,045✔
1759
  int32_t code = 0;
3,045✔
1760
  int32_t lino = 0;
3,045✔
1761

1762
  int32_t ftypes[] = {TSDB_FTYPE_HEAD, TSDB_FTYPE_DATA, TSDB_FTYPE_SMA};
3,045✔
1763

1764
  for (int32_t i = 0; i < ARRAY_SIZE(ftypes); ++i) {
12,183✔
1765
    int32_t ftype = ftypes[i];
9,137✔
1766

1767
    char    fname[TSDB_FILENAME_LEN];
1768
    int32_t flag = TD_FILE_READ | TD_FILE_WRITE;
9,137✔
1769

1770
    if (writer->files[ftype].size == 0) {
9,137✔
1771
      flag |= (TD_FILE_CREATE | TD_FILE_TRUNC);
7,557✔
1772
    }
1773

1774
    int32_t lcn = writer->files[ftype].lcn;
9,137✔
1775
    tsdbTFileName(writer->config->tsdb, &writer->files[ftype], fname);
9,137✔
1776
    TAOS_CHECK_GOTO(tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd[ftype], lcn), &lino, _exit);
9,138!
1777

1778
    if (writer->files[ftype].size == 0) {
9,139✔
1779
      uint8_t hdr[TSDB_FHDR_SIZE] = {0};
7,558✔
1780

1781
      int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
7,558✔
1782
      char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
7,558✔
1783

1784
      TAOS_CHECK_GOTO(tsdbWriteFile(writer->fd[ftype], 0, hdr, TSDB_FHDR_SIZE, encryptAlgorithm, encryptKey), &lino,
7,558!
1785
                      _exit);
1786

1787
      writer->files[ftype].size += TSDB_FHDR_SIZE;
7,557✔
1788
    }
1789
  }
1790

1791
  if (writer->ctx->reader) {
3,046✔
1792
    TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(writer->ctx->reader, &writer->ctx->brinBlkArray), &lino, _exit);
790!
1793
  }
1794

1795
_exit:
3,046✔
1796
  if (code) {
3,046!
1797
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1798
              tstrerror(code));
1799
  }
1800
  return code;
3,046✔
1801
}
1802

1803
int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer) {
7,946✔
1804
  writer[0] = taosMemoryCalloc(1, sizeof(*writer[0]));
7,946!
1805
  if (!writer[0]) {
7,946!
1806
    return terrno;
×
1807
  }
1808

1809
  writer[0]->config[0] = config[0];
7,946✔
1810
  return 0;
7,946✔
1811
}
1812

1813
int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, TFileOpArray *opArr) {
7,947✔
1814
  if (writer[0] == NULL) return 0;
7,947!
1815

1816
  int32_t code = 0;
7,947✔
1817
  int32_t lino = 0;
7,947✔
1818

1819
  if (writer[0]->ctx->opened) {
7,947✔
1820
    if (abort) {
4,038!
1821
      TAOS_CHECK_GOTO(tsdbDataFileWriterCloseAbort(writer[0]), &lino, _exit);
×
1822
    } else {
1823
      TAOS_CHECK_GOTO(tsdbDataFileWriterCloseCommit(writer[0], opArr), &lino, _exit);
4,038!
1824
    }
1825
    tsdbDataFileWriterDoClose(writer[0]);
4,038✔
1826
  }
1827
  taosMemoryFree(writer[0]);
7,947!
1828
  writer[0] = NULL;
7,946✔
1829

1830
_exit:
7,946✔
1831
  if (code) {
7,946!
1832
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer[0]->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1833
              tstrerror(code));
1834
  }
1835
  return code;
7,946✔
1836
}
1837

1838
int32_t tsdbDataFileWriteRow(SDataFileWriter *writer, SRowInfo *row) {
111,910,887✔
1839
  int32_t code = 0;
111,910,887✔
1840
  int32_t lino = 0;
111,910,887✔
1841

1842
  if (!writer->ctx->opened) {
111,910,887✔
1843
    TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpen(writer), &lino, _exit);
59!
1844
  }
1845

1846
  if (writer->fd[TSDB_FTYPE_HEAD] == NULL) {
111,910,887✔
1847
    TAOS_CHECK_GOTO(tsdbDataFileWriterOpenDataFD(writer), &lino, _exit);
59!
1848
  }
1849

1850
  if (row->uid != writer->ctx->tbid->uid) {
111,910,887✔
1851
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataEnd(writer), &lino, _exit);
7,168!
1852
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataBegin(writer, (TABLEID *)row), &lino, _exit);
7,168!
1853
  }
1854

1855
  TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSData(writer, &row->row), &lino, _exit);
111,910,887!
1856

1857
_exit:
111,919,234✔
1858
  if (code) {
111,919,234!
1859
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1860
              tstrerror(code));
1861
  }
1862
  return code;
111,920,262✔
1863
}
1864

1865
int32_t tsdbDataFileWriteBlockData(SDataFileWriter *writer, SBlockData *bData) {
160,602✔
1866
  if (bData->nRow == 0) {
160,602!
1867
    return 0;
×
1868
  }
1869

1870
  int32_t code = 0;
160,602✔
1871
  int32_t lino = 0;
160,602✔
1872

1873
  if (!bData->uid) {
160,602!
1874
    return TSDB_CODE_INVALID_PARA;
×
1875
  }
1876

1877
  if (!writer->ctx->opened) {
160,602✔
1878
    TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpen(writer), &lino, _exit);
2,852!
1879
  }
1880

1881
  if (writer->fd[TSDB_FTYPE_DATA] == NULL) {
160,601✔
1882
    TAOS_CHECK_GOTO(tsdbDataFileWriterOpenDataFD(writer), &lino, _exit);
2,986!
1883
  }
1884

1885
  if (bData->uid != writer->ctx->tbid->uid) {
160,602✔
1886
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataEnd(writer), &lino, _exit);
55,250!
1887
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataBegin(writer, (TABLEID *)bData), &lino, _exit);
55,250!
1888
  }
1889

1890
  if (writer->ctx->tbHasOldData) {
160,601✔
1891
    STsdbRowKey key;
1892

1893
    tsdbRowGetKey(&tsdbRowFromBlockData(bData, 0), &key);
7,045✔
1894
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTableOldData(writer, &key), &lino, _exit);
7,045!
1895
  }
1896

1897
  if (!writer->ctx->tbHasOldData       //
160,601✔
1898
      && writer->blockData->nRow == 0  //
153,919!
1899
  ) {
1900
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, bData), &lino, _exit);
153,919!
1901

1902
  } else {
1903
    for (int32_t i = 0; i < bData->nRow; ++i) {
736,289✔
1904
      TSDBROW row[1] = {tsdbRowFromBlockData(bData, i)};
729,607✔
1905
      TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSData(writer, row), &lino, _exit);
729,607!
1906
    }
1907
  }
1908

1909
_exit:
6,682✔
1910
  if (code) {
160,602!
1911
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1912
              tstrerror(code));
1913
  }
1914
  return code;
160,601✔
1915
}
1916

1917
int32_t tsdbDataFileFlush(SDataFileWriter *writer) {
19,928✔
1918
  if (!writer->ctx->opened) {
19,928!
1919
    return TSDB_CODE_INVALID_PARA;
×
1920
  }
1921

1922
  if (writer->blockData->nRow == 0) return 0;
19,928!
1923
  if (writer->ctx->tbHasOldData) return 0;
19,928✔
1924

1925
  return tsdbDataFileDoWriteBlockData(writer, writer->blockData);
19,924✔
1926
}
1927

1928
static int32_t tsdbDataFileWriterOpenTombFD(SDataFileWriter *writer) {
2,042✔
1929
  int32_t code = 0;
2,042✔
1930
  int32_t lino = 0;
2,042✔
1931

1932
  char    fname[TSDB_FILENAME_LEN];
1933
  int32_t ftype = TSDB_FTYPE_TOMB;
2,042✔
1934

1935
  int32_t flag = (TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
2,042✔
1936

1937
  int32_t lcn = writer->files[ftype].lcn;
2,042✔
1938
  tsdbTFileName(writer->config->tsdb, writer->files + ftype, fname);
2,042✔
1939

1940
  TAOS_CHECK_GOTO(tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd[ftype], lcn), &lino, _exit);
2,043!
1941

1942
  uint8_t hdr[TSDB_FHDR_SIZE] = {0};
2,043✔
1943
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
2,043✔
1944
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
2,043✔
1945

1946
  TAOS_CHECK_GOTO(tsdbWriteFile(writer->fd[ftype], 0, hdr, TSDB_FHDR_SIZE, encryptAlgorithm, encryptKey), &lino, _exit);
2,043!
1947
  writer->files[ftype].size += TSDB_FHDR_SIZE;
2,043✔
1948

1949
  if (writer->ctx->reader) {
2,043✔
1950
    TAOS_CHECK_GOTO(tsdbDataFileReadTombBlk(writer->ctx->reader, &writer->ctx->tombBlkArray), &lino, _exit);
1,405!
1951

1952
    if (TARRAY2_SIZE(writer->ctx->tombBlkArray) > 0) {
1,405✔
1953
      writer->ctx->hasOldTomb = true;
1,107✔
1954
    }
1955

1956
    writer->ctx->tombBlkArrayIdx = 0;
1,405✔
1957
    tTombBlockClear(writer->ctx->tombBlock);
1,405✔
1958
    writer->ctx->tombBlockIdx = 0;
1,405✔
1959
  }
1960

1961
_exit:
638✔
1962
  if (code) {
2,043!
1963
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1964
              tstrerror(code));
1965
  }
1966
  return code;
2,043✔
1967
}
1968

1969
int32_t tsdbDataFileWriteTombRecord(SDataFileWriter *writer, const STombRecord *record) {
9,236✔
1970
  int32_t code = 0;
9,236✔
1971
  int32_t lino = 0;
9,236✔
1972

1973
  if (!writer->ctx->opened) {
9,236✔
1974
    TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpen(writer), &lino, _exit);
1,126!
1975
  }
1976

1977
  if (writer->fd[TSDB_FTYPE_TOMB] == NULL) {
9,236✔
1978
    TAOS_CHECK_GOTO(tsdbDataFileWriterOpenTombFD(writer), &lino, _exit);
2,042!
1979
  }
1980

1981
  TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombRecord(writer, record), &lino, _exit);
9,237!
1982

1983
_exit:
9,237✔
1984
  if (code) {
9,237!
1985
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1986
              tstrerror(code));
1987
  }
1988
  return code;
9,237✔
1989
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc