• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.97
/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdbDataFileRW.h"
17
#include "meta.h"
18

19
// SDataFileReader =============================================
20
struct SDataFileReader {
21
  SDataFileReaderConfig config[1];
22

23
  SBuffer  local[10];
24
  SBuffer *buffers;
25

26
  struct {
27
    bool headFooterLoaded;
28
    bool tombFooterLoaded;
29
    bool brinBlkLoaded;
30
    bool tombBlkLoaded;
31
  } ctx[1];
32

33
  STsdbFD *fd[TSDB_FTYPE_MAX];
34

35
  SHeadFooter   headFooter[1];
36
  STombFooter   tombFooter[1];
37
  TBrinBlkArray brinBlkArray[1];
38
  TTombBlkArray tombBlkArray[1];
39
};
40

41
static int32_t tsdbDataFileReadHeadFooter(SDataFileReader *reader) {
179,575✔
42
  if (reader->ctx->headFooterLoaded) {
179,575!
43
    return 0;
×
44
  }
45

46
  int32_t code = 0;
179,575✔
47
  int32_t lino = 0;
179,575✔
48

49
  int32_t ftype = TSDB_FTYPE_HEAD;
179,575✔
50
  if (reader->fd[ftype]) {
179,575✔
51
    int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
172,240✔
52
    char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
172,240✔
53
#if 1
54
    TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[ftype], reader->config->files[ftype].file.size - sizeof(SHeadFooter),
172,240!
55
                                 (uint8_t *)reader->headFooter, sizeof(SHeadFooter), 0, encryptAlgorithm, encryptKey),
56
                    &lino, _exit);
57
#else
58
    int64_t size = reader->config->files[ftype].file.size;
59
    for (; size > TSDB_FHDR_SIZE; size--) {
60
      code = tsdbReadFile(reader->fd[ftype], size - sizeof(SHeadFooter), (uint8_t *)reader->headFooter,
61
                          sizeof(SHeadFooter), 0, encryptAlgorithm, encryptKey);
62
      if (code) continue;
63
      if (reader->headFooter->brinBlkPtr->offset + reader->headFooter->brinBlkPtr->size + sizeof(SHeadFooter) == size) {
64
        break;
65
      }
66
    }
67
    if (size <= TSDB_FHDR_SIZE) {
68
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
69
    }
70
#endif
71
  }
72

73
  reader->ctx->headFooterLoaded = true;
179,522✔
74

75
_exit:
179,522✔
76
  if (code) {
179,522!
77
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
×
78
              tstrerror(code));
79
  }
80
  return code;
179,539✔
81
}
82

83
static int32_t tsdbDataFileReadTombFooter(SDataFileReader *reader) {
48,101✔
84
  if (reader->ctx->tombFooterLoaded) {
48,101!
85
    return 0;
×
86
  }
87

88
  int32_t code = 0;
48,101✔
89
  int32_t lino = 0;
48,101✔
90

91
  int32_t ftype = TSDB_FTYPE_TOMB;
48,101✔
92
  if (reader->fd[ftype]) {
48,101✔
93
    int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
47,693✔
94
    char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
47,693✔
95
    TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[ftype], reader->config->files[ftype].file.size - sizeof(STombFooter),
47,693!
96
                                 (uint8_t *)reader->tombFooter, sizeof(STombFooter), 0, encryptAlgorithm, encryptKey),
97
                    &lino, _exit);
98
  }
99
  reader->ctx->tombFooterLoaded = true;
48,101✔
100

101
_exit:
48,101✔
102
  if (code) {
48,101!
103
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
×
104
              tstrerror(code));
105
  }
106
  return code;
48,100✔
107
}
108

109
int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig *config, SDataFileReader **reader) {
230,647✔
110
  int32_t code = 0;
230,647✔
111
  int32_t lino = 0;
230,647✔
112

113
  if ((*reader = taosMemoryCalloc(1, sizeof(**reader))) == NULL) {
230,647!
114
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
115
  }
116

117
  for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->local); i++) {
2,536,487✔
118
    tBufferInit(reader[0]->local + i);
2,305,866✔
119
  }
120

121
  reader[0]->config[0] = config[0];
230,621✔
122
  reader[0]->buffers = config->buffers;
230,621✔
123
  if (reader[0]->buffers == NULL) {
230,621✔
124
    reader[0]->buffers = reader[0]->local;
229,408✔
125
  }
126

127
  if (fname) {
230,621✔
128
    for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
1,146,402✔
129
      if (fname[i]) {
917,060✔
130
        int32_t lcn = config->files[i].file.lcn;
714,843✔
131
        TAOS_CHECK_GOTO(tsdbOpenFile(fname[i], config->tsdb, TD_FILE_READ, &reader[0]->fd[i], lcn), &lino, _exit);
714,843!
132
      }
133
    }
134
  } else {
135
    for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
6,670✔
136
      if (config->files[i].exist) {
5,356✔
137
        char fname1[TSDB_FILENAME_LEN];
138
        tsdbTFileName(config->tsdb, &config->files[i].file, fname1);
2,818✔
139
        int32_t lcn = config->files[i].file.lcn;
2,818✔
140
        TAOS_CHECK_GOTO(tsdbOpenFile(fname1, config->tsdb, TD_FILE_READ, &reader[0]->fd[i], lcn), &lino, _exit);
2,818!
141
      }
142
    }
143
  }
144

145
_exit:
1,314✔
146
  if (code) {
230,656!
147
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(config->tsdb->pVnode), __func__, __FILE__, lino,
×
148
              tstrerror(code));
149
  }
150
  return code;
230,626✔
151
}
152

153
void tsdbDataFileReaderClose(SDataFileReader **reader) {
251,339✔
154
  if (reader[0] == NULL) {
251,339✔
155
    return;
20,748✔
156
  }
157

158
  TARRAY2_DESTROY(reader[0]->tombBlkArray, NULL);
230,591!
159
  TARRAY2_DESTROY(reader[0]->brinBlkArray, NULL);
230,591✔
160

161
  for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
1,153,146✔
162
    if (reader[0]->fd[i]) {
922,381✔
163
      tsdbCloseFile(&reader[0]->fd[i]);
717,650✔
164
    }
165
  }
166

167
  for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->local); ++i) {
2,537,017✔
168
    tBufferDestroy(reader[0]->local + i);
2,306,222✔
169
  }
170

171
  taosMemoryFree(reader[0]);
230,795✔
172
  reader[0] = NULL;
230,642✔
173
}
174

175
int32_t tsdbDataFileReadBrinBlk(SDataFileReader *reader, const TBrinBlkArray **brinBlkArray) {
179,578✔
176
  int32_t code = 0;
179,578✔
177
  int32_t lino = 0;
179,578✔
178
  void   *data = NULL;
179,578✔
179

180
  if (!reader->ctx->brinBlkLoaded) {
179,578!
181
    TAOS_CHECK_GOTO(tsdbDataFileReadHeadFooter(reader), &lino, _exit);
179,590!
182

183
    if (reader->headFooter->brinBlkPtr->size > 0) {
179,547✔
184
      data = taosMemoryMalloc(reader->headFooter->brinBlkPtr->size);
172,214!
185
      if (data == NULL) {
172,206!
186
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
187
      }
188

189
      int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
172,206✔
190
      char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
172,206✔
191

192
      TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], reader->headFooter->brinBlkPtr->offset, data,
172,206!
193
                                   reader->headFooter->brinBlkPtr->size, 0, encryptAlgorithm, encryptKey),
194
                      &lino, _exit);
195

196
      int32_t size = reader->headFooter->brinBlkPtr->size / sizeof(SBrinBlk);
172,205✔
197
      TARRAY2_INIT_EX(reader->brinBlkArray, size, size, data);
172,205✔
198
    } else {
199
      TARRAY2_INIT(reader->brinBlkArray);
7,333✔
200
    }
201

202
    reader->ctx->brinBlkLoaded = true;
179,538✔
203
  }
204
  brinBlkArray[0] = reader->brinBlkArray;
179,526✔
205

206
_exit:
179,526✔
207
  if (code) {
179,526!
208
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
×
209
              tstrerror(code));
210
    taosMemoryFree(data);
×
211
  }
212
  return code;
179,499✔
213
}
214

215
int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinBlk, SBrinBlock *brinBlock) {
206,986✔
216
  int32_t code = 0;
206,986✔
217
  int32_t lino = 0;
206,986✔
218

219
  SBuffer *buffer = reader->buffers + 0;
206,986✔
220
  SBuffer *assist = reader->buffers + 1;
206,986✔
221

222
  int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
206,986✔
223
  char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
206,986✔
224
  // load data
225
  tBufferClear(buffer);
226
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, brinBlk->dp->size, buffer, 0,
206,986!
227
                                       encryptAlgorithm, encryptKey),
228
                  &lino, _exit);
229

230
  // decode brin block
231
  SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer);
207,023✔
232
  tBrinBlockClear(brinBlock);
207,023✔
233
  brinBlock->numOfPKs = brinBlk->numOfPKs;
207,036✔
234
  brinBlock->numOfRecords = brinBlk->numRec;
207,036✔
235
  for (int32_t i = 0; i < 10; i++) {  // int64_t
2,276,046✔
236

237
    SCompressInfo cinfo = {
2,068,913✔
238
        .cmprAlg = brinBlk->cmprAlg,
2,068,913✔
239
        .dataType = TSDB_DATA_TYPE_BIGINT,
240
        .compressedSize = brinBlk->size[i],
2,068,913✔
241
        .originalSize = brinBlk->numRec * sizeof(int64_t),
2,068,913✔
242
    };
243
    TAOS_CHECK_GOTO(tDecompressDataToBuffer(BR_PTR(&br), &cinfo, brinBlock->buffers + i, assist), &lino, _exit);
2,068,913!
244
    br.offset += brinBlk->size[i];
2,069,010✔
245
  }
246

247
  for (int32_t i = 10; i < 15; i++) {  // int32_t
1,241,801✔
248
    SCompressInfo cinfo = {
1,034,679✔
249
        .cmprAlg = brinBlk->cmprAlg,
1,034,679✔
250
        .dataType = TSDB_DATA_TYPE_INT,
251
        .compressedSize = brinBlk->size[i],
1,034,679✔
252
        .originalSize = brinBlk->numRec * sizeof(int32_t),
1,034,679✔
253
    };
254
    TAOS_CHECK_GOTO(tDecompressDataToBuffer(BR_PTR(&br), &cinfo, brinBlock->buffers + i, assist), &lino, _exit);
1,034,679!
255
    br.offset += brinBlk->size[i];
1,034,668✔
256
  }
257

258
  // primary keys
259
  if (brinBlk->numOfPKs > 0) {  // decode the primary keys
207,122✔
260
    SValueColumnCompressInfo firstInfos[TD_MAX_PK_COLS];
261
    SValueColumnCompressInfo lastInfos[TD_MAX_PK_COLS];
262

263
    for (int32_t i = 0; i < brinBlk->numOfPKs; i++) {
70,438✔
264
      TAOS_CHECK_GOTO(tValueColumnCompressInfoDecode(&br, firstInfos + i), &lino, _exit);
35,219!
265
    }
266
    for (int32_t i = 0; i < brinBlk->numOfPKs; i++) {
70,438✔
267
      TAOS_CHECK_GOTO(tValueColumnCompressInfoDecode(&br, lastInfos + i), &lino, _exit);
35,219!
268
    }
269

270
    for (int32_t i = 0; i < brinBlk->numOfPKs; i++) {
70,438✔
271
      SValueColumnCompressInfo *info = firstInfos + i;
35,219✔
272

273
      TAOS_CHECK_GOTO(tValueColumnDecompress(BR_PTR(&br), info, brinBlock->firstKeyPKs + i, assist), &lino, _exit);
35,219!
274
      br.offset += (info->offsetCompressedSize + info->dataCompressedSize);
35,219✔
275
    }
276

277
    for (int32_t i = 0; i < brinBlk->numOfPKs; i++) {
70,438✔
278
      SValueColumnCompressInfo *info = lastInfos + i;
35,219✔
279

280
      TAOS_CHECK_GOTO(tValueColumnDecompress(BR_PTR(&br), info, brinBlock->lastKeyPKs + i, assist), &lino, _exit);
35,219!
281
      br.offset += (info->offsetCompressedSize + info->dataCompressedSize);
35,219✔
282
    }
283
  }
284

285
  if (br.offset != br.buffer->size) {
207,122!
286
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
287
  }
288

289
_exit:
207,122✔
290
  if (code) {
207,122!
291
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
×
292
              tstrerror(code));
293
  }
294
  return code;
207,023✔
295
}
296

297
extern int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *assist);
298

299
int32_t tsdbDataFileReadBlockData(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData) {
24,597✔
300
  int32_t code = 0;
24,597✔
301
  int32_t lino = 0;
24,597✔
302
  int32_t fid = reader->config->files[TSDB_FTYPE_DATA].file.fid;
24,597✔
303

304
  SBuffer *buffer = reader->buffers + 0;
24,597✔
305
  SBuffer *assist = reader->buffers + 1;
24,597✔
306

307
  int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
24,597✔
308
  char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
24,597✔
309
  // load data
310
  tBufferClear(buffer);
311
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, record->blockSize, buffer, 0,
24,597!
312
                                       encryptAlgorithm, encryptKey),
313
                  &lino, _exit);
314

315
  // decompress
316
  SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer);
24,597✔
317
  TAOS_CHECK_GOTO(tBlockDataDecompress(&br, bData, assist), &lino, _exit);
24,597!
318

319
  if (br.offset != buffer->size) {
24,597!
320
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
321
  }
322

323
_exit:
24,597✔
324
  if (code) {
24,597!
325
    tsdbError("vgId:%d %s fid %d failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, fid,
×
326
              __FILE__, lino, tstrerror(code));
327
  }
328
  return code;
24,597✔
329
}
330

331
int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData,
2,374,915✔
332
                                          STSchema *pTSchema, int16_t cids[], int32_t ncid) {
333
  int32_t code = 0;
2,374,915✔
334
  int32_t lino = 0;
2,374,915✔
335
  int32_t fid = reader->config->files[TSDB_FTYPE_DATA].file.fid;
2,374,915✔
336

337
  SDiskDataHdr hdr;
338
  SBuffer     *buffer0 = reader->buffers + 0;
2,374,915✔
339
  SBuffer     *buffer1 = reader->buffers + 1;
2,374,915✔
340
  SBuffer     *assist = reader->buffers + 2;
2,374,915✔
341

342
  int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
2,374,915✔
343
  char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
2,374,915✔
344
  // load key part
345
  tBufferClear(buffer0);
346
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, record->blockKeySize, buffer0,
2,374,915!
347
                                       0, encryptAlgorithm, encryptKey),
348
                  &lino, _exit);
349

350
  // SDiskDataHdr
351
  SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0);
2,374,947✔
352
  TAOS_CHECK_GOTO(tGetDiskDataHdr(&br, &hdr), &lino, _exit);
2,374,947!
353

354
  if (hdr.delimiter != TSDB_FILE_DLMT) {
2,374,922!
355
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
356
  }
357

358
  tBlockDataReset(bData);
2,374,922✔
359
  bData->suid = hdr.suid;
2,374,787✔
360
  bData->uid = hdr.uid;
2,374,787✔
361
  bData->nRow = hdr.nRow;
2,374,787✔
362

363
  // Key part
364
  TAOS_CHECK_GOTO(tBlockDataDecompressKeyPart(&hdr, &br, bData, assist), &lino, _exit);
2,374,787!
365
  if (br.offset != buffer0->size) {
2,375,054!
366
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
367
  }
368

369
  int extraColIdx = -1;
2,375,054✔
370
  for (int i = 0; i < ncid; i++) {
2,507,878✔
371
    if (tBlockDataGetColData(bData, cids[i]) == NULL) {
1,543,896✔
372
      extraColIdx = i;
1,411,064✔
373
      break;
1,411,064✔
374
    }
375
  }
376

377
  if (extraColIdx < 0) {
2,375,046✔
378
    goto _exit;
963,927✔
379
  }
380

381
  // load SBlockCol part
382
  tBufferClear(buffer0);
383
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset + record->blockKeySize,
1,411,119!
384
                                       hdr.szBlkCol, buffer0, 0, encryptAlgorithm, encryptKey),
385
                  &lino, _exit);
386

387
  // calc szHint
388
  int64_t szHint = 0;
1,411,121✔
389
  int     extraCols = 1;
1,411,121✔
390
  for (int i = extraColIdx + 1; i < ncid; ++i) {
1,411,119✔
391
    if (tBlockDataGetColData(bData, cids[i]) == NULL) {
108,641!
392
      ++extraCols;
108,645✔
393
      break;
108,645✔
394
    }
395
  }
396

397
  if (extraCols >= 2) {
1,411,123✔
398
    br = BUFFER_READER_INITIALIZER(0, buffer0);
108,646✔
399

400
    SBlockCol blockCol = {.cid = 0};
108,646✔
401
    for (int32_t i = extraColIdx; i < ncid; ++i) {
108,646✔
402
      int16_t extraColCid = cids[i];
108,613✔
403

404
      while (extraColCid > blockCol.cid) {
231,633✔
405
        if (br.offset >= buffer0->size) {
123,079!
406
          blockCol.cid = INT16_MAX;
×
407
          break;
×
408
        }
409

410
        TAOS_CHECK_GOTO(tGetBlockCol(&br, &blockCol, hdr.fmtVer, hdr.cmprAlg), &lino, _exit);
123,079!
411
      }
412

413
      if (extraColCid == blockCol.cid || blockCol.cid == INT16_MAX) {
108,554!
414
        extraColIdx = i;
108,554✔
415
        break;
108,554✔
416
      }
417
    }
418

419
    if (blockCol.cid > 0 && blockCol.cid < INT16_MAX /*&& blockCol->flag == HAS_VALUE*/) {
108,587!
420
      int64_t   offset = blockCol.offset;
108,586✔
421
      SBlockCol lastNonNoneBlockCol = {.cid = 0};
108,586✔
422

423
      for (int32_t i = extraColIdx; i < ncid; ++i) {
849,736✔
424
        int16_t extraColCid = cids[i];
744,053✔
425

426
        while (extraColCid > blockCol.cid) {
1,397,419✔
427
          if (br.offset >= buffer0->size) {
656,269✔
428
            blockCol.cid = INT16_MAX;
2,578✔
429
            break;
2,578✔
430
          }
431

432
          TAOS_CHECK_GOTO(tGetBlockCol(&br, &blockCol, hdr.fmtVer, hdr.cmprAlg), &lino, _exit);
653,691!
433
        }
434

435
        if (extraColCid == blockCol.cid) {
743,728✔
436
          lastNonNoneBlockCol = blockCol;
741,118✔
437
          continue;
741,118✔
438
        }
439

440
        if (blockCol.cid == INT16_MAX) {
2,610✔
441
          break;
2,578✔
442
        }
443
      }
444

445
      if (lastNonNoneBlockCol.cid > 0) {
108,261!
446
        szHint = lastNonNoneBlockCol.offset + lastNonNoneBlockCol.szBitmap + lastNonNoneBlockCol.szOffset +
108,656✔
447
                 lastNonNoneBlockCol.szValue - offset;
108,656✔
448
      }
449
    }
450
  }
451

452
  // load each column
453
  SBlockCol blockCol = {
1,410,739✔
454
      .cid = 0,
455
  };
456
  bool firstRead = true;
1,410,739✔
457
  br = BUFFER_READER_INITIALIZER(0, buffer0);
1,410,739✔
458
  for (int32_t i = 0; i < ncid; i++) {
3,553,041✔
459
    int16_t cid = cids[i];
2,142,452✔
460

461
    if (tBlockDataGetColData(bData, cid)) {  // already loaded
2,142,452✔
462
      continue;
73,068✔
463
    }
464

465
    while (cid > blockCol.cid) {
4,895,493✔
466
      if (br.offset >= buffer0->size) {
2,829,307✔
467
        blockCol.cid = INT16_MAX;
2,578✔
468
        break;
2,578✔
469
      }
470

471
      TAOS_CHECK_GOTO(tGetBlockCol(&br, &blockCol, hdr.fmtVer, hdr.cmprAlg), &lino, _exit);
2,826,729!
472
    }
473

474
    if (cid < blockCol.cid) {
2,068,764✔
475
      const STColumn *tcol = tTSchemaSearchColumn(pTSchema, cid);
25,780✔
476
      TSDB_CHECK_NULL(tcol, code, lino, _exit, TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER);
25,780!
477
      SBlockCol none = {
25,780✔
478
          .cid = cid,
479
          .type = tcol->type,
25,780✔
480
          .cflag = tcol->flags,
25,780✔
481
          .flag = HAS_NONE,
482
          .szOrigin = 0,
483
          .szBitmap = 0,
484
          .szOffset = 0,
485
          .szValue = 0,
486
          .offset = 0,
487
      };
488
      TAOS_CHECK_GOTO(tBlockDataDecompressColData(&hdr, &none, &br, bData, assist), &lino, _exit);
25,780!
489
    } else if (cid == blockCol.cid) {
2,042,984!
490
      int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
2,043,184✔
491
      char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
2,043,184✔
492
      // load from file
493
      tBufferClear(buffer1);
494
      TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA],
2,043,184!
495
                                           record->blockOffset + record->blockKeySize + hdr.szBlkCol + blockCol.offset,
496
                                           blockCol.szBitmap + blockCol.szOffset + blockCol.szValue, buffer1,
497
                                           firstRead ? szHint : 0, encryptAlgorithm, encryptKey),
498
                      &lino, _exit);
499

500
      firstRead = false;
2,043,674✔
501

502
      // decode the buffer
503
      SBufferReader br1 = BUFFER_READER_INITIALIZER(0, buffer1);
2,043,674✔
504
      TAOS_CHECK_GOTO(tBlockDataDecompressColData(&hdr, &blockCol, &br1, bData, assist), &lino, _exit);
2,043,674!
505
    }
506
  }
507

508
_exit:
1,410,589✔
509
  if (code) {
2,374,516!
510
    tsdbError("vgId:%d %s fid:%d failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, fid,
×
511
              __FILE__, lino, tstrerror(code));
512
  }
513
  return code;
2,374,997✔
514
}
515

516
int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader, const SBrinRecord *record,
1,708,541✔
517
                                 TColumnDataAggArray *columnDataAggArray) {
518
  int32_t  code = 0;
1,708,541✔
519
  int32_t  lino = 0;
1,708,541✔
520
  SBuffer *buffer = reader->buffers + 0;
1,708,541✔
521

522
  TARRAY2_CLEAR(columnDataAggArray, NULL);
1,708,541✔
523
  if (record->smaSize > 0) {
1,708,541!
524
    tBufferClear(buffer);
525
    int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
1,708,541✔
526
    char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
1,708,541✔
527
    TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_SMA], record->smaOffset, record->smaSize, buffer, 0,
1,708,541!
528
                                         encryptAlgorithm, encryptKey),
529
                    &lino, _exit);
530

531
    // decode sma data
532
    SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer);
1,708,541✔
533
    while (br.offset < record->smaSize) {
6,882,404✔
534
      SColumnDataAgg sma[1];
535

536
      TAOS_CHECK_GOTO(tGetColumnDataAgg(&br, sma), &lino, _exit);
5,173,868!
537
      TAOS_CHECK_GOTO(TARRAY2_APPEND_PTR(columnDataAggArray, sma), &lino, _exit);
10,347,726!
538
    }
539
    if (br.offset != record->smaSize) {
1,708,536!
540
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
541
    }
542
  }
543

544
_exit:
×
545
  if (code) {
1,708,536!
546
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
×
547
              tstrerror(code));
548
  }
549
  return code;
1,708,541✔
550
}
551

552
int32_t tsdbDataFileReadTombBlk(SDataFileReader *reader, const TTombBlkArray **tombBlkArray) {
67,660✔
553
  int32_t code = 0;
67,660✔
554
  int32_t lino = 0;
67,660✔
555
  void   *data = NULL;
67,660✔
556

557
  if (!reader->ctx->tombBlkLoaded) {
67,660✔
558
    TAOS_CHECK_GOTO(tsdbDataFileReadTombFooter(reader), &lino, _exit);
48,101!
559

560
    if (reader->tombFooter->tombBlkPtr->size > 0) {
48,100✔
561
      if ((data = taosMemoryMalloc(reader->tombFooter->tombBlkPtr->size)) == NULL) {
47,692!
562
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
563
      }
564

565
      int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
47,693✔
566
      char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
47,693✔
567
      TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], reader->tombFooter->tombBlkPtr->offset, data,
47,693!
568
                                   reader->tombFooter->tombBlkPtr->size, 0, encryptAlgorithm, encryptKey),
569
                      &lino, _exit);
570

571
      int32_t size = reader->tombFooter->tombBlkPtr->size / sizeof(STombBlk);
47,692✔
572
      TARRAY2_INIT_EX(reader->tombBlkArray, size, size, data);
47,692✔
573
    } else {
574
      TARRAY2_INIT(reader->tombBlkArray);
408✔
575
    }
576

577
    reader->ctx->tombBlkLoaded = true;
48,100✔
578
  }
579
  tombBlkArray[0] = reader->tombBlkArray;
67,659✔
580

581
_exit:
67,659✔
582
  if (code) {
67,659!
583
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
×
584
              tstrerror(code));
585
    taosMemoryFree(data);
×
586
  }
587
  return code;
67,658✔
588
}
589

590
int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombBlk, STombBlock *tData) {
59,426✔
591
  int32_t code = 0;
59,426✔
592
  int32_t lino = 0;
59,426✔
593

594
  SBuffer *buffer0 = reader->buffers + 0;
59,426✔
595
  SBuffer *assist = reader->buffers + 1;
59,426✔
596

597
  tBufferClear(buffer0);
598
  int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
59,426✔
599
  char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
59,426✔
600
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_TOMB], tombBlk->dp->offset, tombBlk->dp->size, buffer0, 0,
59,426!
601
                                       encryptAlgorithm, encryptKey),
602
                  &lino, _exit);
603

604
  int32_t       size = 0;
59,427✔
605
  SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0);
59,427✔
606
  tTombBlockClear(tData);
59,427✔
607
  tData->numOfRecords = tombBlk->numRec;
59,426✔
608
  for (int32_t i = 0; i < ARRAY_SIZE(tData->buffers); ++i) {
356,558✔
609
    SCompressInfo cinfo = {
297,133✔
610
        .cmprAlg = tombBlk->cmprAlg,
297,133✔
611
        .dataType = TSDB_DATA_TYPE_BIGINT,
612
        .originalSize = tombBlk->numRec * sizeof(int64_t),
297,133✔
613
        .compressedSize = tombBlk->size[i],
297,133✔
614
    };
615
    TAOS_CHECK_GOTO(tDecompressDataToBuffer(BR_PTR(&br), &cinfo, tData->buffers + i, assist), &lino, _exit);
297,133!
616
    br.offset += tombBlk->size[i];
297,132✔
617
  }
618

619
_exit:
59,425✔
620
  if (code) {
59,425!
621
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
×
622
              tstrerror(code));
623
  }
624
  return code;
59,427✔
625
}
626

627
// SDataFileWriter =============================================
628
struct SDataFileWriter {
629
  SDataFileWriterConfig config[1];
630

631
  SSkmInfo skmTb[1];
632
  SSkmInfo skmRow[1];
633
  SBuffer  local[10];
634
  SBuffer *buffers;
635

636
  struct {
637
    bool             opened;
638
    SDataFileReader *reader;
639

640
    // for ts data
641
    TABLEID tbid[1];
642
    bool    tbHasOldData;
643

644
    const TBrinBlkArray *brinBlkArray;
645
    int32_t              brinBlkArrayIdx;
646
    SBrinBlock           brinBlock[1];
647
    int32_t              brinBlockIdx;
648
    SBlockData           blockData[1];
649
    int32_t              blockDataIdx;
650
    // for tomb data
651
    bool                 hasOldTomb;
652
    const TTombBlkArray *tombBlkArray;
653
    int32_t              tombBlkArrayIdx;
654
    STombBlock           tombBlock[1];
655
    int32_t              tombBlockIdx;
656
    // range
657
    SVersionRange range;
658
    SVersionRange tombRange;
659
  } ctx[1];
660

661
  STFile   files[TSDB_FTYPE_MAX];
662
  STsdbFD *fd[TSDB_FTYPE_MAX];
663

664
  SHeadFooter headFooter[1];
665
  STombFooter tombFooter[1];
666

667
  TBrinBlkArray brinBlkArray[1];
668
  SBrinBlock    brinBlock[1];
669
  SBlockData    blockData[1];
670

671
  TTombBlkArray tombBlkArray[1];
672
  STombBlock    tombBlock[1];
673
};
674

675
static int32_t tsdbDataFileWriterCloseAbort(SDataFileWriter *writer) {
×
676
  tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, __LINE__,
×
677
            "not implemented");
678
  return 0;
×
679
}
680

681
static void tsdbDataFileWriterDoClose(SDataFileWriter *writer) {
3,534✔
682
  if (writer->ctx->reader) {
3,534✔
683
    tsdbDataFileReaderClose(&writer->ctx->reader);
1,233✔
684
  }
685

686
  tTombBlockDestroy(writer->tombBlock);
3,534✔
687
  TARRAY2_DESTROY(writer->tombBlkArray, NULL);
3,534!
688
  tBlockDataDestroy(writer->blockData);
3,534✔
689
  tBrinBlockDestroy(writer->brinBlock);
3,534✔
690
  TARRAY2_DESTROY(writer->brinBlkArray, NULL);
3,534!
691

692
  tTombBlockDestroy(writer->ctx->tombBlock);
3,534✔
693
  tBlockDataDestroy(writer->ctx->blockData);
3,534✔
694
  tBrinBlockDestroy(writer->ctx->brinBlock);
3,534✔
695

696
  for (int32_t i = 0; i < ARRAY_SIZE(writer->local); ++i) {
38,874✔
697
    tBufferDestroy(writer->local + i);
35,340!
698
  }
699

700
  tDestroyTSchema(writer->skmRow->pTSchema);
3,534!
701
  tDestroyTSchema(writer->skmTb->pTSchema);
3,534!
702
}
3,534✔
703

704
static int32_t tsdbDataFileWriterDoOpenReader(SDataFileWriter *writer) {
3,534✔
705
  int32_t code = 0;
3,534✔
706
  int32_t lino = 0;
3,534✔
707

708
  for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
14,316✔
709
    if (writer->config->files[i].exist) {
12,015✔
710
      SDataFileReaderConfig config[1] = {{
1,233✔
711
          .tsdb = writer->config->tsdb,
1,233✔
712
          .szPage = writer->config->szPage,
1,233✔
713
          .buffers = writer->buffers,
1,233✔
714
      }};
715

716
      for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
6,162✔
717
        config->files[i].exist = writer->config->files[i].exist;
4,929✔
718
        if (config->files[i].exist) {
4,929✔
719
          config->files[i].file = writer->config->files[i].file;
2,788✔
720
        }
721
      }
722

723
      TAOS_CHECK_GOTO(tsdbDataFileReaderOpen(NULL, config, &writer->ctx->reader), &lino, _exit);
1,233!
724
      break;
1,233✔
725
    }
726
  }
727

728
_exit:
2,301✔
729
  if (code) {
3,534!
730
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
731
              tstrerror(code));
732
  }
733
  return code;
3,534✔
734
}
735

736
static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
3,534✔
737
  int32_t code = 0;
3,534✔
738
  int32_t lino = 0;
3,534✔
739
  int32_t ftype;
740
  SDiskID diskId = {0};
3,534✔
741

742
  if (!writer->config->skmTb) writer->config->skmTb = writer->skmTb;
3,534!
743
  if (!writer->config->skmRow) writer->config->skmRow = writer->skmRow;
3,534!
744
  writer->buffers = writer->config->buffers;
3,534✔
745
  if (writer->buffers == NULL) {
3,534!
746
    writer->buffers = writer->local;
×
747
  }
748

749
  // open reader
750
  TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpenReader(writer), &lino, _exit);
3,534!
751

752
  // .head
753
  ftype = TSDB_FTYPE_HEAD;
3,534✔
754
  code = tsdbAllocateDisk(writer->config->tsdb, tsdbFTypeLabel(ftype), writer->config->expLevel, &diskId);
3,534✔
755
  TSDB_CHECK_CODE(code, lino, _exit);
3,532!
756
  writer->files[ftype] = (STFile){
3,532✔
757
      .type = ftype,
758
      .did = diskId,
759
      .fid = writer->config->fid,
3,532✔
760
      .cid = writer->config->cid,
3,532✔
761
      .size = 0,
762
      .minVer = VERSION_MAX,
763
      .maxVer = VERSION_MIN,
764
  };
765

766
  // .data
767
  ftype = TSDB_FTYPE_DATA;
3,532✔
768
  if (writer->config->files[ftype].exist) {
3,532✔
769
    writer->files[ftype] = writer->config->files[ftype].file;
705✔
770
  } else {
771
    code = tsdbAllocateDisk(writer->config->tsdb, tsdbFTypeLabel(ftype), writer->config->expLevel, &diskId);
2,827✔
772
    TSDB_CHECK_CODE(code, lino, _exit);
2,827!
773
    writer->files[ftype] = (STFile){
5,654✔
774
        .type = ftype,
775
        .did = diskId,
776
        .fid = writer->config->fid,
2,827✔
777
        .cid = writer->config->cid,
2,827✔
778
        .size = 0,
779
        .lcn = writer->config->lcn == -1 ? 0 : -1,
2,827✔
780
        .minVer = VERSION_MAX,
781
        .maxVer = VERSION_MIN,
782
    };
783
  }
784

785
  // .sma
786
  ftype = TSDB_FTYPE_SMA;
3,532✔
787
  if (writer->config->files[ftype].exist) {
3,532✔
788
    writer->files[ftype] = writer->config->files[ftype].file;
705✔
789
  } else {
790
    code = tsdbAllocateDisk(writer->config->tsdb, tsdbFTypeLabel(ftype), writer->config->expLevel, &diskId);
2,827✔
791
    TSDB_CHECK_CODE(code, lino, _exit);
2,827!
792
    writer->files[ftype] = (STFile){
2,827✔
793
        .type = ftype,
794
        .did = diskId,
795
        .fid = writer->config->fid,
2,827✔
796
        .cid = writer->config->cid,
2,827✔
797
        .size = 0,
798
        .minVer = VERSION_MAX,
799
        .maxVer = VERSION_MIN,
800
    };
801
  }
802

803
  // .tomb
804
  ftype = TSDB_FTYPE_TOMB;
3,532✔
805
  code = tsdbAllocateDisk(writer->config->tsdb, tsdbFTypeLabel(ftype), writer->config->expLevel, &diskId);
3,532✔
806
  TSDB_CHECK_CODE(code, lino, _exit);
3,533!
807
  writer->files[ftype] = (STFile){
3,533✔
808
      .type = ftype,
809
      .did = diskId,
810
      .fid = writer->config->fid,
3,533✔
811
      .cid = writer->config->cid,
3,533✔
812
      .size = 0,
813
      .minVer = VERSION_MAX,
814
      .maxVer = VERSION_MIN,
815
  };
816

817
  // range
818
  writer->ctx->range = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
3,533✔
819
  writer->ctx->tombRange = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
3,533✔
820

821
  writer->ctx->opened = true;
3,533✔
822

823
_exit:
3,533✔
824
  if (code) {
3,533!
825
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
826
              tstrerror(code));
827
  }
828
  return code;
3,534✔
829
}
830

831
void tsdbWriterUpdVerRange(SVersionRange *range, int64_t minVer, int64_t maxVer) {
984,564✔
832
  range->minVer = TMIN(range->minVer, minVer);
984,564✔
833
  range->maxVer = TMAX(range->maxVer, maxVer);
984,564✔
834
}
984,564✔
835

836
int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, uint32_t cmprAlg, int64_t *fileSize,
25,130✔
837
                               TBrinBlkArray *brinBlkArray, SBuffer *buffers, SVersionRange *range,
838
                               int32_t encryptAlgorithm, char *encryptKey) {
839
  if (brinBlock->numOfRecords == 0) {
25,130!
840
    return 0;
×
841
  }
842

843
  int32_t  code;
844
  SBuffer *buffer0 = buffers + 0;
25,130✔
845
  SBuffer *buffer1 = buffers + 1;
25,130✔
846
  SBuffer *assist = buffers + 2;
25,130✔
847

848
  SBrinBlk brinBlk = {
25,130✔
849
      .dp[0] =
850
          {
851
              .offset = *fileSize,
25,130✔
852
              .size = 0,
853
          },
854
      .numRec = brinBlock->numOfRecords,
25,130✔
855
      .numOfPKs = brinBlock->numOfPKs,
25,130✔
856
      .cmprAlg = cmprAlg,
857
  };
858
  for (int i = 0; i < brinBlock->numOfRecords; i++) {
5,857,016✔
859
    SBrinRecord record;
860

861
    TAOS_CHECK_RETURN(tBrinBlockGet(brinBlock, i, &record));
5,831,894!
862
    if (i == 0) {
5,831,886✔
863
      brinBlk.minTbid.suid = record.suid;
25,128✔
864
      brinBlk.minTbid.uid = record.uid;
25,128✔
865
      brinBlk.minVer = record.minVer;
25,128✔
866
      brinBlk.maxVer = record.maxVer;
25,128✔
867
    }
868
    if (i == brinBlock->numOfRecords - 1) {
5,831,886✔
869
      brinBlk.maxTbid.suid = record.suid;
25,129✔
870
      brinBlk.maxTbid.uid = record.uid;
25,129✔
871
    }
872
    if (record.minVer < brinBlk.minVer) {
5,831,886✔
873
      brinBlk.minVer = record.minVer;
4,853✔
874
    }
875
    if (record.maxVer > brinBlk.maxVer) {
5,831,886✔
876
      brinBlk.maxVer = record.maxVer;
2,224,001✔
877
    }
878
  }
879

880
  tsdbWriterUpdVerRange(range, brinBlk.minVer, brinBlk.maxVer);
25,122✔
881

882
  // write to file
883
  for (int32_t i = 0; i < 10; ++i) {
276,422✔
884
    SCompressInfo info = {
251,292✔
885
        .cmprAlg = cmprAlg,
886
        .dataType = TSDB_DATA_TYPE_BIGINT,
887
        .originalSize = brinBlock->buffers[i].size,
251,292✔
888
    };
889

890
    tBufferClear(buffer0);
891
    TAOS_CHECK_RETURN(tCompressDataToBuffer(brinBlock->buffers[i].data, &info, buffer0, assist));
251,292!
892
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptAlgorithm, encryptKey));
251,296!
893
    brinBlk.size[i] = info.compressedSize;
251,293✔
894
    brinBlk.dp->size += info.compressedSize;
251,293✔
895
    *fileSize += info.compressedSize;
251,293✔
896
  }
897
  for (int32_t i = 10; i < 15; ++i) {
150,779✔
898
    SCompressInfo info = {
125,649✔
899
        .cmprAlg = cmprAlg,
900
        .dataType = TSDB_DATA_TYPE_INT,
901
        .originalSize = brinBlock->buffers[i].size,
125,649✔
902
    };
903

904
    tBufferClear(buffer0);
905
    TAOS_CHECK_RETURN(tCompressDataToBuffer(brinBlock->buffers[i].data, &info, buffer0, assist));
125,649!
906
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptAlgorithm, encryptKey));
125,650!
907
    brinBlk.size[i] = info.compressedSize;
125,649✔
908
    brinBlk.dp->size += info.compressedSize;
125,649✔
909
    *fileSize += info.compressedSize;
125,649✔
910
  }
911

912
  // write primary keys to file
913
  if (brinBlock->numOfPKs > 0) {
25,130✔
914
    tBufferClear(buffer0);
915
    tBufferClear(buffer1);
916

917
    // encode
918
    for (int i = 0; i < brinBlock->numOfPKs; i++) {
3,488✔
919
      SValueColumnCompressInfo info = {.cmprAlg = cmprAlg};
1,744✔
920
      TAOS_CHECK_RETURN(tValueColumnCompress(&brinBlock->firstKeyPKs[i], &info, buffer1, assist));
1,744!
921
      TAOS_CHECK_RETURN(tValueColumnCompressInfoEncode(&info, buffer0));
1,744!
922
    }
923
    for (int i = 0; i < brinBlock->numOfPKs; i++) {
3,488✔
924
      SValueColumnCompressInfo info = {.cmprAlg = cmprAlg};
1,744✔
925
      TAOS_CHECK_RETURN(tValueColumnCompress(&brinBlock->lastKeyPKs[i], &info, buffer1, assist));
1,744!
926
      TAOS_CHECK_RETURN(tValueColumnCompressInfoEncode(&info, buffer0));
1,744!
927
    }
928

929
    // write to file
930
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptAlgorithm, encryptKey));
1,744!
931
    *fileSize += buffer0->size;
1,744✔
932
    brinBlk.dp->size += buffer0->size;
1,744✔
933
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer1->data, buffer1->size, encryptAlgorithm, encryptKey));
1,744!
934
    *fileSize += buffer1->size;
1,744✔
935
    brinBlk.dp->size += buffer1->size;
1,744✔
936
  }
937

938
  // append to brinBlkArray
939
  TAOS_CHECK_RETURN(TARRAY2_APPEND_PTR(brinBlkArray, &brinBlk));
50,260!
940

941
  tBrinBlockClear(brinBlock);
25,130✔
942

943
  return 0;
25,130✔
944
}
945

946
static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) {
25,152✔
947
  if (writer->brinBlock->numOfRecords == 0) {
25,152✔
948
    return 0;
22✔
949
  }
950

951
  int32_t code = 0;
25,130✔
952
  int32_t lino = 0;
25,130✔
953

954
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
25,130✔
955
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
25,130✔
956

957
  TAOS_CHECK_GOTO(tsdbFileWriteBrinBlock(writer->fd[TSDB_FTYPE_HEAD], writer->brinBlock, writer->config->cmprAlg,
25,130!
958
                                         &writer->files[TSDB_FTYPE_HEAD].size, writer->brinBlkArray, writer->buffers,
959
                                         &writer->ctx->range, encryptAlgorithm, encryptKey),
960
                  &lino, _exit);
961

962
_exit:
25,130✔
963
  if (code) {
25,130!
964
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
965
              tstrerror(code));
966
  }
967
  return code;
25,130✔
968
}
969

970
static int32_t tsdbDataFileWriteBrinRecord(SDataFileWriter *writer, const SBrinRecord *record) {
5,832,142✔
971
  int32_t code = 0;
5,832,142✔
972
  int32_t lino = 0;
5,832,142✔
973

974
  for (;;) {
975
    code = tBrinBlockPut(writer->brinBlock, record);
5,832,145✔
976
    if (code == TSDB_CODE_INVALID_PARA) {
5,832,145!
977
      // different records with different primary keys
UNCOV
978
      TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlock(writer), &lino, _exit);
×
979
      continue;
3✔
980
    } else {
981
      TSDB_CHECK_CODE(code, lino, _exit);
5,832,149!
982
    }
983
    break;
5,832,149✔
984
  }
985

986
  if ((writer->brinBlock->numOfRecords) >= 256) {
5,832,149✔
987
    TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlock(writer), &lino, _exit);
22,608!
988
  }
989

990
_exit:
5,832,149✔
991
  if (code) {
5,832,149!
992
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
993
              tstrerror(code));
994
  }
995
  return code;
5,832,144✔
996
}
997

998
static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData *bData) {
222,072✔
999
  if (bData->nRow == 0) {
222,072✔
1000
    return 0;
32,688✔
1001
  }
1002

1003
  if (!bData->uid) {
189,384!
1004
    return TSDB_CODE_INVALID_PARA;
×
1005
  }
1006

1007
  int32_t  code = 0;
189,384✔
1008
  int32_t  lino = 0;
189,384✔
1009
  SBuffer *buffers = writer->buffers;
189,384✔
1010
  SBuffer *assist = writer->buffers + 4;
189,384✔
1011

1012
  SColCompressInfo cmprInfo = {.pColCmpr = NULL, .defaultCmprAlg = writer->config->cmprAlg};
189,384✔
1013

1014
  SBrinRecord record[1] = {{
189,384✔
1015
      .suid = bData->suid,
189,384✔
1016
      .uid = bData->uid,
189,384✔
1017
      .minVer = bData->aVersion[0],
189,384✔
1018
      .maxVer = bData->aVersion[0],
189,384✔
1019
      .blockOffset = writer->files[TSDB_FTYPE_DATA].size,
189,384✔
1020
      .smaOffset = writer->files[TSDB_FTYPE_SMA].size,
189,384✔
1021
      .blockSize = 0,
1022
      .blockKeySize = 0,
1023
      .smaSize = 0,
1024
      .numRow = bData->nRow,
189,384✔
1025
      .count = 1,
1026
  }};
1027

1028
  tsdbRowGetKey(&tsdbRowFromBlockData(bData, 0), &record->firstKey);
189,384✔
1029
  tsdbRowGetKey(&tsdbRowFromBlockData(bData, bData->nRow - 1), &record->lastKey);
189,386✔
1030

1031
  for (int32_t i = 1; i < bData->nRow; ++i) {
629,896,719✔
1032
    if (tsdbRowCompareWithoutVersion(&tsdbRowFromBlockData(bData, i - 1), &tsdbRowFromBlockData(bData, i)) != 0) {
629,707,332!
1033
      record->count++;
629,703,459✔
1034
    }
1035
    if (bData->aVersion[i] < record->minVer) {
629,702,126✔
1036
      record->minVer = bData->aVersion[i];
33,247✔
1037
    }
1038
    if (bData->aVersion[i] > record->maxVer) {
629,702,126✔
1039
      record->maxVer = bData->aVersion[i];
7,070,463✔
1040
    }
1041
  }
1042

1043
  tsdbWriterUpdVerRange(&writer->ctx->range, record->minVer, record->maxVer);
189,387✔
1044

1045
  code = metaGetColCmpr(writer->config->tsdb->pVnode->pMeta, bData->suid != 0 ? bData->suid : bData->uid,
189,386✔
1046
                        &cmprInfo.pColCmpr);
1047
  if (code) {
189,382!
1048
    tsdbWarn("vgId:%d failed to get column compress algrithm", TD_VID(writer->config->tsdb->pVnode));
×
1049
  }
1050

1051
  TAOS_CHECK_GOTO(tBlockDataCompress(bData, &cmprInfo, buffers, assist), &lino, _exit);
189,382!
1052

1053
  record->blockKeySize = buffers[0].size + buffers[1].size;
189,384✔
1054
  record->blockSize = record->blockKeySize + buffers[2].size + buffers[3].size;
189,384✔
1055

1056
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
189,384✔
1057
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
189,384✔
1058
  for (int i = 0; i < 4; i++) {
946,921✔
1059
    TAOS_CHECK_GOTO(tsdbWriteFile(writer->fd[TSDB_FTYPE_DATA], writer->files[TSDB_FTYPE_DATA].size, buffers[i].data,
757,544!
1060
                                  buffers[i].size, encryptAlgorithm, encryptKey),
1061
                    &lino, _exit);
1062
    writer->files[TSDB_FTYPE_DATA].size += buffers[i].size;
757,537✔
1063
  }
1064

1065
  // to .sma file
1066
  tBufferClear(&buffers[0]);
1067
  for (int32_t i = 0; i < bData->nColData; ++i) {
816,707✔
1068
    SColData *colData = bData->aColData + i;
627,331✔
1069
    if ((colData->cflag & COL_SMA_ON) == 0 || ((colData->flag & HAS_VALUE) == 0)) continue;
627,331✔
1070

1071
    SColumnDataAgg sma[1] = {{.colId = colData->cid}};
592,464✔
1072
    tColDataCalcSMA[colData->type](colData, sma);
592,464✔
1073

1074
    TAOS_CHECK_GOTO(tPutColumnDataAgg(&buffers[0], sma), &lino, _exit);
592,467!
1075
  }
1076
  record->smaSize = buffers[0].size;
189,376✔
1077

1078
  if (record->smaSize > 0) {
189,376✔
1079
    TAOS_CHECK_GOTO(tsdbWriteFile(writer->fd[TSDB_FTYPE_SMA], record->smaOffset, buffers[0].data, record->smaSize,
182,407!
1080
                                  encryptAlgorithm, encryptKey),
1081
                    &lino, _exit);
1082
    writer->files[TSDB_FTYPE_SMA].size += record->smaSize;
182,407✔
1083
  }
1084

1085
  // append SBrinRecord
1086
  TAOS_CHECK_GOTO(tsdbDataFileWriteBrinRecord(writer, record), &lino, _exit);
189,376!
1087

1088
  tBlockDataClear(bData);
189,383✔
1089

1090
_exit:
189,385✔
1091
  if (code) {
189,385!
1092
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1093
              tstrerror(code));
1094
  }
1095
  taosHashCleanup(cmprInfo.pColCmpr);
189,385✔
1096
  return code;
189,386✔
1097
}
1098

1099
static int32_t tsdbDataFileDoWriteTSRow(SDataFileWriter *writer, TSDBROW *row) {
237,650,109✔
1100
  int32_t code = 0;
237,650,109✔
1101
  int32_t lino = 0;
237,650,109✔
1102

1103
  // update/append
1104
  if (row->type == TSDBROW_ROW_FMT) {
237,650,109!
1105
    TAOS_CHECK_GOTO(
×
1106
        tsdbUpdateSkmRow(writer->config->tsdb, writer->ctx->tbid, TSDBROW_SVERSION(row), writer->config->skmRow), &lino,
1107
        _exit);
1108
  }
1109

1110
  if (TSDBROW_VERSION(row) <= writer->config->compactVersion  //
237,650,109!
1111
      && writer->blockData->nRow > 0                          //
237,674,073✔
1112
      &&
237,526,841✔
1113
      tsdbRowCompareWithoutVersion(row, &tsdbRowFromBlockData(writer->blockData, writer->blockData->nRow - 1)) == 0  //
237,624,637✔
1114
  ) {
1115
    TAOS_CHECK_GOTO(tBlockDataUpdateRow(writer->blockData, row, writer->config->skmRow->pTSchema), &lino, _exit);
31,382,321!
1116
  } else {
1117
    if (writer->blockData->nRow >= writer->config->maxRow) {
206,169,992✔
1118
      TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, writer->blockData), &lino, _exit);
18,091!
1119
    }
1120

1121
    TAOS_CHECK_GOTO(
206,169,992!
1122
        tBlockDataAppendRow(writer->blockData, row, writer->config->skmRow->pTSchema, writer->ctx->tbid->uid), &lino,
1123
        _exit);
1124
  }
1125

1126
_exit:
237,693,114✔
1127
  if (code) {
237,693,114!
1128
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1129
              tstrerror(code));
1130
  }
1131
  return code;
237,695,834✔
1132
}
1133

1134
static FORCE_INLINE int32_t tsdbRowKeyCmprNullAsLargest(const STsdbRowKey *key1, const STsdbRowKey *key2) {
1135
  if (key1 == NULL) {
135,814,293!
1136
    return 1;
8✔
1137
  } else if (key2 == NULL) {
135,814,285!
1138
    return -1;
7,098✔
1139
  } else {
1140
    return tsdbRowKeyCmpr(key1, key2);
135,807,187✔
1141
  }
1142
}
1143

1144
static int32_t tsdbDataFileDoWriteTableOldData(SDataFileWriter *writer, const STsdbRowKey *key) {
49,205,335✔
1145
  if (writer->ctx->tbHasOldData == false) {
49,205,335!
1146
    return 0;
×
1147
  }
1148

1149
  int32_t     code = 0;
49,205,335✔
1150
  int32_t     lino = 0;
49,205,335✔
1151
  STsdbRowKey rowKey;
1152

1153
  for (;;) {
20,639✔
1154
    for (;;) {
1155
      // SBlockData
1156
      for (; writer->ctx->blockDataIdx < writer->ctx->blockData->nRow; writer->ctx->blockDataIdx++) {
125,276,562✔
1157
        TSDBROW row = tsdbRowFromBlockData(writer->ctx->blockData, writer->ctx->blockDataIdx);
125,171,880✔
1158

1159
        tsdbRowGetKey(&row, &rowKey);
125,171,880✔
1160
        if (tsdbRowKeyCmprNullAsLargest(&rowKey, key) < 0) {  // key <= rowKey
125,171,880✔
1161
          TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSRow(writer, &row), &lino, _exit);
125,171,880!
1162
        } else {
1163
          goto _exit;
49,166,549✔
1164
        }
1165
      }
1166

1167
      // SBrinBlock
1168
      if (writer->ctx->brinBlockIdx >= writer->ctx->brinBlock->numOfRecords) {
104,682✔
1169
        break;
20,851✔
1170
      }
1171

1172
      for (; writer->ctx->brinBlockIdx < writer->ctx->brinBlock->numOfRecords; writer->ctx->brinBlockIdx++) {
5,366,208✔
1173
        SBrinRecord record;
1174
        code = tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, &record);
5,345,461✔
1175
        TSDB_CHECK_CODE(code, lino, _exit);
5,384,035!
1176
        if (record.uid != writer->ctx->tbid->uid) {
5,345,461✔
1177
          writer->ctx->tbHasOldData = false;
9,935✔
1178
          goto _exit;
9,935✔
1179
        }
1180

1181
        if (tsdbRowKeyCmprNullAsLargest(key, &record.firstKey) < 0) {  // key < record->firstKey
5,335,526✔
1182
          goto _exit;
28,639✔
1183
        } else {
1184
          SBrinRecord record[1];
1185
          code = tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, record);
5,306,887✔
1186
          TSDB_CHECK_CODE(code, lino, _exit);
5,306,887!
1187
          if (tsdbRowKeyCmprNullAsLargest(key, &record->lastKey) > 0) {  // key > record->lastKey
5,306,887✔
1188
            if (writer->blockData->nRow > 0) {
5,282,377✔
1189
              TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, writer->blockData), &lino, _exit);
4!
1190
            }
1191

1192
            TAOS_CHECK_GOTO(tsdbDataFileWriteBrinRecord(writer, record), &lino, _exit);
5,282,377!
1193
          } else {
1194
            TAOS_CHECK_GOTO(tsdbDataFileReadBlockData(writer->ctx->reader, record, writer->ctx->blockData), &lino,
24,510!
1195
                            _exit);
1196

1197
            writer->ctx->blockDataIdx = 0;
24,510✔
1198
            writer->ctx->brinBlockIdx++;
24,510✔
1199
            break;
24,510✔
1200
          }
1201
        }
1202
      }
1203
    }
1204

1205
    // SBrinBlk
1206
    if (writer->ctx->brinBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->brinBlkArray)) {
20,851✔
1207
      writer->ctx->brinBlkArray = NULL;
170✔
1208
      writer->ctx->tbHasOldData = false;
170✔
1209
      goto _exit;
170✔
1210
    } else {
1211
      const SBrinBlk *brinBlk = TARRAY2_GET_PTR(writer->ctx->brinBlkArray, writer->ctx->brinBlkArrayIdx);
20,681✔
1212

1213
      if (brinBlk->minTbid.uid != writer->ctx->tbid->uid) {
20,681✔
1214
        writer->ctx->tbHasOldData = false;
42✔
1215
        goto _exit;
42✔
1216
      }
1217

1218
      TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(writer->ctx->reader, brinBlk, writer->ctx->brinBlock), &lino, _exit);
20,639!
1219

1220
      writer->ctx->brinBlockIdx = 0;
20,639✔
1221
      writer->ctx->brinBlkArrayIdx++;
20,639✔
1222
    }
1223
  }
1224

1225
_exit:
49,205,335✔
1226
  if (code) {
49,205,335!
1227
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1228
              tstrerror(code));
1229
  }
1230
  return code;
49,205,335✔
1231
}
1232

1233
static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row) {
161,640,220✔
1234
  int32_t code = 0;
161,640,220✔
1235
  int32_t lino = 0;
161,640,220✔
1236

1237
  if (writer->ctx->tbHasOldData) {
161,640,220✔
1238
    STsdbRowKey key;
1239
    tsdbRowGetKey(row, &key);
49,205,020✔
1240
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTableOldData(writer, &key), &lino, _exit);
49,205,020!
1241
  }
1242

1243
  TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSRow(writer, row), &lino, _exit);
161,640,220!
1244

1245
_exit:
161,677,623✔
1246
  if (code) {
161,677,623!
1247
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1248
              tstrerror(code));
1249
  }
1250
  return code;
161,684,411✔
1251
}
1252

1253
static int32_t tsdbDataFileWriteTableDataEnd(SDataFileWriter *writer) {
65,079✔
1254
  if (writer->ctx->tbid->uid == 0) {
65,079✔
1255
    return 0;
2,541✔
1256
  }
1257

1258
  int32_t code = 0;
62,538✔
1259
  int32_t lino = 0;
62,538✔
1260

1261
  if (writer->ctx->tbHasOldData) {
62,538✔
1262
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTableOldData(writer, NULL /* as the largest key */), &lino, _exit);
10!
1263
  }
1264

1265
  TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, writer->blockData), &lino, _exit);
62,538!
1266

1267
_exit:
62,537✔
1268
  if (code) {
62,537!
1269
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1270
              tstrerror(code));
1271
  }
1272
  return code;
62,537✔
1273
}
1274

1275
static int32_t tsdbDataFileWriteTableDataBegin(SDataFileWriter *writer, const TABLEID *tbid) {
65,077✔
1276
  int32_t code = 0;
65,077✔
1277
  int32_t lino = 0;
65,077✔
1278

1279
  SMetaInfo info;
1280
  bool      drop = false;
65,077✔
1281
  TABLEID   tbid1[1];
1282
  writer->ctx->tbHasOldData = false;
65,077✔
1283
  while (writer->ctx->brinBlkArray) {  // skip data of previous table
66,766✔
1284
    for (; writer->ctx->brinBlockIdx < writer->ctx->brinBlock->numOfRecords; writer->ctx->brinBlockIdx++) {
393,567✔
1285
      SBrinRecord record;
1286
      TAOS_CHECK_GOTO(tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, &record), &lino, _exit);
391,743!
1287

1288
      if (record.uid == tbid->uid) {
391,675✔
1289
        writer->ctx->tbHasOldData = true;
10,147✔
1290
        goto _begin;
31,157✔
1291
      } else if (record.suid > tbid->suid || (record.suid == tbid->suid && record.uid > tbid->uid)) {
381,528✔
1292
        goto _begin;
21,010✔
1293
      } else {
1294
        if (record.uid != writer->ctx->tbid->uid) {
360,518✔
1295
          if (drop && tbid1->uid == record.uid) {
133,012!
1296
            continue;
205✔
1297
          } else if (metaGetInfo(writer->config->tsdb->pVnode->pMeta, record.uid, &info, NULL) != 0) {
133,012✔
1298
            drop = true;
205✔
1299
            tbid1->suid = record.suid;
205✔
1300
            tbid1->uid = record.uid;
205✔
1301
            continue;
205✔
1302
          } else {
1303
            drop = false;
132,812✔
1304
            writer->ctx->tbid->suid = record.suid;
132,812✔
1305
            writer->ctx->tbid->uid = record.uid;
132,812✔
1306
          }
1307
        }
1308

1309
        TAOS_CHECK_GOTO(tsdbDataFileWriteBrinRecord(writer, &record), &lino, _exit);
360,318!
1310
      }
1311
    }
1312

1313
    if (writer->ctx->brinBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->brinBlkArray)) {
1,824✔
1314
      writer->ctx->brinBlkArray = NULL;
133✔
1315
      break;
133✔
1316
    } else {
1317
      const SBrinBlk *brinBlk = TARRAY2_GET_PTR(writer->ctx->brinBlkArray, writer->ctx->brinBlkArrayIdx);
1,691✔
1318

1319
      TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(writer->ctx->reader, brinBlk, writer->ctx->brinBlock), &lino, _exit);
1,691!
1320

1321
      writer->ctx->brinBlockIdx = 0;
1,689✔
1322
      writer->ctx->brinBlkArrayIdx++;
1,689✔
1323
    }
1324
  }
1325

1326
_begin:
33,787✔
1327
  writer->ctx->tbid[0] = *tbid;
65,077✔
1328

1329
  if (tbid->uid == INT64_MAX) {
65,077✔
1330
    goto _exit;
2,541✔
1331
  }
1332

1333
  TAOS_CHECK_GOTO(tsdbUpdateSkmTb(writer->config->tsdb, tbid, writer->config->skmTb), &lino, _exit);
62,536!
1334
  TAOS_CHECK_GOTO(tBlockDataInit(writer->blockData, writer->ctx->tbid, writer->config->skmTb->pTSchema, NULL, 0), &lino,
62,537!
1335
                  _exit);
1336

1337
_exit:
62,538✔
1338
  if (code) {
65,079!
1339
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1340
              tstrerror(code));
1341
  }
1342
  return code;
65,079✔
1343
}
1344

1345
int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer, int32_t encryptAlgorithm,
2,541✔
1346
                                char *encryptKey) {
1347
  TAOS_CHECK_RETURN(
2,541!
1348
      tsdbWriteFile(fd, *fileSize, (const uint8_t *)footer, sizeof(*footer), encryptAlgorithm, encryptKey));
1349
  *fileSize += sizeof(*footer);
2,541✔
1350
  return 0;
2,541✔
1351
}
1352

1353
int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
9,807✔
1354
                               TTombBlkArray *tombBlkArray, SBuffer *buffers, SVersionRange *range,
1355
                               int32_t encryptAlgorithm, char *encryptKey) {
1356
  int32_t code;
1357

1358
  if (TOMB_BLOCK_SIZE(tombBlock) == 0) {
9,807!
1359
    return 0;
×
1360
  }
1361

1362
  SBuffer *buffer0 = buffers + 0;
9,807✔
1363
  SBuffer *assist = buffers + 1;
9,807✔
1364

1365
  STombBlk tombBlk = {
9,807✔
1366
      .dp[0] =
1367
          {
1368
              .offset = *fileSize,
9,807✔
1369
              .size = 0,
1370
          },
1371
      .numRec = TOMB_BLOCK_SIZE(tombBlock),
9,807✔
1372
      .cmprAlg = cmprAlg,
1373
  };
1374
  for (int i = 0; i < TOMB_BLOCK_SIZE(tombBlock); i++) {
347,190✔
1375
    STombRecord record;
1376
    TAOS_CHECK_RETURN(tTombBlockGet(tombBlock, i, &record));
337,383!
1377

1378
    if (i == 0) {
337,383✔
1379
      tombBlk.minTbid.suid = record.suid;
9,807✔
1380
      tombBlk.minTbid.uid = record.uid;
9,807✔
1381
      tombBlk.minVer = record.version;
9,807✔
1382
      tombBlk.maxVer = record.version;
9,807✔
1383
    }
1384
    if (i == TOMB_BLOCK_SIZE(tombBlock) - 1) {
337,383✔
1385
      tombBlk.maxTbid.suid = record.suid;
9,807✔
1386
      tombBlk.maxTbid.uid = record.uid;
9,807✔
1387
    }
1388
    if (record.version < tombBlk.minVer) {
337,383✔
1389
      tombBlk.minVer = record.version;
1,311✔
1390
    }
1391
    if (record.version > tombBlk.maxVer) {
337,383✔
1392
      tombBlk.maxVer = record.version;
178,365✔
1393
    }
1394
  }
1395

1396
  tsdbWriterUpdVerRange(range, tombBlk.minVer, tombBlk.maxVer);
9,807✔
1397

1398
  for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->buffers); i++) {
58,837✔
1399
    tBufferClear(buffer0);
1400

1401
    SCompressInfo cinfo = {
49,035✔
1402
        .cmprAlg = cmprAlg,
1403
        .dataType = TSDB_DATA_TYPE_BIGINT,
1404
        .originalSize = tombBlock->buffers[i].size,
49,035✔
1405
    };
1406
    TAOS_CHECK_RETURN(tCompressDataToBuffer(tombBlock->buffers[i].data, &cinfo, buffer0, assist));
49,035!
1407
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptAlgorithm, encryptKey));
49,035✔
1408

1409
    tombBlk.size[i] = cinfo.compressedSize;
49,030✔
1410
    tombBlk.dp->size += tombBlk.size[i];
49,030✔
1411
    *fileSize += tombBlk.size[i];
49,030✔
1412
  }
1413

1414
  TAOS_CHECK_RETURN(TARRAY2_APPEND_PTR(tombBlkArray, &tombBlk));
19,604!
1415

1416
  tTombBlockClear(tombBlock);
9,802✔
1417
  return 0;
9,807✔
1418
}
1419

1420
static int32_t tsdbDataFileWriteHeadFooter(SDataFileWriter *writer) {
2,541✔
1421
  int32_t code = 0;
2,541✔
1422
  int32_t lino = 0;
2,541✔
1423

1424
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
2,541✔
1425
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
2,541✔
1426

1427
  TAOS_CHECK_GOTO(tsdbFileWriteHeadFooter(writer->fd[TSDB_FTYPE_HEAD], &writer->files[TSDB_FTYPE_HEAD].size,
2,541!
1428
                                          writer->headFooter, encryptAlgorithm, encryptKey),
1429
                  &lino, _exit);
1430

1431
_exit:
2,541✔
1432
  if (code) {
2,541!
1433
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1434
              tstrerror(code));
1435
  }
1436
  return code;
2,541✔
1437
}
1438

1439
static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) {
1,589✔
1440
  if (TOMB_BLOCK_SIZE(writer->tombBlock) == 0) return 0;
1,589!
1441

1442
  int32_t code = 0;
1,589✔
1443
  int32_t lino = 0;
1,589✔
1444

1445
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
1,589✔
1446
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
1,589✔
1447

1448
  TAOS_CHECK_GOTO(tsdbFileWriteTombBlock(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlock, writer->config->cmprAlg,
1,589!
1449
                                         &writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->buffers,
1450
                                         &writer->ctx->tombRange, encryptAlgorithm, encryptKey),
1451
                  &lino, _exit);
1452

1453
_exit:
1,589✔
1454
  if (code) {
1,589!
1455
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1456
              tstrerror(code));
1457
  }
1458
  return code;
1,589✔
1459
}
1460

1461
int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize,
341,134✔
1462
                             int32_t encryptAlgorithm, char *encryptKey) {
1463
  ptr->size = TARRAY2_DATA_LEN(tombBlkArray);
341,134✔
1464
  if (ptr->size > 0) {
341,134✔
1465
    ptr->offset = *fileSize;
9,801✔
1466

1467
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, (const uint8_t *)TARRAY2_DATA(tombBlkArray), ptr->size,
9,801!
1468
                                    encryptAlgorithm, encryptKey));
1469

1470
    *fileSize += ptr->size;
9,801✔
1471
  }
1472
  return 0;
341,134✔
1473
}
1474

1475
static int32_t tsdbDataFileDoWriteTombBlk(SDataFileWriter *writer) {
1,583✔
1476
  if (TARRAY2_SIZE(writer->tombBlkArray) <= 0) {
1,583!
1477
    return TSDB_CODE_INVALID_PARA;
×
1478
  }
1479

1480
  int32_t code = 0;
1,583✔
1481
  int32_t lino = 0;
1,583✔
1482

1483
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
1,583✔
1484
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
1,583✔
1485

1486
  TAOS_CHECK_GOTO(
1,583!
1487
      tsdbFileWriteTombBlk(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlkArray, writer->tombFooter->tombBlkPtr,
1488
                           &writer->files[TSDB_FTYPE_TOMB].size, encryptAlgorithm, encryptKey),
1489
      &lino, _exit);
1490

1491
_exit:
1,583✔
1492
  if (code) {
1,583!
1493
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1494
              tstrerror(code));
1495
  }
1496
  return code;
1,583✔
1497
}
1498

1499
int32_t tsdbFileWriteTombFooter(STsdbFD *fd, const STombFooter *footer, int64_t *fileSize, int32_t encryptAlgorithm,
1,583✔
1500
                                char *encryptKey) {
1501
  TAOS_CHECK_RETURN(
1,583!
1502
      tsdbWriteFile(fd, *fileSize, (const uint8_t *)footer, sizeof(*footer), encryptAlgorithm, encryptKey));
1503
  *fileSize += sizeof(*footer);
1,583✔
1504
  return 0;
1,583✔
1505
}
1506

1507
static int32_t tsdbDataFileWriteTombFooter(SDataFileWriter *writer) {
1,583✔
1508
  int32_t code = 0;
1,583✔
1509
  int32_t lino = 0;
1,583✔
1510

1511
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
1,583✔
1512
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
1,583✔
1513

1514
  TAOS_CHECK_GOTO(tsdbFileWriteTombFooter(writer->fd[TSDB_FTYPE_TOMB], writer->tombFooter,
1,583!
1515
                                          &writer->files[TSDB_FTYPE_TOMB].size, encryptAlgorithm, encryptKey),
1516
                  &lino, _exit);
1517

1518
_exit:
1,583✔
1519
  if (code) {
1,583!
1520
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1521
              tstrerror(code));
1522
  }
1523
  return code;
1,583✔
1524
}
1525

1526
static int32_t tsdbDataFileDoWriteTombRecord(SDataFileWriter *writer, const STombRecord *record) {
19,029✔
1527
  int32_t code = 0;
19,029✔
1528
  int32_t lino = 0;
19,029✔
1529

1530
  while (writer->ctx->hasOldTomb) {
19,696✔
1531
    for (; writer->ctx->tombBlockIdx < TOMB_BLOCK_SIZE(writer->ctx->tombBlock); writer->ctx->tombBlockIdx++) {
251,875✔
1532
      STombRecord record1[1];
1533
      TAOS_CHECK_GOTO(tTombBlockGet(writer->ctx->tombBlock, writer->ctx->tombBlockIdx, record1), &lino, _exit);
250,546!
1534

1535
      int32_t c = tTombRecordCompare(record, record1);
250,546✔
1536
      if (c < 0) {
250,546✔
1537
        goto _write;
11,565✔
1538
      } else if (c > 0) {
238,981!
1539
        TAOS_CHECK_GOTO(tTombBlockPut(writer->tombBlock, record1), &lino, _exit);
238,981!
1540

1541
        tsdbTrace("vgId:%d write tomb record to tomb file:%s, cid:%" PRId64 ", suid:%" PRId64 ", uid:%" PRId64
238,981!
1542
                  ", version:%" PRId64,
1543
                  TD_VID(writer->config->tsdb->pVnode), writer->fd[TSDB_FTYPE_TOMB]->path, writer->config->cid,
1544
                  record1->suid, record1->uid, record1->version);
1545

1546
        if (TOMB_BLOCK_SIZE(writer->tombBlock) >= writer->config->maxRow) {
238,981✔
1547
          TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlock(writer), &lino, _exit);
6!
1548
        }
1549
      } else {
1550
        tsdbError("vgId:%d duplicate tomb record, cid:%" PRId64 ", suid:%" PRId64 ", uid:%" PRId64 ", version:%" PRId64,
×
1551
                  TD_VID(writer->config->tsdb->pVnode), writer->config->cid, record->suid, record->uid,
1552
                  record->version);
1553
      }
1554
    }
1555

1556
    if (writer->ctx->tombBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->tombBlkArray)) {
1,329✔
1557
      writer->ctx->hasOldTomb = false;
662✔
1558
      break;
662✔
1559
    } else {
1560
      const STombBlk *tombBlk = TARRAY2_GET_PTR(writer->ctx->tombBlkArray, writer->ctx->tombBlkArrayIdx);
667✔
1561

1562
      TAOS_CHECK_GOTO(tsdbDataFileReadTombBlock(writer->ctx->reader, tombBlk, writer->ctx->tombBlock), &lino, _exit);
667!
1563

1564
      writer->ctx->tombBlockIdx = 0;
667✔
1565
      writer->ctx->tombBlkArrayIdx++;
667✔
1566
    }
1567
  }
1568

1569
_write:
6,802✔
1570
  if (record->suid == INT64_MAX) {
19,029✔
1571
    goto _exit;
1,583✔
1572
  }
1573

1574
  TAOS_CHECK_GOTO(tTombBlockPut(writer->tombBlock, record), &lino, _exit);
17,446!
1575

1576
  tsdbTrace("vgId:%d write tomb record to tomb file:%s, cid:%" PRId64 ", suid:%" PRId64 ", uid:%" PRId64
17,446!
1577
            ", version:%" PRId64,
1578
            TD_VID(writer->config->tsdb->pVnode), writer->fd[TSDB_FTYPE_TOMB]->path, writer->config->cid, record->suid,
1579
            record->uid, record->version);
1580

1581
  if (TOMB_BLOCK_SIZE(writer->tombBlock) >= writer->config->maxRow) {
17,446!
UNCOV
1582
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlock(writer), &lino, _exit);
×
1583
  }
1584

1585
_exit:
17,446✔
1586
  if (code) {
19,029!
1587
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1588
              tstrerror(code));
1589
  }
1590
  return code;
19,029✔
1591
}
1592

1593
int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize,
2,541✔
1594
                             int32_t encryptAlgorithm, char *encryptKey) {
1595
  if (TARRAY2_SIZE(brinBlkArray) <= 0) {
2,541!
1596
    return TSDB_CODE_INVALID_PARA;
×
1597
  }
1598
  ptr->offset = *fileSize;
2,541✔
1599
  ptr->size = TARRAY2_DATA_LEN(brinBlkArray);
2,541✔
1600

1601
  TAOS_CHECK_RETURN(
2,541!
1602
      tsdbWriteFile(fd, ptr->offset, (uint8_t *)TARRAY2_DATA(brinBlkArray), ptr->size, encryptAlgorithm, encryptKey));
1603

1604
  *fileSize += ptr->size;
2,541✔
1605
  return 0;
2,541✔
1606
}
1607

1608
static int32_t tsdbDataFileWriteBrinBlk(SDataFileWriter *writer) {
2,541✔
1609
  int32_t code = 0;
2,541✔
1610
  int32_t lino = 0;
2,541✔
1611

1612
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
2,541✔
1613
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
2,541✔
1614

1615
  TAOS_CHECK_GOTO(
2,541!
1616
      tsdbFileWriteBrinBlk(writer->fd[TSDB_FTYPE_HEAD], writer->brinBlkArray, writer->headFooter->brinBlkPtr,
1617
                           &writer->files[TSDB_FTYPE_HEAD].size, encryptAlgorithm, encryptKey),
1618
      &lino, _exit);
1619

1620
_exit:
2,541✔
1621
  if (code) {
2,541!
1622
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1623
              tstrerror(code));
1624
  }
1625
  return code;
2,541✔
1626
}
1627

1628
void tsdbTFileUpdVerRange(STFile *f, SVersionRange range) {
352,950✔
1629
  f->minVer = TMIN(f->minVer, range.minVer);
352,950✔
1630
  f->maxVer = TMAX(f->maxVer, range.maxVer);
352,950✔
1631
}
352,950✔
1632

1633
static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArray *opArr) {
3,534✔
1634
  int32_t code = 0;
3,534✔
1635
  int32_t lino = 0;
3,534✔
1636

1637
  int32_t  ftype;
1638
  STFileOp op;
1639

1640
  if (writer->fd[TSDB_FTYPE_HEAD]) {
3,534✔
1641
    TABLEID tbid[1] = {{
2,541✔
1642
        .suid = INT64_MAX,
1643
        .uid = INT64_MAX,
1644
    }};
1645

1646
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataEnd(writer), &lino, _exit);
2,541!
1647
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataBegin(writer, tbid), &lino, _exit);
2,541!
1648
    TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlock(writer), &lino, _exit);
2,541!
1649
    TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlk(writer), &lino, _exit);
2,541!
1650
    TAOS_CHECK_GOTO(tsdbDataFileWriteHeadFooter(writer), &lino, _exit);
2,541!
1651

1652
    SVersionRange ofRange = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
2,541✔
1653

1654
    // .head
1655
    ftype = TSDB_FTYPE_HEAD;
2,541✔
1656
    if (writer->config->files[ftype].exist) {
2,541✔
1657
      op = (STFileOp){
303✔
1658
          .optype = TSDB_FOP_REMOVE,
1659
          .fid = writer->config->fid,
303✔
1660
          .of = writer->config->files[ftype].file,
303✔
1661
      };
1662
      ofRange = (SVersionRange){.minVer = op.of.minVer, .maxVer = op.of.maxVer};
303✔
1663
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
606!
1664
    }
1665
    op = (STFileOp){
2,541✔
1666
        .optype = TSDB_FOP_CREATE,
1667
        .fid = writer->config->fid,
2,541✔
1668
        .nf = writer->files[ftype],
2,541✔
1669
    };
1670
    tsdbTFileUpdVerRange(&op.nf, ofRange);
2,541✔
1671
    tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
2,541✔
1672
    TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
5,082!
1673

1674
    // .data
1675
    ftype = TSDB_FTYPE_DATA;
2,541✔
1676
    if (!writer->config->files[ftype].exist) {
2,541✔
1677
      op = (STFileOp){
2,238✔
1678
          .optype = TSDB_FOP_CREATE,
1679
          .fid = writer->config->fid,
2,238✔
1680
          .nf = writer->files[ftype],
2,238✔
1681
      };
1682
      tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
2,238✔
1683
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
4,476!
1684
    } else if (writer->config->files[ftype].file.size != writer->files[ftype].size) {
303!
1685
      op = (STFileOp){
303✔
1686
          .optype = TSDB_FOP_MODIFY,
1687
          .fid = writer->config->fid,
303✔
1688
          .of = writer->config->files[ftype].file,
303✔
1689
          .nf = writer->files[ftype],
303✔
1690
      };
1691
      tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
303✔
1692
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
606!
1693
    }
1694

1695
    // .sma
1696
    ftype = TSDB_FTYPE_SMA;
2,541✔
1697
    if (!writer->config->files[ftype].exist) {
2,541✔
1698
      op = (STFileOp){
2,238✔
1699
          .optype = TSDB_FOP_CREATE,
1700
          .fid = writer->config->fid,
2,238✔
1701
          .nf = writer->files[ftype],
2,238✔
1702
      };
1703
      tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
2,238✔
1704
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
4,476!
1705
    } else if (writer->config->files[ftype].file.size != writer->files[ftype].size) {
303!
1706
      op = (STFileOp){
303✔
1707
          .optype = TSDB_FOP_MODIFY,
1708
          .fid = writer->config->fid,
303✔
1709
          .of = writer->config->files[ftype].file,
303✔
1710
          .nf = writer->files[ftype],
303✔
1711
      };
1712
      tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
303✔
1713
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
606!
1714
    }
1715
  }
1716

1717
  if (writer->fd[TSDB_FTYPE_TOMB]) {
3,534✔
1718
    STombRecord record[1] = {{
1,583✔
1719
        .suid = INT64_MAX,
1720
        .uid = INT64_MAX,
1721
        .version = INT64_MAX,
1722
    }};
1723

1724
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombRecord(writer, record), &lino, _exit);
1,583!
1725
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlock(writer), &lino, _exit);
1,583!
1726
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlk(writer), &lino, _exit);
1,583!
1727
    TAOS_CHECK_GOTO(tsdbDataFileWriteTombFooter(writer), &lino, _exit);
1,583!
1728

1729
    SVersionRange ofRange = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
1,583✔
1730

1731
    ftype = TSDB_FTYPE_TOMB;
1,583✔
1732
    if (writer->config->files[ftype].exist) {
1,583✔
1733
      op = (STFileOp){
662✔
1734
          .optype = TSDB_FOP_REMOVE,
1735
          .fid = writer->config->fid,
662✔
1736
          .of = writer->config->files[ftype].file,
662✔
1737
      };
1738
      ofRange = (SVersionRange){.minVer = op.of.minVer, .maxVer = op.of.maxVer};
662✔
1739
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
1,324!
1740
    }
1741
    op = (STFileOp){
1,583✔
1742
        .optype = TSDB_FOP_CREATE,
1743
        .fid = writer->config->fid,
1,583✔
1744
        .nf = writer->files[ftype],
1,583✔
1745
    };
1746
    tsdbTFileUpdVerRange(&op.nf, ofRange);
1,583✔
1747
    tsdbTFileUpdVerRange(&op.nf, writer->ctx->tombRange);
1,583✔
1748
    TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
3,166!
1749
  }
1750
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
3,534✔
1751
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
3,534✔
1752
  for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
17,670✔
1753
    if (writer->fd[i]) {
14,136✔
1754
      TAOS_CHECK_GOTO(tsdbFsyncFile(writer->fd[i], encryptAlgorithm, encryptKey), &lino, _exit);
9,206!
1755
      tsdbCloseFile(&writer->fd[i]);
9,205✔
1756
    }
1757
  }
1758

1759
_exit:
3,534✔
1760
  if (code) {
3,534!
1761
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1762
              tstrerror(code));
1763
  }
1764
  return code;
3,534✔
1765
}
1766

1767
static int32_t tsdbDataFileWriterOpenDataFD(SDataFileWriter *writer) {
2,541✔
1768
  int32_t code = 0;
2,541✔
1769
  int32_t lino = 0;
2,541✔
1770

1771
  int32_t ftypes[] = {TSDB_FTYPE_HEAD, TSDB_FTYPE_DATA, TSDB_FTYPE_SMA};
2,541✔
1772

1773
  for (int32_t i = 0; i < ARRAY_SIZE(ftypes); ++i) {
10,162✔
1774
    int32_t ftype = ftypes[i];
7,622✔
1775

1776
    char    fname[TSDB_FILENAME_LEN];
1777
    int32_t flag = TD_FILE_READ | TD_FILE_WRITE;
7,622✔
1778

1779
    if (writer->files[ftype].size == 0) {
7,622✔
1780
      flag |= (TD_FILE_CREATE | TD_FILE_TRUNC);
7,017✔
1781
    }
1782

1783
    int32_t lcn = writer->files[ftype].lcn;
7,622✔
1784
    tsdbTFileName(writer->config->tsdb, &writer->files[ftype], fname);
7,622✔
1785
    TAOS_CHECK_GOTO(tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd[ftype], lcn), &lino, _exit);
7,623!
1786

1787
    if (writer->files[ftype].size == 0) {
7,624✔
1788
      uint8_t hdr[TSDB_FHDR_SIZE] = {0};
7,017✔
1789

1790
      int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
7,017✔
1791
      char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
7,017✔
1792

1793
      TAOS_CHECK_GOTO(tsdbWriteFile(writer->fd[ftype], 0, hdr, TSDB_FHDR_SIZE, encryptAlgorithm, encryptKey), &lino,
7,017!
1794
                      _exit);
1795

1796
      writer->files[ftype].size += TSDB_FHDR_SIZE;
7,014✔
1797
    }
1798
  }
1799

1800
  if (writer->ctx->reader) {
2,540✔
1801
    TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(writer->ctx->reader, &writer->ctx->brinBlkArray), &lino, _exit);
303!
1802
  }
1803

1804
_exit:
2,540✔
1805
  if (code) {
2,540!
1806
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1807
              tstrerror(code));
1808
  }
1809
  return code;
2,540✔
1810
}
1811

1812
int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer) {
7,478✔
1813
  writer[0] = taosMemoryCalloc(1, sizeof(*writer[0]));
7,478!
1814
  if (!writer[0]) {
7,477!
1815
    return terrno;
×
1816
  }
1817

1818
  writer[0]->config[0] = config[0];
7,477✔
1819
  return 0;
7,477✔
1820
}
1821

1822
int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, TFileOpArray *opArr) {
7,480✔
1823
  if (writer[0] == NULL) return 0;
7,480!
1824

1825
  int32_t code = 0;
7,480✔
1826
  int32_t lino = 0;
7,480✔
1827

1828
  if (writer[0]->ctx->opened) {
7,480✔
1829
    if (abort) {
3,534!
1830
      TAOS_CHECK_GOTO(tsdbDataFileWriterCloseAbort(writer[0]), &lino, _exit);
×
1831
    } else {
1832
      TAOS_CHECK_GOTO(tsdbDataFileWriterCloseCommit(writer[0], opArr), &lino, _exit);
3,534!
1833
    }
1834
    tsdbDataFileWriterDoClose(writer[0]);
3,534✔
1835
  }
1836
  taosMemoryFree(writer[0]);
7,480!
1837
  writer[0] = NULL;
7,480✔
1838

1839
_exit:
7,480✔
1840
  if (code) {
7,480!
1841
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer[0]->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1842
              tstrerror(code));
1843
  }
1844
  return code;
7,480✔
1845
}
1846

1847
int32_t tsdbDataFileWriteRow(SDataFileWriter *writer, SRowInfo *row) {
161,636,711✔
1848
  int32_t code = 0;
161,636,711✔
1849
  int32_t lino = 0;
161,636,711✔
1850

1851
  if (!writer->ctx->opened) {
161,636,711✔
1852
    TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpen(writer), &lino, _exit);
76!
1853
  }
1854

1855
  if (writer->fd[TSDB_FTYPE_HEAD] == NULL) {
161,636,711✔
1856
    TAOS_CHECK_GOTO(tsdbDataFileWriterOpenDataFD(writer), &lino, _exit);
76!
1857
  }
1858

1859
  if (row->uid != writer->ctx->tbid->uid) {
161,636,711✔
1860
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataEnd(writer), &lino, _exit);
15,905!
1861
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataBegin(writer, (TABLEID *)row), &lino, _exit);
15,905!
1862
  }
1863

1864
  TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSData(writer, &row->row), &lino, _exit);
161,636,711!
1865

1866
_exit:
161,657,906✔
1867
  if (code) {
161,657,906!
1868
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1869
              tstrerror(code));
1870
  }
1871
  return code;
161,660,469✔
1872
}
1873

1874
int32_t tsdbDataFileWriteBlockData(SDataFileWriter *writer, SBlockData *bData) {
121,442✔
1875
  if (bData->nRow == 0) {
121,442!
1876
    return 0;
×
1877
  }
1878

1879
  int32_t code = 0;
121,442✔
1880
  int32_t lino = 0;
121,442✔
1881

1882
  if (!bData->uid) {
121,442!
1883
    return TSDB_CODE_INVALID_PARA;
×
1884
  }
1885

1886
  if (!writer->ctx->opened) {
121,442✔
1887
    TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpen(writer), &lino, _exit);
2,388!
1888
  }
1889

1890
  if (writer->fd[TSDB_FTYPE_DATA] == NULL) {
121,442✔
1891
    TAOS_CHECK_GOTO(tsdbDataFileWriterOpenDataFD(writer), &lino, _exit);
2,465!
1892
  }
1893

1894
  if (bData->uid != writer->ctx->tbid->uid) {
121,442✔
1895
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataEnd(writer), &lino, _exit);
46,632!
1896
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataBegin(writer, (TABLEID *)bData), &lino, _exit);
46,631!
1897
  }
1898

1899
  if (writer->ctx->tbHasOldData) {
121,443✔
1900
    STsdbRowKey key;
1901

1902
    tsdbRowGetKey(&tsdbRowFromBlockData(bData, 0), &key);
305✔
1903
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTableOldData(writer, &key), &lino, _exit);
305!
1904
  }
1905

1906
  if (!writer->ctx->tbHasOldData       //
121,443✔
1907
      && writer->blockData->nRow == 0  //
121,433!
1908
  ) {
1909
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, bData), &lino, _exit);
121,434!
1910

1911
  } else {
1912
    for (int32_t i = 0; i < bData->nRow; ++i) {
18,237✔
1913
      TSDBROW row[1] = {tsdbRowFromBlockData(bData, i)};
18,228✔
1914
      TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSData(writer, row), &lino, _exit);
18,228!
1915
    }
1916
  }
1917

1918
_exit:
9✔
1919
  if (code) {
121,440!
1920
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1921
              tstrerror(code));
1922
  }
1923
  return code;
121,439✔
1924
}
1925

1926
int32_t tsdbDataFileFlush(SDataFileWriter *writer) {
29,845✔
1927
  if (!writer->ctx->opened) {
29,845!
1928
    return TSDB_CODE_INVALID_PARA;
×
1929
  }
1930

1931
  if (writer->blockData->nRow == 0) return 0;
29,845!
1932
  if (writer->ctx->tbHasOldData) return 0;
29,845✔
1933

1934
  return tsdbDataFileDoWriteBlockData(writer, writer->blockData);
20,007✔
1935
}
1936

1937
static int32_t tsdbDataFileWriterOpenTombFD(SDataFileWriter *writer) {
1,580✔
1938
  int32_t code = 0;
1,580✔
1939
  int32_t lino = 0;
1,580✔
1940

1941
  char    fname[TSDB_FILENAME_LEN];
1942
  int32_t ftype = TSDB_FTYPE_TOMB;
1,580✔
1943

1944
  int32_t flag = (TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
1,580✔
1945

1946
  int32_t lcn = writer->files[ftype].lcn;
1,580✔
1947
  tsdbTFileName(writer->config->tsdb, writer->files + ftype, fname);
1,580✔
1948

1949
  TAOS_CHECK_GOTO(tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd[ftype], lcn), &lino, _exit);
1,583!
1950

1951
  uint8_t hdr[TSDB_FHDR_SIZE] = {0};
1,582✔
1952
  int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
1,582✔
1953
  char   *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
1,582✔
1954

1955
  TAOS_CHECK_GOTO(tsdbWriteFile(writer->fd[ftype], 0, hdr, TSDB_FHDR_SIZE, encryptAlgorithm, encryptKey), &lino, _exit);
1,582!
1956
  writer->files[ftype].size += TSDB_FHDR_SIZE;
1,583✔
1957

1958
  if (writer->ctx->reader) {
1,583✔
1959
    TAOS_CHECK_GOTO(tsdbDataFileReadTombBlk(writer->ctx->reader, &writer->ctx->tombBlkArray), &lino, _exit);
964!
1960

1961
    if (TARRAY2_SIZE(writer->ctx->tombBlkArray) > 0) {
964✔
1962
      writer->ctx->hasOldTomb = true;
662✔
1963
    }
1964

1965
    writer->ctx->tombBlkArrayIdx = 0;
964✔
1966
    tTombBlockClear(writer->ctx->tombBlock);
964✔
1967
    writer->ctx->tombBlockIdx = 0;
964✔
1968
  }
1969

1970
_exit:
619✔
1971
  if (code) {
1,583!
1972
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1973
              tstrerror(code));
1974
  }
1975
  return code;
1,583✔
1976
}
1977

1978
int32_t tsdbDataFileWriteTombRecord(SDataFileWriter *writer, const STombRecord *record) {
17,446✔
1979
  int32_t code = 0;
17,446✔
1980
  int32_t lino = 0;
17,446✔
1981

1982
  if (!writer->ctx->opened) {
17,446✔
1983
    TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpen(writer), &lino, _exit);
1,070!
1984
  }
1985

1986
  if (writer->fd[TSDB_FTYPE_TOMB] == NULL) {
17,445✔
1987
    TAOS_CHECK_GOTO(tsdbDataFileWriterOpenTombFD(writer), &lino, _exit);
1,581!
1988
  }
1989

1990
  TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombRecord(writer, record), &lino, _exit);
17,447!
1991

1992
_exit:
17,446✔
1993
  if (code) {
17,446!
1994
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
×
1995
              tstrerror(code));
1996
  }
1997
  return code;
17,446✔
1998
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc