• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5032

24 Apr 2026 11:25AM UTC coverage: 73.076% (+0.2%) from 72.876%
#5032

push

travis-ci

web-flow
merge: from main to 3.0 branch #35224

merge: from main to 3.0 branch[manual-only]

1344 of 1975 new or added lines in 48 files covered. (68.05%)

527 existing lines in 138 files now uncovered.

275965 of 377640 relevant lines covered (73.08%)

132797765.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.52
/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdbDataFileRW.h"
17
#include "meta.h"
18

19
// SDataFileReader =============================================
20
// Reader state for one data file set (one STsdbFD slot per TSDB_FTYPE_* type).
struct SDataFileReader {
21
  // Copy of the caller-supplied configuration (copied by value when the reader is opened).
  SDataFileReaderConfig config[1];
22

23
  // Reader-owned scratch buffers; initialised on open and destroyed on close.
  SBuffer  local[10];
24
  // Scratch buffers actually used: config->buffers when provided, otherwise `local`.
  SBuffer *buffers;
25

26
  // Lazy-load flags: each is set to true once the corresponding footer/array has been loaded.
  struct {
27
    bool headFooterLoaded;
28
    bool tombFooterLoaded;
29
    bool brinBlkLoaded;
30
    bool tombBlkLoaded;
31
  } ctx[1];
32

33
  // Open file handles indexed by file type; a NULL entry means that file was not opened.
  STsdbFD *fd[TSDB_FTYPE_MAX];
34

35
  // Footer read from the last sizeof(SHeadFooter) bytes of the head file.
  SHeadFooter   headFooter[1];
36
  // Footer read from the last sizeof(STombFooter) bytes of the tomb file.
  STombFooter   tombFooter[1];
37
  // Brin block index loaded from the region described by headFooter->brinBlkPtr.
  TBrinBlkArray brinBlkArray[1];
38
  // Tomb block index loaded from the region described by tombFooter->tombBlkPtr.
  TTombBlkArray tombBlkArray[1];
39
};
40

41
/*
 * Load the SHeadFooter stored in the last sizeof(SHeadFooter) bytes of the head file.
 *
 * The footer is read at most once per reader; later calls return 0 immediately.
 * When no head file is open nothing is read, but the footer is still marked as
 * loaded so callers see whatever the footer already contained.
 *
 * Returns 0 on success, otherwise the error code reported by tsdbReadFile.
 */
static int32_t tsdbDataFileReadHeadFooter(SDataFileReader *reader) {
  if (reader->ctx->headFooterLoaded) {
    return 0;
  }

  int32_t code = 0;
  int32_t lino = 0;

  int32_t ftype = TSDB_FTYPE_HEAD;
  if (reader->fd[ftype]) {
    SEncryptData *pEncryptData = &(reader->config->tsdb->pVnode->config.tsdbCfg.encryptData);

    // The footer occupies the tail of the file, so its offset is derived from the recorded file size.
    TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[ftype], reader->config->files[ftype].file.size - sizeof(SHeadFooter),
                                 (uint8_t *)reader->headFooter, sizeof(SHeadFooter), 0, pEncryptData),
                    &lino, _exit);
  }

  reader->ctx->headFooterLoaded = true;

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
82

83
/*
 * Load the STombFooter stored in the last sizeof(STombFooter) bytes of the tomb file.
 *
 * Does nothing when the footer was loaded before. When no tomb file is open the
 * read is skipped, but the footer is still flagged as loaded.
 *
 * Returns 0 on success, otherwise the error code reported by tsdbReadFile.
 */
static int32_t tsdbDataFileReadTombFooter(SDataFileReader *reader) {
  int32_t code = 0;
  int32_t lino = 0;

  if (reader->ctx[0].tombFooterLoaded) {
    return 0;
  }

  if (reader->fd[TSDB_FTYPE_TOMB] != NULL) {
    SEncryptData *pEncryptData = &(reader->config[0].tsdb->pVnode->config.tsdbCfg.encryptData);

    // The footer sits at the very end of the file.
    TAOS_CHECK_GOTO(
        tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], reader->config[0].files[TSDB_FTYPE_TOMB].file.size - sizeof(STombFooter),
                     (uint8_t *)reader->tombFooter, sizeof(STombFooter), 0, pEncryptData),
        &lino, _exit);
  }

  reader->ctx[0].tombFooterLoaded = true;

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
108

109
/*
 * Allocate a data file reader and open the files of the set for reading.
 *
 * fname  - optional array of TSDB_FTYPE_MAX file names; NULL entries are skipped.
 *          When fname itself is NULL, names are derived with tsdbTFileName() for
 *          every file flagged as existing in `config`.
 * config - reader configuration; copied by value into the reader.
 * reader - receives the newly allocated reader.
 *
 * Returns 0 on success, otherwise the allocation or open error code. On failure
 * the reader is not released here.
 */
int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig *config, SDataFileReader **reader) {
  int32_t code = 0;
  int32_t lino = 0;

  SDataFileReader *rd = taosMemoryCalloc(1, sizeof(*rd));
  *reader = rd;
  if (rd == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  for (int32_t idx = 0; idx < ARRAY_SIZE(rd->local); idx++) {
    tBufferInit(rd->local + idx);
  }

  rd->config[0] = *config;
  // Fall back to the reader-owned buffers when the caller supplied none.
  rd->buffers = (config->buffers != NULL) ? config->buffers : rd->local;

  if (fname != NULL) {
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
      if (fname[ftype] == NULL) {
        continue;
      }
      int32_t lcn = config->files[ftype].file.lcn;
      TAOS_CHECK_GOTO(tsdbOpenFile(fname[ftype], config->tsdb, TD_FILE_READ, &rd->fd[ftype], lcn), &lino, _exit);
    }
  } else {
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
      if (!config->files[ftype].exist) {
        continue;
      }
      char path[TSDB_FILENAME_LEN];
      tsdbTFileName(config->tsdb, &config->files[ftype].file, path);
      int32_t lcn = config->files[ftype].file.lcn;
      TAOS_CHECK_GOTO(tsdbOpenFile(path, config->tsdb, TD_FILE_READ, &rd->fd[ftype], lcn), &lino, _exit);
    }
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
152

153
/*
 * Release a reader created by tsdbDataFileReaderOpen and reset *reader to NULL.
 * Calling it with *reader == NULL is a no-op.
 */
void tsdbDataFileReaderClose(SDataFileReader **reader) {
  SDataFileReader *rd = *reader;
  if (rd == NULL) {
    return;
  }

  // Block index arrays.
  TARRAY2_DESTROY(rd->tombBlkArray, NULL);
  TARRAY2_DESTROY(rd->brinBlkArray, NULL);

  // File handles that were actually opened.
  for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
    if (rd->fd[ftype] != NULL) {
      tsdbCloseFile(&rd->fd[ftype]);
    }
  }

  // Reader-owned scratch buffers.
  for (int32_t idx = 0; idx < ARRAY_SIZE(rd->local); ++idx) {
    tBufferDestroy(rd->local + idx);
  }

  taosMemoryFree(rd);
  *reader = NULL;
}
174

175
/*
 * Return the brin block index of the head file, loading it on first use.
 *
 * reader       - open data file reader.
 * brinBlkArray - receives a pointer to the reader-owned array.
 *
 * Returns 0 on success, otherwise an error code; on failure the temporary
 * buffer allocated here is freed and *brinBlkArray is left untouched.
 */
int32_t tsdbDataFileReadBrinBlk(SDataFileReader *reader, const TBrinBlkArray **brinBlkArray) {
  int32_t code = 0;
  int32_t lino = 0;
  void   *data = NULL;

  if (!reader->ctx[0].brinBlkLoaded) {
    TAOS_CHECK_GOTO(tsdbDataFileReadHeadFooter(reader), &lino, _exit);

    if (reader->headFooter->brinBlkPtr->size <= 0) {
      // Nothing recorded in the footer: start from an empty array.
      TARRAY2_INIT(reader->brinBlkArray);
    } else {
      data = taosMemoryMalloc(reader->headFooter->brinBlkPtr->size);
      if (data == NULL) {
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
      }

      SEncryptData *pEncryptData = &(reader->config->tsdb->pVnode->config.tsdbCfg.encryptData);

      TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], reader->headFooter->brinBlkPtr->offset, data,
                                   reader->headFooter->brinBlkPtr->size, 0, pEncryptData),
                      &lino, _exit);

      // The array is initialised directly on top of the buffer that was just read.
      int32_t size = reader->headFooter->brinBlkPtr->size / sizeof(SBrinBlk);
      TARRAY2_INIT_EX(reader->brinBlkArray, size, size, data);
    }

    reader->ctx[0].brinBlkLoaded = true;
  }
  *brinBlkArray = reader->brinBlkArray;

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
    taosMemoryFree(data);
  }
  return code;
}
213

214
/*
 * Read one brin block from the head file and decode it into `brinBlock`.
 *
 * reader    - open data file reader; buffers[0] holds the raw block and
 *             buffers[1] is used as decompression scratch space.
 * brinBlk   - index entry giving the block's location, per-column compressed
 *             sizes, record count and number of primary keys.
 * brinBlock - destination; cleared before decoding.
 *
 * Returns 0 on success. Returns TSDB_CODE_FILE_CORRUPTED when the number of
 * primary keys exceeds TD_MAX_PK_COLS or when decoding does not consume exactly
 * the bytes that were read; other errors are passed through from the helpers.
 */
int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinBlk, SBrinBlock *brinBlock) {
  int32_t code = 0;
  int32_t lino = 0;

  SBuffer *buffer = reader->buffers + 0;
  SBuffer *assist = reader->buffers + 1;

  SEncryptData *pEncryptData = &(reader->config->tsdb->pVnode->config.tsdbCfg.encryptData);

  // load data
  tBufferClear(buffer);
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, brinBlk->dp->size, buffer, 0,
                                       pEncryptData),
                  &lino, _exit);

  // decode brin block
  SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer);
  tBrinBlockClear(brinBlock);
  brinBlock->numOfPKs = brinBlk->numOfPKs;
  brinBlock->numOfRecords = brinBlk->numRec;
  for (int32_t i = 0; i < 10; i++) {  // int64_t

    SCompressInfo cinfo = {
        .cmprAlg = brinBlk->cmprAlg,
        .dataType = TSDB_DATA_TYPE_BIGINT,
        .compressedSize = brinBlk->size[i],
        .originalSize = brinBlk->numRec * sizeof(int64_t),
    };
    TAOS_CHECK_GOTO(tDecompressDataToBuffer(BR_PTR(&br), &cinfo, brinBlock->buffers + i, assist), &lino, _exit);
    br.offset += brinBlk->size[i];
  }

  for (int32_t i = 10; i < 15; i++) {  // int32_t
    SCompressInfo cinfo = {
        .cmprAlg = brinBlk->cmprAlg,
        .dataType = TSDB_DATA_TYPE_INT,
        .compressedSize = brinBlk->size[i],
        .originalSize = brinBlk->numRec * sizeof(int32_t),
    };
    TAOS_CHECK_GOTO(tDecompressDataToBuffer(BR_PTR(&br), &cinfo, brinBlock->buffers + i, assist), &lino, _exit);
    br.offset += brinBlk->size[i];
  }

  // primary keys
  if (brinBlk->numOfPKs > 0) {  // decode the primary keys
    // The info arrays below hold TD_MAX_PK_COLS entries; a larger count can only
    // come from a damaged index entry and would write past them.
    if (brinBlk->numOfPKs > TD_MAX_PK_COLS) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
    }

    SValueColumnCompressInfo firstInfos[TD_MAX_PK_COLS];
    SValueColumnCompressInfo lastInfos[TD_MAX_PK_COLS];

    // All compress-info headers come first (first keys, then last keys) ...
    for (int32_t i = 0; i < brinBlk->numOfPKs; i++) {
      TAOS_CHECK_GOTO(tValueColumnCompressInfoDecode(&br, firstInfos + i), &lino, _exit);
    }
    for (int32_t i = 0; i < brinBlk->numOfPKs; i++) {
      TAOS_CHECK_GOTO(tValueColumnCompressInfoDecode(&br, lastInfos + i), &lino, _exit);
    }

    // ... followed by the compressed column payloads in the same order.
    for (int32_t i = 0; i < brinBlk->numOfPKs; i++) {
      SValueColumnCompressInfo *info = firstInfos + i;

      TAOS_CHECK_GOTO(tValueColumnDecompress(BR_PTR(&br), info, brinBlock->firstKeyPKs + i, assist), &lino, _exit);
      br.offset += (info->offsetCompressedSize + info->dataCompressedSize);
    }

    for (int32_t i = 0; i < brinBlk->numOfPKs; i++) {
      SValueColumnCompressInfo *info = lastInfos + i;

      TAOS_CHECK_GOTO(tValueColumnDecompress(BR_PTR(&br), info, brinBlock->lastKeyPKs + i, assist), &lino, _exit);
      br.offset += (info->offsetCompressedSize + info->dataCompressedSize);
    }
  }

  // Every byte that was read must have been consumed by the decoder.
  if (br.offset != br.buffer->size) {
    // Report __LINE__ here: lino has not been assigned yet on this path.
    tsdbError("vgId:%d %s failed at %s:%d since brin block size mismatch, expected: %u, actual: %u, fname:%s",
              TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, __LINE__, br.buffer->size, br.offset,
              reader->fd[TSDB_FTYPE_HEAD]->path);
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
298

299
extern int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *assist);
300

301
/*
 * Read a whole data block described by `record` from the data file and
 * decompress it into `bData`.
 *
 * Uses buffers[0] for the raw bytes and buffers[1] as decompression scratch.
 * Returns 0 on success, TSDB_CODE_FILE_CORRUPTED when the decoder does not
 * consume exactly the bytes that were read, or the helper's error code.
 */
int32_t tsdbDataFileReadBlockData(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t fid = reader->config[0].files[TSDB_FTYPE_DATA].file.fid;

  SBuffer      *raw = reader->buffers + 0;
  SBuffer      *scratch = reader->buffers + 1;
  SEncryptData *pEncryptData = &(reader->config[0].tsdb->pVnode->config.tsdbCfg.encryptData);

  // Fetch the raw block bytes.
  tBufferClear(raw);
  TAOS_CHECK_GOTO(
      tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, record->blockSize, raw, 0, pEncryptData),
      &lino, _exit);

  // Decode them into the block data structure.
  SBufferReader br = BUFFER_READER_INITIALIZER(0, raw);
  TAOS_CHECK_GOTO(tBlockDataDecompress(&br, bData, scratch), &lino, _exit);

  if (br.offset != raw->size) {
    tsdbError("vgId:%d %s failed at %s:%d since block data size mismatch, expected: %u, actual: %u, fname:%s",
              TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, __LINE__, raw->size, br.offset,
              reader->fd[TSDB_FTYPE_DATA]->path);
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s fid %d failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, fid,
              __FILE__, lino, tstrerror(code));
  }
  return code;
}
335

336
int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData,
89,002,926✔
337
                                          STSchema *pTSchema, int16_t cids[], int32_t ncid) {
338
  int32_t code = 0;
89,002,926✔
339
  int32_t lino = 0;
89,002,926✔
340
  int32_t fid = reader->config->files[TSDB_FTYPE_DATA].file.fid;
89,009,363✔
341

342
  SDiskDataHdr hdr;
89,011,114✔
343
  SBuffer     *buffer0 = reader->buffers + 0;
89,020,864✔
344
  SBuffer     *buffer1 = reader->buffers + 1;
89,018,933✔
345
  SBuffer     *assist = reader->buffers + 2;
89,018,883✔
346

347
  SEncryptData *pEncryptData = &(reader->config->tsdb->pVnode->config.tsdbCfg.encryptData);
89,019,970✔
348

349
  // load key part
350
  tBufferClear(buffer0);
351
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, record->blockKeySize, buffer0,
89,005,662✔
352
                                       0, pEncryptData),
353
                  &lino, _exit);
354

355
  // SDiskDataHdr
356
  SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0);
89,013,473✔
357
  TAOS_CHECK_GOTO(tGetDiskDataHdr(&br, &hdr), &lino, _exit);
89,016,705✔
358

359
  if (hdr.delimiter != TSDB_FILE_DLMT) {
88,973,000✔
360
    tsdbError("vgId:%d %s failed at %s:%d since disk data header delimiter is invalid, fname:%s",
×
361
              TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, __LINE__, reader->fd[TSDB_FTYPE_DATA]->path);
362
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
363
  }
364

365
  tBlockDataReset(bData);
88,973,000✔
366
  bData->suid = hdr.suid;
88,998,401✔
367
  bData->uid = hdr.uid;
89,017,429✔
368
  bData->nRow = hdr.nRow;
89,017,562✔
369

370
  // Key part
371
  TAOS_CHECK_GOTO(tBlockDataDecompressKeyPart(&hdr, &br, bData, assist), &lino, _exit);
89,008,537✔
372
  if (br.offset != buffer0->size) {
89,019,023✔
373
    tsdbError("vgId:%d %s failed at %s:%d since key part size mismatch, expected: %u, actual: %u, fname:%s",
7,057✔
374
              TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, __LINE__, buffer0->size, br.offset,
375
              reader->fd[TSDB_FTYPE_DATA]->path);
376
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
377
  }
378

379
  int extraColIdx = -1;
88,995,880✔
380
  for (int i = 0; i < ncid; i++) {
89,008,720✔
381
    if (tBlockDataGetColData(bData, cids[i]) == NULL) {
86,923,696✔
382
      extraColIdx = i;
86,917,423✔
383
      break;
86,917,423✔
384
    }
385
  }
386

387
  if (extraColIdx < 0) {
89,002,447✔
388
    goto _exit;
2,081,642✔
389
  }
390

391
  // load SBlockCol part
392
  tBufferClear(buffer0);
393
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset + record->blockKeySize,
86,920,298✔
394
                                       hdr.szBlkCol, buffer0, 0, pEncryptData),
395
                  &lino, _exit);
396

397
  // calc szHint
398
  int64_t szHint = 0;
86,935,807✔
399
  int     extraCols = 1;
86,935,807✔
400
  for (int i = extraColIdx + 1; i < ncid; ++i) {
86,935,807✔
401
    if (tBlockDataGetColData(bData, cids[i]) == NULL) {
47,007,057✔
402
      ++extraCols;
47,016,959✔
403
      break;
47,016,959✔
404
    }
405
  }
406

407
  if (extraCols >= 2) {
86,945,709✔
408
    br = BUFFER_READER_INITIALIZER(0, buffer0);
47,016,809✔
409

410
    SBlockCol blockCol = {.cid = 0};
47,016,809✔
411
    for (int32_t i = extraColIdx; i < ncid; ++i) {
47,018,740✔
412
      int16_t extraColCid = cids[i];
46,994,923✔
413

414
      while (extraColCid > blockCol.cid) {
156,947,032✔
415
        if (br.offset >= buffer0->size) {
109,926,201✔
416
          blockCol.cid = INT16_MAX;
×
417
          break;
×
418
        }
419

420
        TAOS_CHECK_GOTO(tGetBlockCol(&br, &blockCol, hdr.fmtVer, hdr.cmprAlg), &lino, _exit);
109,926,201✔
421
      }
422

423
      if (extraColCid == blockCol.cid || blockCol.cid == INT16_MAX) {
47,020,831✔
424
        extraColIdx = i;
47,020,831✔
425
        break;
47,020,831✔
426
      }
427
    }
428

429
    if (blockCol.cid > 0 && blockCol.cid < INT16_MAX /*&& blockCol->flag == HAS_VALUE*/) {
47,044,648✔
430
      int64_t   offset = blockCol.offset;
47,021,805✔
431
      SBlockCol lastNonNoneBlockCol = {.cid = 0};
47,021,805✔
432

433
      for (int32_t i = extraColIdx; i < ncid; ++i) {
254,279,110✔
434
        int16_t extraColCid = cids[i];
207,254,897✔
435

436
        while (extraColCid > blockCol.cid) {
571,645,425✔
437
          if (br.offset >= buffer0->size) {
364,388,120✔
438
            blockCol.cid = INT16_MAX;
×
439
            break;
×
440
          }
441

442
          TAOS_CHECK_GOTO(tGetBlockCol(&br, &blockCol, hdr.fmtVer, hdr.cmprAlg), &lino, _exit);
364,389,901✔
443
        }
444

445
        if (extraColCid == blockCol.cid) {
207,257,305✔
446
          lastNonNoneBlockCol = blockCol;
207,257,305✔
447
          continue;
207,257,305✔
448
        }
449

450
        if (blockCol.cid == INT16_MAX) {
×
451
          break;
×
452
        }
453
      }
454

455
      if (lastNonNoneBlockCol.cid > 0) {
47,024,213✔
456
        szHint = lastNonNoneBlockCol.offset + lastNonNoneBlockCol.szBitmap + lastNonNoneBlockCol.szOffset +
47,022,292✔
457
                 lastNonNoneBlockCol.szValue - offset;
47,022,292✔
458
      }
459
    }
460
  }
461

462
  // load each column
463
  SBlockCol blockCol = {
86,951,192✔
464
      .cid = 0,
465
  };
466
  bool firstRead = true;
86,926,411✔
467
  br = BUFFER_READER_INITIALIZER(0, buffer0);
86,926,411✔
468
  for (int32_t i = 0; i < ncid; i++) {
334,109,901✔
469
    int16_t cid = cids[i];
247,168,444✔
470

471
    if (tBlockDataGetColData(bData, cid)) {  // already loaded
247,176,165✔
472
      continue;
1,506✔
473
    }
474

475
    while (cid > blockCol.cid) {
878,525,699✔
476
      if (br.offset >= buffer0->size) {
631,318,484✔
477
        blockCol.cid = INT16_MAX;
×
478
        break;
×
479
      }
480

481
      TAOS_CHECK_GOTO(tGetBlockCol(&br, &blockCol, hdr.fmtVer, hdr.cmprAlg), &lino, _exit);
631,332,169✔
482
    }
483

484
    if (cid < blockCol.cid) {
247,207,215✔
485
      const STColumn *tcol = tTSchemaSearchColumn(pTSchema, cid);
×
486
      TSDB_CHECK_NULL(tcol, code, lino, _exit, TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER);
×
487
      SBlockCol none = {
×
488
          .cid = cid,
489
          .type = tcol->type,
×
490
          .cflag = tcol->flags,
×
491
          .flag = HAS_NONE,
492
          .szOrigin = 0,
493
          .szBitmap = 0,
494
          .szOffset = 0,
495
          .szValue = 0,
496
          .offset = 0,
497
      };
498
      TAOS_CHECK_GOTO(tBlockDataDecompressColData(&hdr, &none, &br, bData, assist), &lino, _exit);
×
499
    } else if (cid == blockCol.cid) {
247,207,215✔
500
      SEncryptData *pEncryptData = &(reader->config->tsdb->pVnode->config.tsdbCfg.encryptData);
247,159,090✔
501

502
      // load from file
503
      tBufferClear(buffer1);
504
      TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA],
247,196,466✔
505
                                           record->blockOffset + record->blockKeySize + hdr.szBlkCol + blockCol.offset,
506
                                           blockCol.szBitmap + blockCol.szOffset + blockCol.szValue, buffer1,
507
                                           firstRead ? szHint : 0, pEncryptData),
508
                      &lino, _exit);
509

510
      firstRead = false;
247,185,880✔
511

512
      // decode the buffer
513
      SBufferReader br1 = BUFFER_READER_INITIALIZER(0, buffer1);
247,185,880✔
514
      TAOS_CHECK_GOTO(tBlockDataDecompressColData(&hdr, &blockCol, &br1, bData, assist), &lino, _exit);
247,193,074✔
515
    }
516
  }
517

518
_exit:
89,058,957✔
519
  if (code) {
89,020,140✔
520
    tsdbError("vgId:%d %s fid:%d failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, fid,
×
521
              __FILE__, lino, tstrerror(code));
522
  }
523
  return code;
89,020,140✔
524
}
525

526
int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader, const SBrinRecord *record,
34,148,954✔
527
                                 TColumnDataAggArray *columnDataAggArray) {
528
  int32_t  code = 0;
34,148,954✔
529
  int32_t  lino = 0;
34,148,954✔
530
  SBuffer *buffer = reader->buffers + 0;
34,161,834✔
531

532
  TARRAY2_CLEAR(columnDataAggArray, NULL);
34,172,096✔
533
  if (record->smaSize > 0) {
34,170,822✔
534
    tBufferClear(buffer);
535
    SEncryptData *pEncryptData = &(reader->config->tsdb->pVnode->config.tsdbCfg.encryptData);
34,165,059✔
536

537
    TAOS_CHECK_GOTO(
34,172,096✔
538
        tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_SMA], record->smaOffset, record->smaSize, buffer, 0, pEncryptData),
539
        &lino, _exit);
540

541
    // decode sma data
542
    SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer);
34,163,765✔
543
    while (br.offset < record->smaSize) {
435,029,629✔
544
      SColumnDataAgg sma[1];
400,828,754✔
545

546
      TAOS_CHECK_GOTO(tGetColumnDataAgg(&br, sma), &lino, _exit);
400,842,676✔
547
      TAOS_CHECK_GOTO(TARRAY2_APPEND_PTR(columnDataAggArray, sma), &lino, _exit);
801,671,577✔
548
    }
549
    if (br.offset != record->smaSize) {
34,172,096✔
550
      tsdbError("vgId:%d %s failed at %s:%d since sma data size mismatch, expected: %u, actual: %u, fname:%s",
×
551
                TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, __LINE__, record->smaSize, br.offset,
552
                reader->fd[TSDB_FTYPE_SMA]->path);
553
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
554
    }
555
  }
556

557
_exit:
34,163,735✔
558
  if (code) {
34,170,812✔
559
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
×
560
              tstrerror(code));
561
  }
562
  return code;
34,170,812✔
563
}
564

565
/*
 * Return the tomb block index of the tomb file, loading it on first use.
 *
 * reader       - open data file reader.
 * tombBlkArray - receives a pointer to the reader-owned array.
 *
 * Returns 0 on success, otherwise an error code; on failure the temporary
 * buffer allocated here is freed and *tombBlkArray is left untouched.
 */
int32_t tsdbDataFileReadTombBlk(SDataFileReader *reader, const TTombBlkArray **tombBlkArray) {
  int32_t code = 0;
  int32_t lino = 0;
  void   *data = NULL;

  if (!reader->ctx[0].tombBlkLoaded) {
    TAOS_CHECK_GOTO(tsdbDataFileReadTombFooter(reader), &lino, _exit);

    if (reader->tombFooter->tombBlkPtr->size <= 0) {
      // Nothing recorded in the footer: start from an empty array.
      TARRAY2_INIT(reader->tombBlkArray);
    } else {
      data = taosMemoryMalloc(reader->tombFooter->tombBlkPtr->size);
      if (data == NULL) {
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
      }

      SEncryptData *pEncryptData = &(reader->config->tsdb->pVnode->config.tsdbCfg.encryptData);

      TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], reader->tombFooter->tombBlkPtr->offset, data,
                                   reader->tombFooter->tombBlkPtr->size, 0, pEncryptData),
                      &lino, _exit);

      // The array is initialised directly on top of the buffer that was just read.
      int32_t size = reader->tombFooter->tombBlkPtr->size / sizeof(STombBlk);
      TARRAY2_INIT_EX(reader->tombBlkArray, size, size, data);
    }

    reader->ctx[0].tombBlkLoaded = true;
  }
  *tombBlkArray = reader->tombBlkArray;

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
    taosMemoryFree(data);
  }
  return code;
}
602

603
/*
 * Read one tomb block from the tomb file and decode it into `tData`.
 *
 * reader  - open data file reader; buffers[0] holds the raw block and
 *           buffers[1] is used as decompression scratch space.
 * tombBlk - index entry giving the block's location, per-column compressed
 *           sizes, record count and compression algorithm.
 * tData   - destination; cleared before decoding. Every column is decoded as
 *           numRec values of type BIGINT.
 *
 * Returns 0 on success, otherwise the error code of the failing helper.
 */
int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombBlk, STombBlock *tData) {
  int32_t code = 0;
  int32_t lino = 0;

  SBuffer *buffer0 = reader->buffers + 0;
  SBuffer *assist = reader->buffers + 1;

  tBufferClear(buffer0);
  SEncryptData *pEncryptData = &(reader->config->tsdb->pVnode->config.tsdbCfg.encryptData);

  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_TOMB], tombBlk->dp->offset, tombBlk->dp->size, buffer0, 0,
                                       pEncryptData),
                  &lino, _exit);

  SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0);
  tTombBlockClear(tData);
  tData->numOfRecords = tombBlk->numRec;
  // Columns are stored back to back; step over each one by its compressed size.
  for (int32_t i = 0; i < ARRAY_SIZE(tData->buffers); ++i) {
    SCompressInfo cinfo = {
        .cmprAlg = tombBlk->cmprAlg,
        .dataType = TSDB_DATA_TYPE_BIGINT,
        .originalSize = tombBlk->numRec * sizeof(int64_t),
        .compressedSize = tombBlk->size[i],
    };
    TAOS_CHECK_GOTO(tDecompressDataToBuffer(BR_PTR(&br), &cinfo, tData->buffers + i, assist), &lino, _exit);
    br.offset += tombBlk->size[i];
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
639

640
// SDataFileWriter =============================================
641
// Writer state for one data file set.
struct SDataFileWriter {
642
  // Writer configuration; skmTb / skmRow / buffers left unset by the caller are
  // pointed at the writer's own members when the writer is opened.
  SDataFileWriterConfig config[1];
643

644
  // Writer-owned schema caches; their pTSchema members are destroyed on close.
  SSkmInfo skmTb[1];
645
  SSkmInfo skmRow[1];
646
  // Writer-owned scratch buffers, destroyed on close.
  SBuffer  local[10];
647
  // Scratch buffers actually used: config->buffers when provided, otherwise `local`.
  SBuffer *buffers;
648

649
  // Working context of the writer.
  struct {
650
    bool             opened;
651
    // Reader over the existing file set; opened only when at least one file exists.
    SDataFileReader *reader;
652

653
    // for ts data
654
    TABLEID tbid[1];
655
    bool    tbHasOldData;
656

657
    // NOTE(review): presumably cursors into the existing files' brin blocks / block data -- confirm.
    const TBrinBlkArray *brinBlkArray;
658
    int32_t              brinBlkArrayIdx;
659
    SBrinBlock           brinBlock[1];
660
    int32_t              brinBlockIdx;
661
    SBlockData           blockData[1];
662
    int32_t              blockDataIdx;
663
    // for tomb data
664
    bool                 hasOldTomb;
665
    const TTombBlkArray *tombBlkArray;
666
    int32_t              tombBlkArrayIdx;
667
    STombBlock           tombBlock[1];
668
    int32_t              tombBlockIdx;
669
    // range
670
    SVersionRange range;
671
    SVersionRange tombRange;
672
  } ctx[1];
673

674
  // Per-file-type descriptors and handles of the files being written.
  STFile   files[TSDB_FTYPE_MAX];
675
  STsdbFD *fd[TSDB_FTYPE_MAX];
676

677
  SHeadFooter headFooter[1];
678
  STombFooter tombFooter[1];
679

680
  // Output-side brin index, brin block and block data; destroyed on close.
  TBrinBlkArray brinBlkArray[1];
681
  SBrinBlock    brinBlock[1];
682
  SBlockData    blockData[1];
683

684
  // Output-side tomb index and tomb block; destroyed on close.
  TTombBlkArray tombBlkArray[1];
685
  STombBlock    tombBlock[1];
686
};
687

688
/*
 * Abort path of the writer close. Not implemented: it only logs an error
 * and returns 0.
 */
static int32_t tsdbDataFileWriterCloseAbort(SDataFileWriter *writer) {
  const char *reason = "not implemented";

  tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, __LINE__,
            reason);
  return 0;
}
693

694
/*
 * Release everything the writer owns: the reader over the old file set (if
 * one was opened), the output and context blocks and arrays, the local
 * scratch buffers and the cached schemas. The writer struct is not freed.
 */
static void tsdbDataFileWriterDoClose(SDataFileWriter *writer) {
  if (writer->ctx[0].reader != NULL) {
    tsdbDataFileReaderClose(&writer->ctx[0].reader);
  }

  // Output-side state.
  tTombBlockDestroy(writer->tombBlock);
  TARRAY2_DESTROY(writer->tombBlkArray, NULL);
  tBlockDataDestroy(writer->blockData);
  tBrinBlockDestroy(writer->brinBlock);
  TARRAY2_DESTROY(writer->brinBlkArray, NULL);

  // Context-side state.
  tTombBlockDestroy(writer->ctx[0].tombBlock);
  tBlockDataDestroy(writer->ctx[0].blockData);
  tBrinBlockDestroy(writer->ctx[0].brinBlock);

  // Local scratch buffers.
  for (int32_t idx = 0; idx < ARRAY_SIZE(writer->local); ++idx) {
    tBufferDestroy(writer->local + idx);
  }

  // Cached schemas.
  tDestroyTSchema(writer->skmRow->pTSchema);
  tDestroyTSchema(writer->skmTb->pTSchema);
}
716

717
/*
 * Open a reader over the existing file set when at least one file exists.
 *
 * The reader shares the writer's tsdb, page size and scratch buffers, and is
 * given the descriptor of every file flagged as existing in the writer's
 * configuration. When no file exists, no reader is opened.
 *
 * Returns 0 on success, otherwise the error code of tsdbDataFileReaderOpen.
 */
static int32_t tsdbDataFileWriterDoOpenReader(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
    if (writer->config->files[i].exist) {
      SDataFileReaderConfig config[1] = {{
          .tsdb = writer->config->tsdb,
          .szPage = writer->config->szPage,
          .buffers = writer->buffers,
      }};

      // Copy the state of every file type; `j` keeps this loop distinct from the outer index.
      for (int32_t j = 0; j < TSDB_FTYPE_MAX; ++j) {
        config->files[j].exist = writer->config->files[j].exist;
        if (config->files[j].exist) {
          config->files[j].file = writer->config->files[j].file;
        }
      }

      TAOS_CHECK_GOTO(tsdbDataFileReaderOpen(NULL, config, &writer->ctx->reader), &lino, _exit);
      // A single reader covers the whole set, so stop at the first existing file.
      break;
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
748

749
// Prepare the writer for producing a file set.
//
// Steps, in order:
//   1. Use the writer's own skmTb / skmRow / local buffers for whatever the
//      configuration left NULL.
//   2. Open a reader over the already existing files.
//   3. Fill writer->files[]: .head and .tomb always get a newly allocated disk
//      and an empty descriptor; .data and .sma reuse the configured file when
//      it exists and get a new descriptor otherwise.
//   4. Reset both version ranges to "empty" and mark the writer as opened.
//
// Returns 0 on success, otherwise the first error code reported by a callee.
static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;
  SDiskID diskId = {0};

  // descriptor common to every newly created file; type and disk id (and lcn
  // for .data) are filled in per file below, all other fields stay zero
  const STFile blank = {
      .fid = writer->config->fid,
      .cid = writer->config->cid,
      .size = 0,
      .minVer = VERSION_MAX,
      .maxVer = VERSION_MIN,
  };

  if (writer->config->skmTb == NULL) {
    writer->config->skmTb = writer->skmTb;
  }
  if (writer->config->skmRow == NULL) {
    writer->config->skmRow = writer->skmRow;
  }
  writer->buffers = (writer->config->buffers != NULL) ? writer->config->buffers : writer->local;

  // open reader
  code = tsdbDataFileWriterDoOpenReader(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  // .head
  TAOS_CHECK_GOTO(tsdbAllocateDisk(writer->config->tsdb, tsdbFTypeLabel(TSDB_FTYPE_HEAD), writer->config->expLevel,
                                   &diskId),
                  &lino, _exit);
  writer->files[TSDB_FTYPE_HEAD] = blank;
  writer->files[TSDB_FTYPE_HEAD].type = TSDB_FTYPE_HEAD;
  writer->files[TSDB_FTYPE_HEAD].did = diskId;

  // .data
  if (writer->config->files[TSDB_FTYPE_DATA].exist) {
    writer->files[TSDB_FTYPE_DATA] = writer->config->files[TSDB_FTYPE_DATA].file;
  } else {
    TAOS_CHECK_GOTO(tsdbAllocateDisk(writer->config->tsdb, tsdbFTypeLabel(TSDB_FTYPE_DATA), writer->config->expLevel,
                                     &diskId),
                    &lino, _exit);
    writer->files[TSDB_FTYPE_DATA] = blank;
    writer->files[TSDB_FTYPE_DATA].type = TSDB_FTYPE_DATA;
    writer->files[TSDB_FTYPE_DATA].did = diskId;
    writer->files[TSDB_FTYPE_DATA].lcn = (writer->config->lcn == 0) ? -1 : 0;
  }

  // .sma
  if (writer->config->files[TSDB_FTYPE_SMA].exist) {
    writer->files[TSDB_FTYPE_SMA] = writer->config->files[TSDB_FTYPE_SMA].file;
  } else {
    TAOS_CHECK_GOTO(tsdbAllocateDisk(writer->config->tsdb, tsdbFTypeLabel(TSDB_FTYPE_SMA), writer->config->expLevel,
                                     &diskId),
                    &lino, _exit);
    writer->files[TSDB_FTYPE_SMA] = blank;
    writer->files[TSDB_FTYPE_SMA].type = TSDB_FTYPE_SMA;
    writer->files[TSDB_FTYPE_SMA].did = diskId;
  }

  // .tomb
  TAOS_CHECK_GOTO(tsdbAllocateDisk(writer->config->tsdb, tsdbFTypeLabel(TSDB_FTYPE_TOMB), writer->config->expLevel,
                                   &diskId),
                  &lino, _exit);
  writer->files[TSDB_FTYPE_TOMB] = blank;
  writer->files[TSDB_FTYPE_TOMB].type = TSDB_FTYPE_TOMB;
  writer->files[TSDB_FTYPE_TOMB].did = diskId;

  // both ranges start inverted (min > max) so the first update fixes both ends
  writer->ctx->range = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
  writer->ctx->tombRange = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN};

  writer->ctx->opened = true;

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
843

844
// Widen *range so that it also covers [minVer, maxVer]: the lower bound only
// ever moves down and the upper bound only ever moves up.
void tsdbWriterUpdVerRange(SVersionRange *range, int64_t minVer, int64_t maxVer) {
  if (minVer < range->minVer) {
    range->minVer = minVer;
  }
  if (maxVer > range->maxVer) {
    range->maxVer = maxVer;
  }
}
82,030,900✔
848

849
int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, uint32_t cmprAlg, int64_t *fileSize,
789,342✔
850
                               TBrinBlkArray *brinBlkArray, SBuffer *buffers, SVersionRange *range,
851
                               SEncryptData *encryptData) {
852
  if (brinBlock->numOfRecords == 0) {
789,342✔
853
    return 0;
×
854
  }
855

856
  int32_t  code;
857
  SBuffer *buffer0 = buffers + 0;
788,855✔
858
  SBuffer *buffer1 = buffers + 1;
788,855✔
859
  SBuffer *assist = buffers + 2;
788,855✔
860

861
  SBrinBlk brinBlk = {
788,855✔
862
      .dp[0] =
863
          {
864
              .offset = *fileSize,
788,855✔
865
              .size = 0,
866
          },
867
      .numRec = brinBlock->numOfRecords,
789,342✔
868
      .numOfPKs = brinBlock->numOfPKs,
789,342✔
869
      .cmprAlg = cmprAlg,
870
  };
871
  for (int i = 0; i < brinBlock->numOfRecords; i++) {
49,455,111✔
872
    SBrinRecord record;
48,666,256✔
873

874
    TAOS_CHECK_RETURN(tBrinBlockGet(brinBlock, i, &record));
48,666,256✔
875
    if (i == 0) {
48,665,769✔
876
      brinBlk.minTbid.suid = record.suid;
789,342✔
877
      brinBlk.minTbid.uid = record.uid;
789,342✔
878
      brinBlk.minVer = record.minVer;
789,342✔
879
      brinBlk.maxVer = record.maxVer;
789,342✔
880
    }
881
    if (i == brinBlock->numOfRecords - 1) {
48,665,769✔
882
      brinBlk.maxTbid.suid = record.suid;
789,342✔
883
      brinBlk.maxTbid.uid = record.uid;
789,342✔
884
    }
885
    if (record.minVer < brinBlk.minVer) {
48,665,769✔
886
      brinBlk.minVer = record.minVer;
486,844✔
887
    }
888
    if (record.maxVer > brinBlk.maxVer) {
48,665,769✔
889
      brinBlk.maxVer = record.maxVer;
13,888,398✔
890
    }
891
  }
892

893
  tsdbWriterUpdVerRange(range, brinBlk.minVer, brinBlk.maxVer);
788,855✔
894

895
  // write to file
896
  for (int32_t i = 0; i < 10; ++i) {
8,682,275✔
897
    SCompressInfo info = {
7,892,933✔
898
        .cmprAlg = cmprAlg,
899
        .dataType = TSDB_DATA_TYPE_BIGINT,
900
        .originalSize = brinBlock->buffers[i].size,
7,892,933✔
901
    };
902

903
    tBufferClear(buffer0);
904
    TAOS_CHECK_RETURN(tCompressDataToBuffer(brinBlock->buffers[i].data, &info, buffer0, assist));
7,893,420✔
905
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptData));
7,893,420✔
906
    brinBlk.size[i] = info.compressedSize;
7,892,933✔
907
    brinBlk.dp->size += info.compressedSize;
7,893,420✔
908
    *fileSize += info.compressedSize;
7,893,420✔
909
  }
910
  for (int32_t i = 10; i < 15; ++i) {
4,736,052✔
911
    SCompressInfo info = {
3,946,710✔
912
        .cmprAlg = cmprAlg,
913
        .dataType = TSDB_DATA_TYPE_INT,
914
        .originalSize = brinBlock->buffers[i].size,
3,946,710✔
915
    };
916

917
    tBufferClear(buffer0);
918
    TAOS_CHECK_RETURN(tCompressDataToBuffer(brinBlock->buffers[i].data, &info, buffer0, assist));
3,946,710✔
919
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptData));
3,946,710✔
920
    brinBlk.size[i] = info.compressedSize;
3,946,710✔
921
    brinBlk.dp->size += info.compressedSize;
3,946,710✔
922
    *fileSize += info.compressedSize;
3,946,710✔
923
  }
924

925
  // write primary keys to file
926
  if (brinBlock->numOfPKs > 0) {
789,342✔
927
    tBufferClear(buffer0);
928
    tBufferClear(buffer1);
929

930
    // encode
931
    for (int i = 0; i < brinBlock->numOfPKs; i++) {
8,384✔
932
      SValueColumnCompressInfo info = {.cmprAlg = cmprAlg};
4,192✔
933
      TAOS_CHECK_RETURN(tValueColumnCompress(&brinBlock->firstKeyPKs[i], &info, buffer1, assist));
4,192✔
934
      TAOS_CHECK_RETURN(tValueColumnCompressInfoEncode(&info, buffer0));
4,192✔
935
    }
936
    for (int i = 0; i < brinBlock->numOfPKs; i++) {
8,384✔
937
      SValueColumnCompressInfo info = {.cmprAlg = cmprAlg};
4,192✔
938
      TAOS_CHECK_RETURN(tValueColumnCompress(&brinBlock->lastKeyPKs[i], &info, buffer1, assist));
4,192✔
939
      TAOS_CHECK_RETURN(tValueColumnCompressInfoEncode(&info, buffer0));
4,192✔
940
    }
941

942
    // write to file
943
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptData));
4,192✔
944
    *fileSize += buffer0->size;
4,192✔
945
    brinBlk.dp->size += buffer0->size;
4,192✔
946
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer1->data, buffer1->size, encryptData));
4,192✔
947
    *fileSize += buffer1->size;
4,192✔
948
    brinBlk.dp->size += buffer1->size;
4,192✔
949
  }
950

951
  // append to brinBlkArray
952
  TAOS_CHECK_RETURN(TARRAY2_APPEND_PTR(brinBlkArray, &brinBlk));
1,578,684✔
953

954
  tBrinBlockClear(brinBlock);
789,342✔
955

956
  return 0;
789,342✔
957
}
958

959
// Flush the writer's pending brin block to the .head file via
// tsdbFileWriteBrinBlock(), updating the .head file size, the writer's
// brinBlkArray and its version range. Does nothing when the block is empty.
//
// Returns 0 on success, otherwise the error code of tsdbFileWriteBrinBlock().
static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  if (writer->brinBlock->numOfRecords != 0) {
    SEncryptData *pEncryptData = &(writer->config->tsdb->pVnode->config.tsdbCfg.encryptData);

    code = tsdbFileWriteBrinBlock(writer->fd[TSDB_FTYPE_HEAD], writer->brinBlock, writer->config->cmprAlg,
                                  &writer->files[TSDB_FTYPE_HEAD].size, writer->brinBlkArray, writer->buffers,
                                  &writer->ctx->range, pEncryptData);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
981

982
// Append one brin record to the writer's pending brin block.
//
// When tBrinBlockPut() rejects the record with TSDB_CODE_INVALID_PARA (the
// record's primary keys differ from those already in the block), the pending
// block is flushed and the put is retried. Once the block holds 256 or more
// records it is flushed as well.
//
// Returns 0 on success, otherwise the first error code encountered.
static int32_t tsdbDataFileWriteBrinRecord(SDataFileWriter *writer, const SBrinRecord *record) {
  int32_t code = 0;
  int32_t lino = 0;

  while ((code = tBrinBlockPut(writer->brinBlock, record)) == TSDB_CODE_INVALID_PARA) {
    // different records with different primary keys: flush, then try again
    TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlock(writer), &lino, _exit);
  }
  TSDB_CHECK_CODE(code, lino, _exit);

  if (writer->brinBlock->numOfRecords >= 256) {
    code = tsdbDataFileWriteBrinBlock(writer);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1009

1010
// Compress one block of rows, append it to the .data file, append its column
// aggregates to the .sma file, and record the block in the brin index.
//
// Returns 0 when the block is empty (nothing is written) or on success,
// TSDB_CODE_INVALID_PARA when the block has rows but no uid, otherwise the
// error code of the failing callee. On success bData is cleared.
static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData *bData) {
  if (bData->nRow == 0) {
    return 0;
  }
  if (!bData->uid) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = 0;
  int32_t          lino = 0;
  SBuffer         *buffers = writer->buffers;
  SBuffer         *assist = writer->buffers + 4;
  STFile          *dataFile = &writer->files[TSDB_FTYPE_DATA];
  STFile          *smaFile = &writer->files[TSDB_FTYPE_SMA];
  SEncryptData    *pEncryptData = &(writer->config->tsdb->pVnode->config.tsdbCfg.encryptData);
  SColCompressInfo cmprInfo = {.pColCmpr = NULL, .defaultCmprAlg = writer->config->cmprAlg};

  // index entry for this block; sizes are filled in as the parts get written
  SBrinRecord record = {
      .suid = bData->suid,
      .uid = bData->uid,
      .minVer = bData->aVersion[0],
      .maxVer = bData->aVersion[0],
      .blockOffset = dataFile->size,
      .smaOffset = smaFile->size,
      .blockSize = 0,
      .blockKeySize = 0,
      .smaSize = 0,
      .numRow = bData->nRow,
      .count = 1,
  };

  tsdbRowGetKey(&tsdbRowFromBlockData(bData, 0), &record.firstKey);
  tsdbRowGetKey(&tsdbRowFromBlockData(bData, bData->nRow - 1), &record.lastKey);

  // count grows whenever a row differs from its predecessor when compared
  // without version; the version bounds cover every row
  for (int32_t iRow = 1; iRow < bData->nRow; ++iRow) {
    if (tsdbRowCompareWithoutVersion(&tsdbRowFromBlockData(bData, iRow - 1), &tsdbRowFromBlockData(bData, iRow)) !=
        0) {
      record.count++;
    }
    if (bData->aVersion[iRow] < record.minVer) {
      record.minVer = bData->aVersion[iRow];
    }
    if (bData->aVersion[iRow] > record.maxVer) {
      record.maxVer = bData->aVersion[iRow];
    }
  }

  tsdbWriterUpdVerRange(&writer->ctx->range, record.minVer, record.maxVer);

  // a failed lookup is only warned about; compression then proceeds with
  // whatever cmprInfo holds
  code = metaGetColCmpr(writer->config->tsdb->pVnode->pMeta, bData->suid != 0 ? bData->suid : bData->uid,
                        &cmprInfo.pColCmpr);
  if (code != 0) {
    tsdbWarn("vgId:%d failed to get column compress algrithm", TD_VID(writer->config->tsdb->pVnode));
  }

  code = tBlockDataCompress(bData, &cmprInfo, buffers, assist);
  TSDB_CHECK_CODE(code, lino, _exit);

  record.blockKeySize = buffers[0].size + buffers[1].size;
  record.blockSize = record.blockKeySize + buffers[2].size + buffers[3].size;

  // to .data file: the four compressed buffers, back to back
  for (int i = 0; i < 4; i++) {
    code = tsdbWriteFile(writer->fd[TSDB_FTYPE_DATA], dataFile->size, buffers[i].data, buffers[i].size, pEncryptData);
    TSDB_CHECK_CODE(code, lino, _exit);
    dataFile->size += buffers[i].size;
  }

  // to .sma file: aggregates of every column that has SMA enabled and holds values
  tBufferClear(&buffers[0]);
  for (int32_t iCol = 0; iCol < bData->nColData; ++iCol) {
    SColData *colData = &bData->aColData[iCol];

    if ((colData->cflag & COL_SMA_ON) != 0 && (colData->flag & HAS_VALUE) != 0) {
      SColumnDataAgg sma[1] = {{.colId = colData->cid}};
      tColDataCalcSMA[colData->type](colData, sma);

      code = tPutColumnDataAgg(&buffers[0], sma);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }
  record.smaSize = buffers[0].size;

  if (record.smaSize > 0) {
    code = tsdbWriteFile(writer->fd[TSDB_FTYPE_SMA], record.smaOffset, buffers[0].data, record.smaSize, pEncryptData);
    TSDB_CHECK_CODE(code, lino, _exit);
    smaFile->size += record.smaSize;
  }

  // append SBrinRecord
  code = tsdbDataFileWriteBrinRecord(writer, &record);
  TSDB_CHECK_CODE(code, lino, _exit);

  tBlockDataClear(bData);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  taosHashCleanup(cmprInfo.pColCmpr);
  return code;
}
1110

1111
// Add one row to the writer's in-memory block.
//
// For TSDBROW_ROW_FMT rows the row schema is refreshed first. The row then
// either updates the last row of the block (when its version is not above
// config->compactVersion, the block is non-empty and the row equals the last
// row when compared without version) or is appended; before an append, a block
// that has reached config->maxRow rows is flushed.
//
// Returns 0 on success, otherwise the error code of the failing callee.
static int32_t tsdbDataFileDoWriteTSRow(SDataFileWriter *writer, TSDBROW *row) {
  int32_t code = 0;
  int32_t lino = 0;

  if (row->type == TSDBROW_ROW_FMT) {
    code = tsdbUpdateSkmRow(writer->config->tsdb, writer->ctx->tbid, TSDBROW_SVERSION(row), writer->config->skmRow);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // evaluated left to right with short-circuiting, so the last row is only
  // looked at when the block actually has one
  bool updateLastRow =
      TSDBROW_VERSION(row) <= writer->config->compactVersion && writer->blockData->nRow > 0 &&
      tsdbRowCompareWithoutVersion(row, &tsdbRowFromBlockData(writer->blockData, writer->blockData->nRow - 1)) == 0;

  if (updateLastRow) {
    code = tBlockDataUpdateRow(writer->blockData, row, writer->config->skmRow->pTSchema);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    if (writer->blockData->nRow >= writer->config->maxRow) {
      code = tsdbDataFileDoWriteBlockData(writer, writer->blockData);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tBlockDataAppendRow(writer->blockData, row, writer->config->skmRow->pTSchema, writer->ctx->tbid->uid);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1145

1146
// Compare two row keys, treating a NULL key as larger than any real key.
// Returns 1 when key1 is NULL (even if key2 is NULL too), -1 when only key2 is
// NULL, and the result of tsdbRowKeyCmpr() otherwise.
static FORCE_INLINE int32_t tsdbRowKeyCmprNullAsLargest(const STsdbRowKey *key1, const STsdbRowKey *key2) {
  if (key1 == NULL) {
    return 1;
  }
  if (key2 == NULL) {
    return -1;
  }
  return tsdbRowKeyCmpr(key1, key2);
}
1155

1156
// Carry over the current table's already-stored data that sorts before `key`.
//
// The old data is walked at three levels, innermost first:
//   - rows of the loaded block (ctx->blockData, from ctx->blockDataIdx);
//   - records of the loaded brin block (ctx->brinBlock, from ctx->brinBlockIdx);
//   - brin block index entries (ctx->brinBlkArray, from ctx->brinBlkArrayIdx).
// Rows with a key smaller than `key` are rewritten through
// tsdbDataFileDoWriteTSRow(). A brin record whose last key is smaller than
// `key` is carried over as a whole, without loading its rows. A record whose
// key range may contain `key` has its rows loaded into ctx->blockData.
//
// key == NULL is treated as larger than every stored key, i.e. all remaining
// old data of the table is carried over.
//
// ctx->tbHasOldData is cleared once the next record belongs to another table
// or the index is exhausted (ctx->brinBlkArray is then also set to NULL).
//
// Returns 0 on success (also when the table has no old data), otherwise the
// error code of the failing callee.
static int32_t tsdbDataFileDoWriteTableOldData(SDataFileWriter *writer, const STsdbRowKey *key) {
  if (writer->ctx->tbHasOldData == false) {
    return 0;
  }

  int32_t     code = 0;
  int32_t     lino = 0;
  STsdbRowKey rowKey;

  for (;;) {
    for (;;) {
      // SBlockData
      for (; writer->ctx->blockDataIdx < writer->ctx->blockData->nRow; writer->ctx->blockDataIdx++) {
        TSDBROW row = tsdbRowFromBlockData(writer->ctx->blockData, writer->ctx->blockDataIdx);

        tsdbRowGetKey(&row, &rowKey);
        if (tsdbRowKeyCmprNullAsLargest(&rowKey, key) >= 0) {  // rowKey >= key: the rest stays for later calls
          goto _exit;
        }
        TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSRow(writer, &row), &lino, _exit);  // rowKey < key
      }

      // SBrinBlock
      if (writer->ctx->brinBlockIdx >= writer->ctx->brinBlock->numOfRecords) {
        break;
      }

      for (; writer->ctx->brinBlockIdx < writer->ctx->brinBlock->numOfRecords; writer->ctx->brinBlockIdx++) {
        SBrinRecord record;
        code = tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, &record);
        TSDB_CHECK_CODE(code, lino, _exit);

        if (record.uid != writer->ctx->tbid->uid) {
          writer->ctx->tbHasOldData = false;
          goto _exit;
        }

        if (tsdbRowKeyCmprNullAsLargest(key, &record.firstKey) < 0) {  // key < record.firstKey
          goto _exit;
        }

        if (tsdbRowKeyCmprNullAsLargest(key, &record.lastKey) > 0) {  // key > record.lastKey
          // the whole stored block precedes key: flush pending rows first so
          // the index stays ordered, then carry the record over as is
          if (writer->blockData->nRow > 0) {
            TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, writer->blockData), &lino, _exit);
          }

          TAOS_CHECK_GOTO(tsdbDataFileWriteBrinRecord(writer, &record), &lino, _exit);
        } else {
          // key may fall inside this block: load its rows and go back to the
          // row-level loop
          TAOS_CHECK_GOTO(tsdbDataFileReadBlockData(writer->ctx->reader, &record, writer->ctx->blockData), &lino,
                          _exit);

          writer->ctx->blockDataIdx = 0;
          writer->ctx->brinBlockIdx++;
          break;
        }
      }
    }

    // SBrinBlk
    if (writer->ctx->brinBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->brinBlkArray)) {
      writer->ctx->brinBlkArray = NULL;
      writer->ctx->tbHasOldData = false;
      goto _exit;
    } else {
      const SBrinBlk *brinBlk = TARRAY2_GET_PTR(writer->ctx->brinBlkArray, writer->ctx->brinBlkArrayIdx);

      if (brinBlk->minTbid.uid != writer->ctx->tbid->uid) {
        writer->ctx->tbHasOldData = false;
        goto _exit;
      }

      TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(writer->ctx->reader, brinBlk, writer->ctx->brinBlock), &lino, _exit);

      writer->ctx->brinBlockIdx = 0;
      writer->ctx->brinBlkArrayIdx++;
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1244

1245
// Write one new row for the current table. If the table still has old data,
// the old data sorting before the row's key is carried over first, then the
// row itself is written.
//
// Returns 0 on success, otherwise the error code of the failing callee.
static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row) {
  int32_t code = 0;
  int32_t lino = 0;

  if (writer->ctx->tbHasOldData) {
    STsdbRowKey key;

    tsdbRowGetKey(row, &key);
    code = tsdbDataFileDoWriteTableOldData(writer, &key);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbDataFileDoWriteTSRow(writer, row);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1264

1265
// Finish the current table: carry over all of its remaining old data (a NULL
// key compares as the largest key) and flush the pending row block. Does
// nothing when no table is current (ctx->tbid->uid == 0).
//
// Returns 0 on success, otherwise the error code of the failing callee.
static int32_t tsdbDataFileWriteTableDataEnd(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  if (writer->ctx->tbid->uid != 0) {
    if (writer->ctx->tbHasOldData) {
      code = tsdbDataFileDoWriteTableOldData(writer, NULL /* as the largest key */);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbDataFileDoWriteBlockData(writer, writer->blockData);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1286

1287
// Start writing data of table `tbid`.
//
// While an old brin index is still attached (ctx->brinBlkArray != NULL), the
// old records are scanned from the current position:
//   - a record of `tbid` itself stops the scan and sets ctx->tbHasOldData;
//   - a record ordered after `tbid` by (suid, uid) stops the scan;
//   - any other record belongs to an earlier table and is carried over to the
//     new index, unless metaGetInfo() fails for its uid, in which case the
//     table's records are skipped.
// When the index is exhausted, ctx->brinBlkArray is set to NULL.
//
// Afterwards ctx->tbid becomes *tbid and, unless tbid->uid is INT64_MAX, the
// table schema is refreshed and the row block is initialised for the table.
//
// Returns 0 on success, otherwise the error code of the failing callee.
static int32_t tsdbDataFileWriteTableDataBegin(SDataFileWriter *writer, const TABLEID *tbid) {
  int32_t code = 0;
  int32_t lino = 0;

  SMetaInfo info;
  bool      skipDropped = false;  // true while `dropped` names the table being skipped
  TABLEID   dropped;

  writer->ctx->tbHasOldData = false;
  while (writer->ctx->brinBlkArray) {  // skip data of previous table
    for (; writer->ctx->brinBlockIdx < writer->ctx->brinBlock->numOfRecords; writer->ctx->brinBlockIdx++) {
      SBrinRecord record;
      code = tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, &record);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (record.uid == tbid->uid) {
        writer->ctx->tbHasOldData = true;
        goto _begin;
      }
      if (record.suid > tbid->suid || (record.suid == tbid->suid && record.uid > tbid->uid)) {
        goto _begin;
      }

      // the record belongs to a table ordered before tbid
      if (record.uid != writer->ctx->tbid->uid) {
        if (skipDropped && dropped.uid == record.uid) {
          continue;
        }
        if (metaGetInfo(writer->config->tsdb->pVnode->pMeta, record.uid, &info, NULL) != 0) {
          skipDropped = true;
          dropped.suid = record.suid;
          dropped.uid = record.uid;
          continue;
        }

        skipDropped = false;
        writer->ctx->tbid->suid = record.suid;
        writer->ctx->tbid->uid = record.uid;
      }

      code = tsdbDataFileWriteBrinRecord(writer, &record);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    if (writer->ctx->brinBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->brinBlkArray)) {
      writer->ctx->brinBlkArray = NULL;
      break;
    }

    // load the next old brin block and continue scanning from its start
    const SBrinBlk *brinBlk = TARRAY2_GET_PTR(writer->ctx->brinBlkArray, writer->ctx->brinBlkArrayIdx);

    code = tsdbDataFileReadBrinBlock(writer->ctx->reader, brinBlk, writer->ctx->brinBlock);
    TSDB_CHECK_CODE(code, lino, _exit);

    writer->ctx->brinBlockIdx = 0;
    writer->ctx->brinBlkArrayIdx++;
  }

_begin:
  writer->ctx->tbid[0] = *tbid;

  if (tbid->uid != INT64_MAX) {
    code = tsdbUpdateSkmTb(writer->config->tsdb, tbid, writer->config->skmTb);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tBlockDataInit(writer->blockData, writer->ctx->tbid, writer->config->skmTb->pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1356

1357
// Write the raw bytes of *footer at offset *fileSize of `fd` and advance
// *fileSize by sizeof(*footer).
// Returns 0 on success, otherwise the error code of tsdbWriteFile().
int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer, SEncryptData *encryptData) {
  const uint8_t *bytes = (const uint8_t *)footer;

  TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, bytes, sizeof(*footer), encryptData));

  *fileSize += sizeof(*footer);
  return 0;
}
1362

1363
int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
1,744,427✔
1364
                               TTombBlkArray *tombBlkArray, SBuffer *buffers, SVersionRange *range,
1365
                               SEncryptData *encryptData) {
1366
  int32_t code;
1367

1368
  if (TOMB_BLOCK_SIZE(tombBlock) == 0) {
1,744,427✔
1369
    return 0;
×
1370
  }
1371

1372
  SBuffer *buffer0 = buffers + 0;
1,744,427✔
1373
  SBuffer *assist = buffers + 1;
1,744,427✔
1374

1375
  STombBlk tombBlk = {
1,744,427✔
1376
      .dp[0] =
1377
          {
1378
              .offset = *fileSize,
1,744,427✔
1379
              .size = 0,
1380
          },
1381
      .numRec = TOMB_BLOCK_SIZE(tombBlock),
1,744,427✔
1382
      .cmprAlg = cmprAlg,
1383
  };
1384
  for (int i = 0; i < TOMB_BLOCK_SIZE(tombBlock); i++) {
42,552,289✔
1385
    STombRecord record;
40,807,862✔
1386
    TAOS_CHECK_RETURN(tTombBlockGet(tombBlock, i, &record));
40,807,862✔
1387

1388
    if (i == 0) {
40,807,862✔
1389
      tombBlk.minTbid.suid = record.suid;
1,744,427✔
1390
      tombBlk.minTbid.uid = record.uid;
1,744,427✔
1391
      tombBlk.minVer = record.version;
1,744,427✔
1392
      tombBlk.maxVer = record.version;
1,744,427✔
1393
    }
1394
    if (i == TOMB_BLOCK_SIZE(tombBlock) - 1) {
40,807,862✔
1395
      tombBlk.maxTbid.suid = record.suid;
1,744,427✔
1396
      tombBlk.maxTbid.uid = record.uid;
1,744,427✔
1397
    }
1398
    if (record.version < tombBlk.minVer) {
40,807,862✔
1399
      tombBlk.minVer = record.version;
3,178✔
1400
    }
1401
    if (record.version > tombBlk.maxVer) {
40,807,862✔
1402
      tombBlk.maxVer = record.version;
36,735,403✔
1403
    }
1404
  }
1405

1406
  tsdbWriterUpdVerRange(range, tombBlk.minVer, tombBlk.maxVer);
1,744,427✔
1407

1408
  for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->buffers); i++) {
10,466,562✔
1409
    tBufferClear(buffer0);
1410

1411
    SCompressInfo cinfo = {
8,722,135✔
1412
        .cmprAlg = cmprAlg,
1413
        .dataType = TSDB_DATA_TYPE_BIGINT,
1414
        .originalSize = tombBlock->buffers[i].size,
8,722,135✔
1415
    };
1416
    TAOS_CHECK_RETURN(tCompressDataToBuffer(tombBlock->buffers[i].data, &cinfo, buffer0, assist));
8,722,135✔
1417
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptData));
8,722,135✔
1418

1419
    tombBlk.size[i] = cinfo.compressedSize;
8,722,135✔
1420
    tombBlk.dp->size += tombBlk.size[i];
8,722,135✔
1421
    *fileSize += tombBlk.size[i];
8,722,135✔
1422
  }
1423

1424
  TAOS_CHECK_RETURN(TARRAY2_APPEND_PTR(tombBlkArray, &tombBlk));
3,488,854✔
1425

1426
  tTombBlockClear(tombBlock);
1,744,427✔
1427
  return 0;
1,744,427✔
1428
}
1429

1430
// Append the writer's head footer to the .head file, growing the recorded
// .head file size accordingly.
// Returns 0 on success, otherwise the error code of tsdbFileWriteHeadFooter().
static int32_t tsdbDataFileWriteHeadFooter(SDataFileWriter *writer) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SEncryptData *pEncryptData = &(writer->config->tsdb->pVnode->config.tsdbCfg.encryptData);

  code = tsdbFileWriteHeadFooter(writer->fd[TSDB_FTYPE_HEAD], &writer->files[TSDB_FTYPE_HEAD].size,
                                 writer->headFooter, pEncryptData);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1447

1448
// Flush the writer's pending tomb block to the .tomb file via
// tsdbFileWriteTombBlock(), updating the .tomb file size, the writer's
// tombBlkArray and its tomb version range. Does nothing when the block is empty.
//
// Returns 0 on success, otherwise the error code of tsdbFileWriteTombBlock().
static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  if (TOMB_BLOCK_SIZE(writer->tombBlock) != 0) {
    SEncryptData *pEncryptData = &(writer->config->tsdb->pVnode->config.tsdbCfg.encryptData);

    code = tsdbFileWriteTombBlock(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlock, writer->config->cmprAlg,
                                  &writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->buffers,
                                  &writer->ctx->tombRange, pEncryptData);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1468

1469
int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize,
19,065,217✔
1470
                             SEncryptData *encryptData) {
1471
  ptr->size = TARRAY2_DATA_LEN(tombBlkArray);
19,065,217✔
1472
  if (ptr->size > 0) {
19,076,061✔
1473
    ptr->offset = *fileSize;
1,744,427✔
1474

1475
    TAOS_CHECK_RETURN(
1,744,427✔
1476
        tsdbWriteFile(fd, *fileSize, (const uint8_t *)TARRAY2_DATA(tombBlkArray), ptr->size, encryptData));
1477

1478
    *fileSize += ptr->size;
1,744,427✔
1479
  }
1480
  return 0;
19,072,741✔
1481
}
1482

1483
// Write the writer's tomb-block index array to the .tomb file and record its
// location in the tomb footer (tombFooter->tombBlkPtr).
//
// Returns TSDB_CODE_INVALID_PARA (without logging) when the index array is
// empty; otherwise returns the result of tsdbFileWriteTombBlk(), logging any
// failure.
static int32_t tsdbDataFileDoWriteTombBlk(SDataFileWriter *writer) {
  if (TARRAY2_SIZE(writer->tombBlkArray) <= 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t       lino = 0;
  int32_t       code = 0;
  STsdbFD      *tombFd = writer->fd[TSDB_FTYPE_TOMB];
  SEncryptData *encData = &writer->config->tsdb->pVnode->config.tsdbCfg.encryptData;

  TAOS_CHECK_GOTO(tsdbFileWriteTombBlk(tombFd, writer->tombBlkArray, writer->tombFooter->tombBlkPtr,
                                       &writer->files[TSDB_FTYPE_TOMB].size, encData),
                  &lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1505

1506
// Write a tomb footer structure, byte for byte, at offset *fileSize of `fd`
// and advance *fileSize by sizeof(*footer).
//
// Returns 0 on success; a tsdbWriteFile() failure is returned through
// TAOS_CHECK_RETURN, in which case *fileSize is left unchanged.
int32_t tsdbFileWriteTombFooter(STsdbFD *fd, const STombFooter *footer, int64_t *fileSize, SEncryptData *encryptData) {
  const uint8_t *footerBytes = (const uint8_t *)footer;

  TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, footerBytes, sizeof(*footer), encryptData));

  *fileSize += sizeof(*footer);
  return 0;
}
1511

1512
// Append the writer's tomb footer to the .tomb file, growing the tracked
// size of that file accordingly.
//
// Returns the code from tsdbFileWriteTombFooter(); failures are logged here.
static int32_t tsdbDataFileWriteTombFooter(SDataFileWriter *writer) {
  int32_t       lino = 0;
  int32_t       code = 0;
  STsdbFD      *tombFd = writer->fd[TSDB_FTYPE_TOMB];
  SEncryptData *encData = &writer->config->tsdb->pVnode->config.tsdbCfg.encryptData;

  TAOS_CHECK_GOTO(
      tsdbFileWriteTombFooter(tombFd, writer->tombFooter, &writer->files[TSDB_FTYPE_TOMB].size, encData), &lino,
      _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1529

1530
// Merge one new tomb record into the output tomb block, first copying over
// every old tomb record that compares smaller than it.
//
// While old tomb data remains (ctx->hasOldTomb):
//   * old records smaller than `record` are appended to the output block;
//   * an old record equal to `record` is reported as a duplicate and skipped;
//   * the first old record larger than `record` stops the scan;
//   * when the current old block is exhausted the next one is loaded, and
//     hasOldTomb is cleared once no old blocks are left.
// Afterwards `record` itself is appended, unless record->suid == INT64_MAX,
// in which case nothing more is written.
// The output block is flushed whenever it reaches config->maxRow records.
//
// Returns 0 on success or the first error code encountered (logged).
static int32_t tsdbDataFileDoWriteTombRecord(SDataFileWriter *writer, const STombRecord *record) {
  int32_t code = 0;
  int32_t lino = 0;

  while (writer->ctx->hasOldTomb) {
    // Walk the remaining records of the currently loaded old tomb block.
    while (writer->ctx->tombBlockIdx < TOMB_BLOCK_SIZE(writer->ctx->tombBlock)) {
      STombRecord oldRecord[1];
      TAOS_CHECK_GOTO(tTombBlockGet(writer->ctx->tombBlock, writer->ctx->tombBlockIdx, oldRecord), &lino, _exit);

      int32_t cmp = tTombRecordCompare(record, oldRecord);
      if (cmp < 0) {
        // The new record sorts first; the old record stays pending for a later call.
        goto _write;
      }

      if (cmp == 0) {
        tsdbError("vgId:%d duplicate tomb record, cid:%" PRId64 ", suid:%" PRId64 ", uid:%" PRId64 ", version:%" PRId64,
                  TD_VID(writer->config->tsdb->pVnode), writer->config->cid, record->suid, record->uid,
                  record->version);
      } else {
        TAOS_CHECK_GOTO(tTombBlockPut(writer->tombBlock, oldRecord), &lino, _exit);

        tsdbTrace("vgId:%d write tomb record to tomb file:%s, cid:%" PRId64 ", suid:%" PRId64 ", uid:%" PRId64
                  ", version:%" PRId64,
                  TD_VID(writer->config->tsdb->pVnode), writer->fd[TSDB_FTYPE_TOMB]->path, writer->config->cid,
                  oldRecord->suid, oldRecord->uid, oldRecord->version);

        if (TOMB_BLOCK_SIZE(writer->tombBlock) >= writer->config->maxRow) {
          TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlock(writer), &lino, _exit);
        }
      }

      writer->ctx->tombBlockIdx++;
    }

    // Current old block consumed: stop if it was the last one, else load the next.
    if (writer->ctx->tombBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->tombBlkArray)) {
      writer->ctx->hasOldTomb = false;
      break;
    }

    const STombBlk *nextBlk = TARRAY2_GET_PTR(writer->ctx->tombBlkArray, writer->ctx->tombBlkArrayIdx);

    TAOS_CHECK_GOTO(tsdbDataFileReadTombBlock(writer->ctx->reader, nextBlk, writer->ctx->tombBlock), &lino, _exit);

    writer->ctx->tombBlockIdx = 0;
    writer->ctx->tombBlkArrayIdx++;
  }

_write:
  // A record whose suid is INT64_MAX is never stored itself.
  if (record->suid == INT64_MAX) {
    goto _exit;
  }

  TAOS_CHECK_GOTO(tTombBlockPut(writer->tombBlock, record), &lino, _exit);

  tsdbTrace("vgId:%d write tomb record to tomb file:%s, cid:%" PRId64 ", suid:%" PRId64 ", uid:%" PRId64
            ", version:%" PRId64,
            TD_VID(writer->config->tsdb->pVnode), writer->fd[TSDB_FTYPE_TOMB]->path, writer->config->cid, record->suid,
            record->uid, record->version);

  if (TOMB_BLOCK_SIZE(writer->tombBlock) >= writer->config->maxRow) {
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlock(writer), &lino, _exit);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1596

1597
int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize,
690,269✔
1598
                             SEncryptData *encryptData) {
1599
  if (TARRAY2_SIZE(brinBlkArray) <= 0) {
690,269✔
1600
    return TSDB_CODE_INVALID_PARA;
×
1601
  }
1602
  ptr->offset = *fileSize;
690,269✔
1603
  ptr->size = TARRAY2_DATA_LEN(brinBlkArray);
689,782✔
1604

1605
  TAOS_CHECK_RETURN(tsdbWriteFile(fd, ptr->offset, (uint8_t *)TARRAY2_DATA(brinBlkArray), ptr->size, encryptData));
689,782✔
1606

1607
  *fileSize += ptr->size;
689,782✔
1608
  return 0;
690,269✔
1609
}
1610

1611
// Write the writer's brin-block index array to the .head file and record its
// location in the head footer (headFooter->brinBlkPtr).
//
// Returns the code from tsdbFileWriteBrinBlk(); failures are logged here.
static int32_t tsdbDataFileWriteBrinBlk(SDataFileWriter *writer) {
  int32_t       lino = 0;
  int32_t       code = 0;
  STsdbFD      *headFd = writer->fd[TSDB_FTYPE_HEAD];
  SEncryptData *encData = &writer->config->tsdb->pVnode->config.tsdbCfg.encryptData;

  TAOS_CHECK_GOTO(tsdbFileWriteBrinBlk(headFd, writer->brinBlkArray, writer->headFooter->brinBlkPtr,
                                       &writer->files[TSDB_FTYPE_HEAD].size, encData),
                  &lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1629

1630
// Widen the version range stored in `f` so that it also covers `range`:
// f->maxVer becomes TMAX(f->maxVer, range.maxVer) and
// f->minVer becomes TMIN(f->minVer, range.minVer).
void tsdbTFileUpdVerRange(STFile *f, SVersionRange range) {
  // The two bounds are independent of each other.
  f->maxVer = TMAX(f->maxVer, range.maxVer);
  f->minVer = TMIN(f->minVer, range.minVer);
}
21,970,308✔
1634

1635
// Finish all files the writer has open and append the resulting file
// operations to `opArr`.
//
// When the .head file is open:
//   * the current table is ended and a table with suid/uid == INT64_MAX is
//     begun (presumably a sentinel that drains remaining old data - confirm
//     against tsdbDataFileWriteTableDataBegin), then the pending brin block,
//     the brin-block index and the head footer are written;
//   * .head: the old file (if it existed) is REMOVEd and the new one CREATEd,
//     its version range covering both the old file and ctx->range;
//   * .data / .sma: CREATEd if they did not exist, MODIFYed if their size
//     changed, otherwise no operation is emitted.
// When the .tomb file is open:
//   * a record with suid/uid/version == INT64_MAX is passed to
//     tsdbDataFileDoWriteTombRecord(), then the pending tomb block, the
//     tomb-block index and the tomb footer are written;
//   * the old .tomb (if it existed) is REMOVEd and the new one CREATEd, its
//     version range covering both the old file and ctx->tombRange.
// Finally every open file descriptor is fsync'ed and closed.
//
// Returns 0 on success or the first error code encountered (logged). On error
// the remaining steps, including fsync/close of the files, are skipped.
static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArray *opArr) {
  int32_t code = 0;
  int32_t lino = 0;

  const SDataFileWriterConfig *cfg = writer->config;
  STFileOp                     op;

  if (writer->fd[TSDB_FTYPE_HEAD]) {
    TABLEID endTbid[1] = {{
        .suid = INT64_MAX,
        .uid = INT64_MAX,
    }};

    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataEnd(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataBegin(writer, endTbid), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlock(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlk(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteHeadFooter(writer), &lino, _exit);

    // Empty range (min > max) until an old .head contributes its versions.
    SVersionRange prevRange = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};

    // .head: always replaced by a freshly created file
    if (cfg->files[TSDB_FTYPE_HEAD].exist) {
      op = (STFileOp){
          .optype = TSDB_FOP_REMOVE,
          .fid = cfg->fid,
          .of = cfg->files[TSDB_FTYPE_HEAD].file,
      };
      prevRange = (SVersionRange){.minVer = op.of.minVer, .maxVer = op.of.maxVer};
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
    }
    op = (STFileOp){
        .optype = TSDB_FOP_CREATE,
        .fid = cfg->fid,
        .nf = writer->files[TSDB_FTYPE_HEAD],
    };
    tsdbTFileUpdVerRange(&op.nf, prevRange);
    tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
    TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);

    // .data then .sma: both follow the same create-or-modify rule
    const int32_t grownFtypes[] = {TSDB_FTYPE_DATA, TSDB_FTYPE_SMA};
    for (int32_t k = 0; k < ARRAY_SIZE(grownFtypes); ++k) {
      const int32_t ftype = grownFtypes[k];

      if (!cfg->files[ftype].exist) {
        op = (STFileOp){
            .optype = TSDB_FOP_CREATE,
            .fid = cfg->fid,
            .nf = writer->files[ftype],
        };
      } else if (cfg->files[ftype].file.size != writer->files[ftype].size) {
        op = (STFileOp){
            .optype = TSDB_FOP_MODIFY,
            .fid = cfg->fid,
            .of = cfg->files[ftype].file,
            .nf = writer->files[ftype],
        };
      } else {
        // File existed and was not extended: nothing to report.
        continue;
      }

      tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
    }
  }

  if (writer->fd[TSDB_FTYPE_TOMB]) {
    STombRecord endRecord[1] = {{
        .suid = INT64_MAX,
        .uid = INT64_MAX,
        .version = INT64_MAX,
    }};

    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombRecord(writer, endRecord), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlock(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlk(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteTombFooter(writer), &lino, _exit);

    SVersionRange prevRange = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN};

    // .tomb: always replaced by a freshly created file
    if (cfg->files[TSDB_FTYPE_TOMB].exist) {
      op = (STFileOp){
          .optype = TSDB_FOP_REMOVE,
          .fid = cfg->fid,
          .of = cfg->files[TSDB_FTYPE_TOMB].file,
      };
      prevRange = (SVersionRange){.minVer = op.of.minVer, .maxVer = op.of.maxVer};
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
    }
    op = (STFileOp){
        .optype = TSDB_FOP_CREATE,
        .fid = cfg->fid,
        .nf = writer->files[TSDB_FTYPE_TOMB],
    };
    tsdbTFileUpdVerRange(&op.nf, prevRange);
    tsdbTFileUpdVerRange(&op.nf, writer->ctx->tombRange);
    TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
  }

  // Make everything durable, then release the descriptors.
  SEncryptData *encData = &(writer->config->tsdb->pVnode->config.tsdbCfg.encryptData);
  for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
    if (!writer->fd[i]) {
      continue;
    }
    TAOS_CHECK_GOTO(tsdbFsyncFile(writer->fd[i], encData), &lino, _exit);
    tsdbCloseFile(&writer->fd[i]);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1767

1768
// Open the .head, .data and .sma files of the writer for read/write.
//
// A file whose tracked size is 0 is created/truncated and a zero-filled
// header of TSDB_FHDR_SIZE bytes is written at offset 0, after which its
// tracked size is increased by TSDB_FHDR_SIZE. If the writer has a reader
// attached, the old brin-block index is loaded into ctx->brinBlkArray.
//
// Returns 0 on success or the first error code encountered (logged).
static int32_t tsdbDataFileWriterOpenDataFD(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  const int32_t dataFtypes[] = {TSDB_FTYPE_HEAD, TSDB_FTYPE_DATA, TSDB_FTYPE_SMA};

  for (int32_t idx = 0; idx < ARRAY_SIZE(dataFtypes); ++idx) {
    const int32_t ftype = dataFtypes[idx];
    STFile       *file = &writer->files[ftype];
    char          fname[TSDB_FILENAME_LEN];

    // An empty file is (re)created from scratch; a non-empty one is reopened as is.
    int32_t openFlags = TD_FILE_READ | TD_FILE_WRITE;
    if (file->size == 0) {
      openFlags |= (TD_FILE_CREATE | TD_FILE_TRUNC);
    }

    int32_t lcn = file->lcn;
    tsdbTFileName(writer->config->tsdb, file, fname);
    TAOS_CHECK_GOTO(tsdbOpenFile(fname, writer->config->tsdb, openFlags, &writer->fd[ftype], lcn), &lino, _exit);

    if (file->size != 0) {
      continue;
    }

    // Fresh file: reserve the header area with zeros.
    uint8_t       hdr[TSDB_FHDR_SIZE] = {0};
    SEncryptData *encData = &(writer->config->tsdb->pVnode->config.tsdbCfg.encryptData);

    TAOS_CHECK_GOTO(tsdbWriteFile(writer->fd[ftype], 0, hdr, TSDB_FHDR_SIZE, encData), &lino, _exit);

    file->size += TSDB_FHDR_SIZE;
  }

  if (writer->ctx->reader) {
    TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(writer->ctx->reader, &writer->ctx->brinBlkArray), &lino, _exit);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1810

1811
// Allocate a zero-initialised data file writer and copy `config` into it.
//
// On success *writer points to the new object and 0 is returned. If the
// allocation fails, *writer is set to NULL and terrno is returned. No file is
// opened here.
int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer) {
  SDataFileWriter *created = taosMemoryCalloc(1, sizeof(*created));

  writer[0] = created;
  if (created == NULL) {
    return terrno;
  }

  created->config[0] = config[0];
  return 0;
}
1820

1821
// Close a data file writer and release it.
//
//   writer  address of the writer pointer; a NULL address or NULL pointer is a no-op
//   abort   true: discard via tsdbDataFileWriterCloseAbort();
//           false: commit via tsdbDataFileWriterCloseCommit(), appending to opArr
//   opArr   receives the file operations produced by a commit
//
// Abort/commit only run if the writer was actually opened (ctx->opened).
// On success the writer is freed and *writer is set to NULL.
// On failure the error is logged and returned, and the writer is NOT freed:
// *writer still points to it.
int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, TFileOpArray *opArr) {
  if (writer == NULL || writer[0] == NULL) {
    return 0;
  }

  int32_t          lino = 0;
  int32_t          code = 0;
  SDataFileWriter *self = writer[0];

  if (self->ctx->opened) {
    if (!abort) {
      TAOS_CHECK_GOTO(tsdbDataFileWriterCloseCommit(self, opArr), &lino, _exit);
    } else {
      TAOS_CHECK_GOTO(tsdbDataFileWriterCloseAbort(self), &lino, _exit);
    }
    tsdbDataFileWriterDoClose(self);
  }

  taosMemoryFree(self);
  writer[0] = NULL;

_exit:
  if (code != 0) {
    // Only reached with a live writer: the free above is skipped on error.
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(self->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1845

1846
// Write a single row through the writer.
//
// The writer is opened lazily, and the .head/.data/.sma files are opened the
// first time a row arrives (detected by the .head descriptor being NULL).
// When row->uid differs from the table currently being written, the current
// table is ended and the row's table is begun before the row is written.
//
// Returns 0 on success or the first error code encountered (logged).
int32_t tsdbDataFileWriteRow(SDataFileWriter *writer, SRowInfo *row) {
  int32_t lino = 0;
  int32_t code = 0;

  if (!writer->ctx->opened) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpen(writer), &lino, _exit);
  }

  if (NULL == writer->fd[TSDB_FTYPE_HEAD]) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterOpenDataFD(writer), &lino, _exit);
  }

  const bool switchTable = (row->uid != writer->ctx->tbid->uid);
  if (switchTable) {
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataEnd(writer), &lino, _exit);
    // The row info is passed as the table id via a pointer cast.
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataBegin(writer, (TABLEID *)row), &lino, _exit);
  }

  TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSData(writer, &row->row), &lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1872

1873
// Write a whole block of rows belonging to one table.
//
// An empty block (nRow == 0) is accepted and ignored; a block with uid == 0
// is rejected with TSDB_CODE_INVALID_PARA. The writer and its data files are
// opened lazily (the latter detected by the .data descriptor being NULL), and
// the current table is switched when bData->uid differs from it.
//
// If the table still has old data, the old rows preceding the block's first
// key are written first. The block is then written as a unit only when no
// old data remains AND the writer's own row buffer is empty; otherwise its
// rows are written one by one.
//
// Returns 0 on success or the first error code encountered (logged).
int32_t tsdbDataFileWriteBlockData(SDataFileWriter *writer, SBlockData *bData) {
  if (bData->nRow == 0) {
    return 0;
  }
  if (!bData->uid) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t lino = 0;
  int32_t code = 0;

  if (!writer->ctx->opened) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpen(writer), &lino, _exit);
  }

  if (NULL == writer->fd[TSDB_FTYPE_DATA]) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterOpenDataFD(writer), &lino, _exit);
  }

  if (writer->ctx->tbid->uid != bData->uid) {
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataEnd(writer), &lino, _exit);
    // The block data is passed as the table id via a pointer cast.
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataBegin(writer, (TABLEID *)bData), &lino, _exit);
  }

  if (writer->ctx->tbHasOldData) {
    STsdbRowKey firstKey;

    tsdbRowGetKey(&tsdbRowFromBlockData(bData, 0), &firstKey);
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTableOldData(writer, &firstKey), &lino, _exit);
  }

  if (writer->ctx->tbHasOldData || writer->blockData->nRow != 0) {
    // Row-by-row path: old data remains or rows are already buffered.
    for (int32_t iRow = 0; iRow < bData->nRow; ++iRow) {
      TSDBROW row[1] = {tsdbRowFromBlockData(bData, iRow)};
      TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSData(writer, row), &lino, _exit);
    }
  } else {
    // Fast path: hand the block over as a unit.
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, bData), &lino, _exit);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1924

1925
// Write out the rows currently buffered in the writer's block data.
//
// Returns TSDB_CODE_INVALID_PARA if the writer has not been opened.
// Returns 0 without writing when the buffer is empty or while the current
// table still has old data. Otherwise returns the result of
// tsdbDataFileDoWriteBlockData().
int32_t tsdbDataFileFlush(SDataFileWriter *writer) {
  if (!writer->ctx->opened) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (writer->blockData->nRow == 0 || writer->ctx->tbHasOldData) {
    return 0;
  }

  return tsdbDataFileDoWriteBlockData(writer, writer->blockData);
}
1935

1936
// Create (truncating if present) the writer's .tomb file and prepare tomb
// merging.
//
// A zero-filled header of TSDB_FHDR_SIZE bytes is written at offset 0 and the
// tracked file size is increased by that amount. If the writer has a reader
// attached, the old tomb-block index is loaded; ctx->hasOldTomb is set when
// that index is non-empty, and the old-tomb cursor (block array index, block
// contents, in-block index) is reset.
//
// Returns 0 on success or the first error code encountered (logged).
static int32_t tsdbDataFileWriterOpenTombFD(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  const int32_t ftype = TSDB_FTYPE_TOMB;
  const int32_t openFlags = (TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  char          fname[TSDB_FILENAME_LEN];
  uint8_t       hdr[TSDB_FHDR_SIZE] = {0};
  SEncryptData *encData = &(writer->config->tsdb->pVnode->config.tsdbCfg.encryptData);
  STFile       *tombFile = &writer->files[ftype];

  int32_t lcn = tombFile->lcn;
  tsdbTFileName(writer->config->tsdb, tombFile, fname);

  TAOS_CHECK_GOTO(tsdbOpenFile(fname, writer->config->tsdb, openFlags, &writer->fd[ftype], lcn), &lino, _exit);

  // Reserve the header area with zeros.
  TAOS_CHECK_GOTO(tsdbWriteFile(writer->fd[ftype], 0, hdr, TSDB_FHDR_SIZE, encData), &lino, _exit);
  tombFile->size += TSDB_FHDR_SIZE;

  if (writer->ctx->reader) {
    TAOS_CHECK_GOTO(tsdbDataFileReadTombBlk(writer->ctx->reader, &writer->ctx->tombBlkArray), &lino, _exit);

    if (TARRAY2_SIZE(writer->ctx->tombBlkArray) > 0) {
      writer->ctx->hasOldTomb = true;
    }

    // Start the old-tomb cursor at the very beginning.
    writer->ctx->tombBlkArrayIdx = 0;
    tTombBlockClear(writer->ctx->tombBlock);
    writer->ctx->tombBlockIdx = 0;
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1975

1976
// Write one tomb (delete) record through the writer.
//
// The writer is opened lazily, and the .tomb file is created the first time
// a tomb record arrives (detected by the .tomb descriptor being NULL). The
// record is then merged in by tsdbDataFileDoWriteTombRecord().
//
// Returns 0 on success or the first error code encountered (logged).
int32_t tsdbDataFileWriteTombRecord(SDataFileWriter *writer, const STombRecord *record) {
  int32_t lino = 0;
  int32_t code = 0;

  if (!writer->ctx->opened) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpen(writer), &lino, _exit);
  }

  if (NULL == writer->fd[TSDB_FTYPE_TOMB]) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterOpenTombFD(writer), &lino, _exit);
  }

  TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombRecord(writer, record), &lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc