• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4380

25 Jun 2025 06:58AM UTC coverage: 62.307% (-0.09%) from 62.393%
#4380

push

travis-ci

web-flow
feat(mqtt): mqtt subscription (#30127)

* feat(mqtt): Initial commit for mqtt

* chore(xnode/mnd): xnode message handlers for mnode

* chore(mnd/xnode): mnode part for xnode

* chore(xnode/translater): fix show commands

* fix(ast/creater): fix xnode create option

* fix(xnode/ci): fix ci & doc's error codes

* chore(xnode/sql): make create/drop/show work properly

* fix(xnode/sql): commit new files

* fix(xnode/sql): commit cmake files

* fix: fix testing cases

* fix(xnode/tsc): fix tokens

* fix(ast/anode): fix anode update decl.

* fix(xnode/error): fix xnode error codes

* fix: xnode make/destroy

* chore: xnode with option & dnode id

* chore: use taosmqtt for xnode

* chore: new error code for xnode launching

* chore(xnode): new error code

* chore: header for _xnode_mgmt_mqtt

* chore: source for _xnode_mgmt_mqtt

* chore: remove test directory from cmake

* chore: remove taosmqtt for ci to compile

* chore: remove taosudf header from xnode

* chore: new window macro

* chore: remove xnode mgmt mqtt for windows compilation

* Revert "chore: remove xnode mgmt mqtt for windows compilation"

This reverts commit 197e1640c.

* chore: cleanup code

* chore: xnode mgmt comment windows part out

* chore: mgmt/mqtt, move uv head toppest

* xnode/mnode: create xnode once per dnode

* fix(xnode/systable/test): fix column count

* xnode/sdb: renumber sdb type for xnode to make start/stop order correct

* xnode/mqtt: new param mqttPort

* fix SXnode's struct type

* transfer dnode id to mqtt subscription

* tmqtt: remove uv_a linking

* tmqtt/tools: sources for tools

* tools: fix windows compilation

* tools/producer: fix windows sleep param

* tools/producer: fix uninited var rc

* make tools only for linux

* test/mnodes: wait 1 or 2 seconds for offline to be leader

* update topic producer tool for geometry data type testing

* format tool sql statements

* show xnodes' ep

* make shell auto complete xnodes

* use usleep... (continued)

156642 of 320746 branches covered (48.84%)

Branch coverage included in aggregate %.

61 of 1020 new or added lines in 21 files covered. (5.98%)

1736 existing lines in 172 files now uncovered.

242538 of 319922 relevant lines covered (75.81%)

6277604.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.77
/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdbDataFileRW.h"
17
#include "meta.h"
18

19
// SDataFileReader =============================================
20
// State of one open data-file reader: configuration, working buffers,
// per-file-type handles and the lazily loaded footers / block indexes.
struct SDataFileReader {
  SDataFileReaderConfig config[1];  // copy of the configuration passed to tsdbDataFileReaderOpen

  SBuffer  local[10];  // buffers owned by the reader; used when config->buffers is NULL
  SBuffer *buffers;    // working buffers: config->buffers when provided, otherwise `local`

  // flags recording which parts have already been read, so each is loaded only once
  struct {
    bool headFooterLoaded;
    bool tombFooterLoaded;
    bool brinBlkLoaded;
    bool tombBlkLoaded;
  } ctx[1];

  STsdbFD *fd[TSDB_FTYPE_MAX];  // one handle per file type; NULL when that file was not opened

  SHeadFooter   headFooter[1];    // filled by tsdbDataFileReadHeadFooter
  STombFooter   tombFooter[1];    // filled by tsdbDataFileReadTombFooter
  TBrinBlkArray brinBlkArray[1];  // filled by tsdbDataFileReadBrinBlk
  TTombBlkArray tombBlkArray[1];  // filled by tsdbDataFileReadTombBlk
};
40

41
/**
 * Load the footer of the .head file into reader->headFooter, at most once per reader.
 *
 * The footer is read from the last sizeof(SHeadFooter) bytes of the file, using the
 * size recorded in reader->config->files[TSDB_FTYPE_HEAD]. When no .head handle is
 * open nothing is read and the footer keeps its current contents.
 *
 * @param reader  open data-file reader
 * @return 0 on success; TSDB_CODE_FILE_CORRUPTED when the recorded file size is too
 *         small to hold a footer; otherwise the code returned by tsdbReadFile
 */
static int32_t tsdbDataFileReadHeadFooter(SDataFileReader *reader) {
  if (reader->ctx->headFooterLoaded) {
    return 0;
  }

  int32_t code = 0;
  int32_t lino = 0;

  int32_t ftype = TSDB_FTYPE_HEAD;
  if (reader->fd[ftype]) {
    int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
    char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
    int64_t fileSize = reader->config->files[ftype].file.size;

    // A file shorter than the footer would make the offset computed below wrap around.
    if (fileSize < (int64_t)sizeof(SHeadFooter)) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
    }

    TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[ftype], fileSize - sizeof(SHeadFooter), (uint8_t *)reader->headFooter,
                                 sizeof(SHeadFooter), 0, encryptAlgorithm, encryptKey),
                    &lino, _exit);
  }

  reader->ctx->headFooterLoaded = true;

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
82

83
/**
 * Load the footer of the .tomb file into reader->tombFooter, at most once per reader.
 *
 * The footer is read from the last sizeof(STombFooter) bytes of the file, using the
 * size recorded in reader->config->files[TSDB_FTYPE_TOMB]. When no .tomb handle is
 * open nothing is read and the footer keeps its current contents.
 *
 * @param reader  open data-file reader
 * @return 0 on success; TSDB_CODE_FILE_CORRUPTED when the recorded file size is too
 *         small to hold a footer; otherwise the code returned by tsdbReadFile
 */
static int32_t tsdbDataFileReadTombFooter(SDataFileReader *reader) {
  if (reader->ctx->tombFooterLoaded) {
    return 0;
  }

  int32_t code = 0;
  int32_t lino = 0;

  int32_t ftype = TSDB_FTYPE_TOMB;
  if (reader->fd[ftype]) {
    int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
    char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
    int64_t fileSize = reader->config->files[ftype].file.size;

    // A file shorter than the footer would make the offset computed below wrap around.
    if (fileSize < (int64_t)sizeof(STombFooter)) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
    }

    TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[ftype], fileSize - sizeof(STombFooter), (uint8_t *)reader->tombFooter,
                                 sizeof(STombFooter), 0, encryptAlgorithm, encryptKey),
                    &lino, _exit);
  }
  reader->ctx->tombFooterLoaded = true;

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
108

109
/**
 * Allocate a reader and open the files it will read from.
 *
 * @param fname   optional array of TSDB_FTYPE_MAX file names; a NULL entry means that
 *                file type is not opened. When `fname` itself is NULL, names are built
 *                with tsdbTFileName for every config->files[i] marked as existing.
 * @param config  reader configuration; copied into the reader. When config->buffers is
 *                NULL the reader uses its own local buffers.
 * @param reader  out: the new reader on success. On failure everything acquired so far
 *                is released and *reader is set to NULL.
 * @return 0 on success, otherwise terrno (allocation failure) or the code returned by
 *         tsdbOpenFile
 */
int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig *config, SDataFileReader **reader) {
  int32_t code = 0;
  int32_t lino = 0;

  if ((*reader = taosMemoryCalloc(1, sizeof(**reader))) == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->local); i++) {
    tBufferInit(reader[0]->local + i);
  }

  reader[0]->config[0] = config[0];
  reader[0]->buffers = config->buffers;
  if (reader[0]->buffers == NULL) {
    reader[0]->buffers = reader[0]->local;
  }

  if (fname) {
    for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
      if (fname[i]) {
        int32_t lcn = config->files[i].file.lcn;
        TAOS_CHECK_GOTO(tsdbOpenFile(fname[i], config->tsdb, TD_FILE_READ, &reader[0]->fd[i], lcn), &lino, _exit);
      }
    }
  } else {
    for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
      if (config->files[i].exist) {
        char fname1[TSDB_FILENAME_LEN];
        tsdbTFileName(config->tsdb, &config->files[i].file, fname1);
        int32_t lcn = config->files[i].file.lcn;
        TAOS_CHECK_GOTO(tsdbOpenFile(fname1, config->tsdb, TD_FILE_READ, &reader[0]->fd[i], lcn), &lino, _exit);
      }
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
    // Do not hand back a half-built reader: close the files opened so far, free the
    // buffers and the reader itself. The close routine accepts a NULL *reader, so this
    // also covers the allocation-failure path.
    tsdbDataFileReaderClose(reader);
  }
  return code;
}
152

153
/**
 * Release a reader created by tsdbDataFileReaderOpen and reset the caller's pointer.
 *
 * Destroys the tomb/brin block arrays, closes every open file handle, destroys the
 * reader-owned buffers and frees the reader. A NULL *reader is accepted and ignored.
 *
 * @param reader  in/out: reader to close; set to NULL on return
 */
void tsdbDataFileReaderClose(SDataFileReader **reader) {
  SDataFileReader *self = *reader;
  if (self == NULL) {
    return;
  }

  TARRAY2_DESTROY(self->tombBlkArray, NULL);
  TARRAY2_DESTROY(self->brinBlkArray, NULL);

  for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
    if (self->fd[ftype] != NULL) {
      tsdbCloseFile(&self->fd[ftype]);
    }
  }

  for (int32_t k = 0; k < ARRAY_SIZE(self->local); ++k) {
    tBufferDestroy(&self->local[k]);
  }

  taosMemoryFree(self);
  *reader = NULL;
}
174

175
/**
 * Return the array of SBrinBlk entries of the .head file, loading it on first use.
 *
 * The location of the array comes from the head footer (brinBlkPtr). The loaded array
 * is cached in the reader; later calls return the cached array without any I/O.
 *
 * @param reader        open data-file reader
 * @param brinBlkArray  out: points at the reader-owned array on success
 * @return 0 on success, otherwise an error code (the output is left untouched)
 */
int32_t tsdbDataFileReadBrinBlk(SDataFileReader *reader, const TBrinBlkArray **brinBlkArray) {
  int32_t code = 0;
  int32_t lino = 0;
  void   *data = NULL;

  // Fast path: already loaded.
  if (reader->ctx->brinBlkLoaded) {
    *brinBlkArray = reader->brinBlkArray;
    return code;
  }

  TAOS_CHECK_GOTO(tsdbDataFileReadHeadFooter(reader), &lino, _exit);

  if (reader->headFooter->brinBlkPtr->size > 0) {
    data = taosMemoryMalloc(reader->headFooter->brinBlkPtr->size);
    if (data == NULL) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }

    int32_t alg = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
    char   *key = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;

    TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], reader->headFooter->brinBlkPtr->offset, data,
                                 reader->headFooter->brinBlkPtr->size, 0, alg, key),
                    &lino, _exit);

    // The array takes over `data`; nothing after this point can fail.
    int32_t nBlk = reader->headFooter->brinBlkPtr->size / sizeof(SBrinBlk);
    TARRAY2_INIT_EX(reader->brinBlkArray, nBlk, nBlk, data);
  } else {
    TARRAY2_INIT(reader->brinBlkArray);
  }

  reader->ctx->brinBlkLoaded = true;
  *brinBlkArray = reader->brinBlkArray;

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
    taosMemoryFree(data);
  }
  return code;
}
214

215
/**
 * Read one brin block from the .head file and decode it into `brinBlock`.
 *
 * The raw bytes described by brinBlk->dp are read into the first working buffer, then
 * 15 fixed-width columns are decompressed (ten int64_t followed by five int32_t),
 * followed by the primary-key columns when brinBlk->numOfPKs > 0. The block is
 * reported as corrupted if the decoded length does not match the bytes read.
 *
 * @param reader     open data-file reader
 * @param brinBlk    index entry describing where the block lives and how it is compressed
 * @param brinBlock  out: decoded block (cleared before decoding)
 * @return 0 on success, TSDB_CODE_FILE_CORRUPTED on a length mismatch, otherwise the
 *         code of the failing read/decode step
 */
int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinBlk, SBrinBlock *brinBlock) {
  int32_t code = 0;
  int32_t lino = 0;

  SBuffer *raw = &reader->buffers[0];
  SBuffer *scratch = &reader->buffers[1];

  int32_t alg = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *key = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;

  // fetch the raw block
  tBufferClear(raw);
  TAOS_CHECK_GOTO(
      tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, brinBlk->dp->size, raw, 0, alg, key),
      &lino, _exit);

  // decode the fixed-width columns
  SBufferReader br = BUFFER_READER_INITIALIZER(0, raw);
  tBrinBlockClear(brinBlock);
  brinBlock->numOfPKs = brinBlk->numOfPKs;
  brinBlock->numOfRecords = brinBlk->numRec;
  for (int32_t col = 0; col < 15; col++) {
    bool          is64 = (col < 10);  // columns 0..9 are int64_t, 10..14 are int32_t
    SCompressInfo cinfo = {
        .cmprAlg = brinBlk->cmprAlg,
        .dataType = is64 ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT,
        .compressedSize = brinBlk->size[col],
        .originalSize = brinBlk->numRec * (is64 ? sizeof(int64_t) : sizeof(int32_t)),
    };
    TAOS_CHECK_GOTO(tDecompressDataToBuffer(BR_PTR(&br), &cinfo, brinBlock->buffers + col, scratch), &lino, _exit);
    br.offset += brinBlk->size[col];
  }

  // decode the primary keys, if any
  if (brinBlk->numOfPKs > 0) {
    SValueColumnCompressInfo firstInfos[TD_MAX_PK_COLS];
    SValueColumnCompressInfo lastInfos[TD_MAX_PK_COLS];

    // all compress-info headers come first: first-key infos, then last-key infos
    for (int32_t pk = 0; pk < brinBlk->numOfPKs; pk++) {
      TAOS_CHECK_GOTO(tValueColumnCompressInfoDecode(&br, &firstInfos[pk]), &lino, _exit);
    }
    for (int32_t pk = 0; pk < brinBlk->numOfPKs; pk++) {
      TAOS_CHECK_GOTO(tValueColumnCompressInfoDecode(&br, &lastInfos[pk]), &lino, _exit);
    }

    // then the payloads, in the same order
    for (int32_t pk = 0; pk < brinBlk->numOfPKs; pk++) {
      SValueColumnCompressInfo *info = &firstInfos[pk];

      TAOS_CHECK_GOTO(tValueColumnDecompress(BR_PTR(&br), info, brinBlock->firstKeyPKs + pk, scratch), &lino, _exit);
      br.offset += (info->offsetCompressedSize + info->dataCompressedSize);
    }

    for (int32_t pk = 0; pk < brinBlk->numOfPKs; pk++) {
      SValueColumnCompressInfo *info = &lastInfos[pk];

      TAOS_CHECK_GOTO(tValueColumnDecompress(BR_PTR(&br), info, brinBlock->lastKeyPKs + pk, scratch), &lino, _exit);
      br.offset += (info->offsetCompressedSize + info->dataCompressedSize);
    }
  }

  // every byte read must have been consumed
  if (br.offset != br.buffer->size) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
296

297
extern int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *assist);
298

299
/**
 * Read a whole data block from the .data file and decompress it into `bData`.
 *
 * @param reader  open data-file reader
 * @param record  brin record giving the block's offset and size in the .data file
 * @param bData   out: decompressed block
 * @return 0 on success, TSDB_CODE_FILE_CORRUPTED if the decompressor did not consume
 *         exactly the bytes read, otherwise the code of the failing step
 */
int32_t tsdbDataFileReadBlockData(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t fid = reader->config->files[TSDB_FTYPE_DATA].file.fid;

  int32_t alg = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *key = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;

  SBuffer *raw = &reader->buffers[0];
  SBuffer *scratch = &reader->buffers[1];

  // fetch the raw block
  tBufferClear(raw);
  TAOS_CHECK_GOTO(
      tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, record->blockSize, raw, 0, alg, key),
      &lino, _exit);

  // decompress it
  SBufferReader br = BUFFER_READER_INITIALIZER(0, raw);
  TAOS_CHECK_GOTO(tBlockDataDecompress(&br, bData, scratch), &lino, _exit);

  // every byte read must have been consumed
  if (br.offset != raw->size) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s fid %d failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, fid,
              __FILE__, lino, tstrerror(code));
  }
  return code;
}
330

331
int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData,
354,470✔
332
                                          STSchema *pTSchema, int16_t cids[], int32_t ncid) {
333
  int32_t code = 0;
354,470✔
334
  int32_t lino = 0;
354,470✔
335
  int32_t fid = reader->config->files[TSDB_FTYPE_DATA].file.fid;
354,470✔
336

337
  SDiskDataHdr hdr;
338
  SBuffer     *buffer0 = reader->buffers + 0;
354,470✔
339
  SBuffer     *buffer1 = reader->buffers + 1;
354,470✔
340
  SBuffer     *assist = reader->buffers + 2;
354,470✔
341

342
  int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
354,470✔
343
  char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
354,470✔
344
  // load key part
345
  tBufferClear(buffer0);
346
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, record->blockKeySize, buffer0,
354,470!
347
                                       0, encryptAlgorithm, encryptKey),
348
                  &lino, _exit);
349

350
  // SDiskDataHdr
351
  SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0);
354,540✔
352
  TAOS_CHECK_GOTO(tGetDiskDataHdr(&br, &hdr), &lino, _exit);
354,540!
353

354
  if (hdr.delimiter != TSDB_FILE_DLMT) {
354,314!
355
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
356
  }
357

358
  tBlockDataReset(bData);
354,314✔
359
  bData->suid = hdr.suid;
354,337✔
360
  bData->uid = hdr.uid;
354,337✔
361
  bData->nRow = hdr.nRow;
354,337✔
362

363
  // Key part
364
  TAOS_CHECK_GOTO(tBlockDataDecompressKeyPart(&hdr, &br, bData, assist), &lino, _exit);
354,337!
365
  if (br.offset != buffer0->size) {
354,552!
366
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
367
  }
368

369
  int extraColIdx = -1;
354,552✔
370
  for (int i = 0; i < ncid; i++) {
354,530✔
371
    if (tBlockDataGetColData(bData, cids[i]) == NULL) {
332,710!
372
      extraColIdx = i;
332,704✔
373
      break;
332,704✔
374
    }
375
  }
376

377
  if (extraColIdx < 0) {
354,524✔
378
    goto _exit;
21,855✔
379
  }
380

381
  // load SBlockCol part
382
  tBufferClear(buffer0);
383
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset + record->blockKeySize,
332,669!
384
                                       hdr.szBlkCol, buffer0, 0, encryptAlgorithm, encryptKey),
385
                  &lino, _exit);
386

387
  // calc szHint
388
  int64_t szHint = 0;
332,769✔
389
  int     extraCols = 1;
332,769✔
390
  for (int i = extraColIdx + 1; i < ncid; ++i) {
332,761✔
391
    if (tBlockDataGetColData(bData, cids[i]) == NULL) {
115,761!
392
      ++extraCols;
115,771✔
393
      break;
115,771✔
394
    }
395
  }
396

397
  if (extraCols >= 2) {
332,771✔
398
    br = BUFFER_READER_INITIALIZER(0, buffer0);
115,769✔
399

400
    SBlockCol blockCol = {.cid = 0};
115,769✔
401
    for (int32_t i = extraColIdx; i < ncid; ++i) {
115,769✔
402
      int16_t extraColCid = cids[i];
115,766✔
403

404
      while (extraColCid > blockCol.cid) {
252,711✔
405
        if (br.offset >= buffer0->size) {
136,991!
406
          blockCol.cid = INT16_MAX;
×
407
          break;
×
408
        }
409

410
        TAOS_CHECK_GOTO(tGetBlockCol(&br, &blockCol, hdr.fmtVer, hdr.cmprAlg), &lino, _exit);
136,991!
411
      }
412

413
      if (extraColCid == blockCol.cid || blockCol.cid == INT16_MAX) {
115,720!
414
        extraColIdx = i;
115,720✔
415
        break;
115,720✔
416
      }
417
    }
418

419
    if (blockCol.cid > 0 && blockCol.cid < INT16_MAX /*&& blockCol->flag == HAS_VALUE*/) {
115,723!
420
      int64_t   offset = blockCol.offset;
115,712✔
421
      SBlockCol lastNonNoneBlockCol = {.cid = 0};
115,712✔
422

423
      for (int32_t i = extraColIdx; i < ncid; ++i) {
844,953✔
424
        int16_t extraColCid = cids[i];
729,589✔
425

426
        while (extraColCid > blockCol.cid) {
1,364,764✔
427
          if (br.offset >= buffer0->size) {
635,523!
UNCOV
428
            blockCol.cid = INT16_MAX;
×
UNCOV
429
            break;
×
430
          }
431

432
          TAOS_CHECK_GOTO(tGetBlockCol(&br, &blockCol, hdr.fmtVer, hdr.cmprAlg), &lino, _exit);
635,523!
433
        }
434

435
        if (extraColCid == blockCol.cid) {
729,241✔
436
          lastNonNoneBlockCol = blockCol;
729,126✔
437
          continue;
729,126✔
438
        }
439

440
        if (blockCol.cid == INT16_MAX) {
115!
UNCOV
441
          break;
×
442
        }
443
      }
444

445
      if (lastNonNoneBlockCol.cid > 0) {
115,364!
446
        szHint = lastNonNoneBlockCol.offset + lastNonNoneBlockCol.szBitmap + lastNonNoneBlockCol.szOffset +
115,779✔
447
                 lastNonNoneBlockCol.szValue - offset;
115,779✔
448
      }
449
    }
450
  }
451

452
  // load each column
453
  SBlockCol blockCol = {
332,377✔
454
      .cid = 0,
455
  };
456
  bool firstRead = true;
332,377✔
457
  br = BUFFER_READER_INITIALIZER(0, buffer0);
332,377✔
458
  for (int32_t i = 0; i < ncid; i++) {
1,278,712✔
459
    int16_t cid = cids[i];
946,449✔
460

461
    if (tBlockDataGetColData(bData, cid)) {  // already loaded
946,449!
UNCOV
462
      continue;
×
463
    }
464

465
    while (cid > blockCol.cid) {
2,708,039✔
466
      if (br.offset >= buffer0->size) {
1,762,343!
UNCOV
467
        blockCol.cid = INT16_MAX;
×
UNCOV
468
        break;
×
469
      }
470

471
      TAOS_CHECK_GOTO(tGetBlockCol(&br, &blockCol, hdr.fmtVer, hdr.cmprAlg), &lino, _exit);
1,762,343!
472
    }
473

474
    if (cid < blockCol.cid) {
945,696!
UNCOV
475
      const STColumn *tcol = tTSchemaSearchColumn(pTSchema, cid);
×
UNCOV
476
      TSDB_CHECK_NULL(tcol, code, lino, _exit, TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER);
×
UNCOV
477
      SBlockCol none = {
×
478
          .cid = cid,
UNCOV
479
          .type = tcol->type,
×
UNCOV
480
          .cflag = tcol->flags,
×
481
          .flag = HAS_NONE,
482
          .szOrigin = 0,
483
          .szBitmap = 0,
484
          .szOffset = 0,
485
          .szValue = 0,
486
          .offset = 0,
487
      };
UNCOV
488
      TAOS_CHECK_GOTO(tBlockDataDecompressColData(&hdr, &none, &br, bData, assist), &lino, _exit);
×
489
    } else if (cid == blockCol.cid) {
945,696!
490
      int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
945,847✔
491
      char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
945,847✔
492
      // load from file
493
      tBufferClear(buffer1);
494
      TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA],
945,847!
495
                                           record->blockOffset + record->blockKeySize + hdr.szBlkCol + blockCol.offset,
496
                                           blockCol.szBitmap + blockCol.szOffset + blockCol.szValue, buffer1,
497
                                           firstRead ? szHint : 0, encryptAlgorithm, encryptKey),
498
                      &lino, _exit);
499

500
      firstRead = false;
946,637✔
501

502
      // decode the buffer
503
      SBufferReader br1 = BUFFER_READER_INITIALIZER(0, buffer1);
946,637✔
504
      TAOS_CHECK_GOTO(tBlockDataDecompressColData(&hdr, &blockCol, &br1, bData, assist), &lino, _exit);
946,637!
505
    }
506
  }
507

508
_exit:
332,263✔
509
  if (code) {
354,118!
510
    tsdbError("vgId:%d %s fid:%d failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, fid,
×
511
              __FILE__, lino, tstrerror(code));
512
  }
513
  return code;
354,502✔
514
}
515

516
int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader, const SBrinRecord *record,
46,983✔
517
                                 TColumnDataAggArray *columnDataAggArray) {
518
  int32_t  code = 0;
46,983✔
519
  int32_t  lino = 0;
46,983✔
520
  SBuffer *buffer = reader->buffers + 0;
46,983✔
521

522
  TARRAY2_CLEAR(columnDataAggArray, NULL);
46,983✔
523
  if (record->smaSize > 0) {
46,983!
524
    tBufferClear(buffer);
525
    int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
47,003✔
526
    char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
47,003✔
527
    TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_SMA], record->smaOffset, record->smaSize, buffer, 0,
47,003!
528
                                         encryptAlgorithm, encryptKey),
529
                    &lino, _exit);
530

531
    // decode sma data
532
    SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer);
46,991✔
533
    while (br.offset < record->smaSize) {
271,178✔
534
      SColumnDataAgg sma[1];
535

536
      TAOS_CHECK_GOTO(tGetColumnDataAgg(&br, sma), &lino, _exit);
224,643!
537
      TAOS_CHECK_GOTO(TARRAY2_APPEND_PTR(columnDataAggArray, sma), &lino, _exit);
448,374!
538
    }
539
    if (br.offset != record->smaSize) {
46,535!
540
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
541
    }
542
  }
543

544
_exit:
×
545
  if (code) {
46,515!
546
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
×
547
              tstrerror(code));
548
  }
549
  return code;
46,910✔
550
}
551

552
/**
 * Return the array of STombBlk entries of the .tomb file, loading it on first use.
 *
 * The location of the array comes from the tomb footer (tombBlkPtr). The loaded array
 * is cached in the reader; later calls return the cached array without any I/O.
 *
 * @param reader        open data-file reader
 * @param tombBlkArray  out: points at the reader-owned array on success
 * @return 0 on success, otherwise an error code (the output is left untouched)
 */
int32_t tsdbDataFileReadTombBlk(SDataFileReader *reader, const TTombBlkArray **tombBlkArray) {
  int32_t code = 0;
  int32_t lino = 0;
  void   *data = NULL;

  // Fast path: already loaded.
  if (reader->ctx->tombBlkLoaded) {
    *tombBlkArray = reader->tombBlkArray;
    return code;
  }

  TAOS_CHECK_GOTO(tsdbDataFileReadTombFooter(reader), &lino, _exit);

  if (reader->tombFooter->tombBlkPtr->size > 0) {
    data = taosMemoryMalloc(reader->tombFooter->tombBlkPtr->size);
    if (data == NULL) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }

    int32_t alg = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
    char   *key = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
    TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], reader->tombFooter->tombBlkPtr->offset, data,
                                 reader->tombFooter->tombBlkPtr->size, 0, alg, key),
                    &lino, _exit);

    // The array takes over `data`; nothing after this point can fail.
    int32_t nBlk = reader->tombFooter->tombBlkPtr->size / sizeof(STombBlk);
    TARRAY2_INIT_EX(reader->tombBlkArray, nBlk, nBlk, data);
  } else {
    TARRAY2_INIT(reader->tombBlkArray);
  }

  reader->ctx->tombBlkLoaded = true;
  *tombBlkArray = reader->tombBlkArray;

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
    taosMemoryFree(data);
  }
  return code;
}
589

590
/**
 * Read one tomb block from the .tomb file and decode it into `tData`.
 *
 * The raw bytes described by tombBlk->dp are read into the first working buffer, then
 * each of the ARRAY_SIZE(tData->buffers) columns is decompressed as int64_t values
 * (tombBlk->numRec entries per column).
 *
 * @param reader   open data-file reader
 * @param tombBlk  index entry describing where the block lives and how it is compressed
 * @param tData    out: decoded tomb block (cleared before decoding)
 * @return 0 on success, otherwise the code of the failing read/decompress step
 */
int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombBlk, STombBlock *tData) {
  int32_t code = 0;
  int32_t lino = 0;

  SBuffer *buffer0 = reader->buffers + 0;
  SBuffer *assist = reader->buffers + 1;

  tBufferClear(buffer0);
  int32_t encryptAlgorithm = reader->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *encryptKey = reader->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_TOMB], tombBlk->dp->offset, tombBlk->dp->size, buffer0, 0,
                                       encryptAlgorithm, encryptKey),
                  &lino, _exit);

  SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0);
  tTombBlockClear(tData);
  tData->numOfRecords = tombBlk->numRec;
  for (int32_t i = 0; i < ARRAY_SIZE(tData->buffers); ++i) {
    SCompressInfo cinfo = {
        .cmprAlg = tombBlk->cmprAlg,
        .dataType = TSDB_DATA_TYPE_BIGINT,
        .originalSize = tombBlk->numRec * sizeof(int64_t),
        .compressedSize = tombBlk->size[i],
    };
    TAOS_CHECK_GOTO(tDecompressDataToBuffer(BR_PTR(&br), &cinfo, tData->buffers + i, assist), &lino, _exit);
    // step past the compressed bytes of this column
    br.offset += tombBlk->size[i];
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
626

627
// SDataFileWriter =============================================
628
// State of one data-file writer: configuration, schema caches, working buffers,
// the read-side context for existing data, and the output being built.
struct SDataFileWriter {
  SDataFileWriterConfig config[1];

  SSkmInfo skmTb[1];   // used as config->skmTb when the caller did not supply one
  SSkmInfo skmRow[1];  // used as config->skmRow when the caller did not supply one
  SBuffer  local[10];  // writer-owned buffers; used when config->buffers is NULL
  SBuffer *buffers;    // working buffers: config->buffers when provided, otherwise `local`

  struct {
    bool             opened;
    SDataFileReader *reader;  // reader over the existing files; NULL when none exist

    // for ts data
    TABLEID tbid[1];
    bool    tbHasOldData;

    const TBrinBlkArray *brinBlkArray;
    int32_t              brinBlkArrayIdx;
    SBrinBlock           brinBlock[1];
    int32_t              brinBlockIdx;
    SBlockData           blockData[1];
    int32_t              blockDataIdx;
    // for tomb data
    bool                 hasOldTomb;
    const TTombBlkArray *tombBlkArray;
    int32_t              tombBlkArrayIdx;
    STombBlock           tombBlock[1];
    int32_t              tombBlockIdx;
    // range
    SVersionRange range;
    SVersionRange tombRange;
  } ctx[1];

  STFile   files[TSDB_FTYPE_MAX];
  STsdbFD *fd[TSDB_FTYPE_MAX];  // one handle per file type

  SHeadFooter headFooter[1];
  STombFooter tombFooter[1];

  TBrinBlkArray brinBlkArray[1];
  SBrinBlock    brinBlock[1];
  SBlockData    blockData[1];

  TTombBlkArray tombBlkArray[1];
  STombBlock    tombBlock[1];
};
674

675
/**
 * Abort path of the writer close. Currently only logs that it is not implemented.
 *
 * @param writer  writer being closed
 * @return always 0
 */
static int32_t tsdbDataFileWriterCloseAbort(SDataFileWriter *writer) {
  const char *reason = "not implemented";

  tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, __LINE__,
            reason);
  return 0;
}
680

681
/**
 * Release everything the writer holds: the embedded reader (if any), the output-side
 * and read-side block containers, the writer-owned buffers and the cached schemas.
 *
 * @param writer  writer to tear down (the writer object itself is not freed here)
 */
static void tsdbDataFileWriterDoClose(SDataFileWriter *writer) {
  if (writer->ctx[0].reader != NULL) {
    tsdbDataFileReaderClose(&writer->ctx[0].reader);
  }

  // output side
  tTombBlockDestroy(writer->tombBlock);
  TARRAY2_DESTROY(writer->tombBlkArray, NULL);
  tBlockDataDestroy(writer->blockData);
  tBrinBlockDestroy(writer->brinBlock);
  TARRAY2_DESTROY(writer->brinBlkArray, NULL);

  // read-side context
  tTombBlockDestroy(writer->ctx[0].tombBlock);
  tBlockDataDestroy(writer->ctx[0].blockData);
  tBrinBlockDestroy(writer->ctx[0].brinBlock);

  for (int32_t k = 0; k < ARRAY_SIZE(writer->local); ++k) {
    tBufferDestroy(&writer->local[k]);
  }

  tDestroyTSchema(writer->skmRow[0].pTSchema);
  tDestroyTSchema(writer->skmTb[0].pTSchema);
}
703

704
/*
 * Open a reader over the existing files of the file set, if there are any.
 *
 * As soon as one file type is marked as existing in the writer config, a
 * reader config is built that mirrors the `exist` flag (and, for existing
 * files, the file descriptor record) of every file type, and a single reader
 * is opened into writer->ctx->reader. If no file exists, no reader is opened.
 *
 * Returns 0 on success, or the error code of tsdbDataFileReaderOpen().
 */
static int32_t tsdbDataFileWriterDoOpenReader(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
    if (!writer->config->files[i].exist) {
      continue;
    }

    SDataFileReaderConfig config[1] = {{
        .tsdb = writer->config->tsdb,
        .szPage = writer->config->szPage,
        .buffers = writer->buffers,
    }};

    // copy the description of every file type; a distinct index name avoids
    // shadowing the outer loop variable
    for (int32_t j = 0; j < TSDB_FTYPE_MAX; ++j) {
      config->files[j].exist = writer->config->files[j].exist;
      if (config->files[j].exist) {
        config->files[j].file = writer->config->files[j].file;
      }
    }

    TAOS_CHECK_GOTO(tsdbDataFileReaderOpen(NULL, config, &writer->ctx->reader), &lino, _exit);
    break;  // one reader covers all file types
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
735

736
/*
 * Lazily open the writer: resolve default schema holders and buffers, open a
 * reader over existing files, and initialise the STFile record of each file
 * type (.head, .data, .sma, .tomb).
 *
 * .head and .tomb always get a fresh record on a newly allocated disk.
 * .data and .sma reuse the existing file record when the config marks the
 * file as existing, and get a fresh record otherwise.
 *
 * Returns 0 on success; on failure the error is logged and returned, and
 * writer->ctx->opened is left untouched.
 */
static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t ftype;
  SDiskID diskId = {0};

  // fall back to writer-owned schema holders and buffers when none are configured
  if (writer->config->skmTb == NULL) {
    writer->config->skmTb = writer->skmTb;
  }
  if (writer->config->skmRow == NULL) {
    writer->config->skmRow = writer->skmRow;
  }
  writer->buffers = (writer->config->buffers != NULL) ? writer->config->buffers : writer->local;

  // open reader
  TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpenReader(writer), &lino, _exit);

  // .head: always a new file
  ftype = TSDB_FTYPE_HEAD;
  code = tsdbAllocateDisk(writer->config->tsdb, tsdbFTypeLabel(ftype), writer->config->expLevel, &diskId);
  TSDB_CHECK_CODE(code, lino, _exit);
  writer->files[ftype] = (STFile){
      .type = ftype,
      .did = diskId,
      .fid = writer->config->fid,
      .cid = writer->config->cid,
      .size = 0,
      .minVer = VERSION_MAX,
      .maxVer = VERSION_MIN,
  };

  // .data: new file only when there is no existing one
  ftype = TSDB_FTYPE_DATA;
  if (!writer->config->files[ftype].exist) {
    code = tsdbAllocateDisk(writer->config->tsdb, tsdbFTypeLabel(ftype), writer->config->expLevel, &diskId);
    TSDB_CHECK_CODE(code, lino, _exit);
    writer->files[ftype] = (STFile){
        .type = ftype,
        .did = diskId,
        .fid = writer->config->fid,
        .cid = writer->config->cid,
        .size = 0,
        .lcn = writer->config->lcn == -1 ? 0 : -1,
        .minVer = VERSION_MAX,
        .maxVer = VERSION_MIN,
    };
  } else {
    writer->files[ftype] = writer->config->files[ftype].file;
  }

  // .sma: new file only when there is no existing one
  ftype = TSDB_FTYPE_SMA;
  if (!writer->config->files[ftype].exist) {
    code = tsdbAllocateDisk(writer->config->tsdb, tsdbFTypeLabel(ftype), writer->config->expLevel, &diskId);
    TSDB_CHECK_CODE(code, lino, _exit);
    writer->files[ftype] = (STFile){
        .type = ftype,
        .did = diskId,
        .fid = writer->config->fid,
        .cid = writer->config->cid,
        .size = 0,
        .minVer = VERSION_MAX,
        .maxVer = VERSION_MIN,
    };
  } else {
    writer->files[ftype] = writer->config->files[ftype].file;
  }

  // .tomb: always a new file
  ftype = TSDB_FTYPE_TOMB;
  code = tsdbAllocateDisk(writer->config->tsdb, tsdbFTypeLabel(ftype), writer->config->expLevel, &diskId);
  TSDB_CHECK_CODE(code, lino, _exit);
  writer->files[ftype] = (STFile){
      .type = ftype,
      .did = diskId,
      .fid = writer->config->fid,
      .cid = writer->config->cid,
      .size = 0,
      .minVer = VERSION_MAX,
      .maxVer = VERSION_MIN,
  };

  // start both version ranges "empty" (min > max) so the first update sets them
  writer->ctx->range = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
  writer->ctx->tombRange = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN};

  writer->ctx->opened = true;

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
830

831
/*
 * Widen `range` so that it also covers [minVer, maxVer].
 * The lower bound only ever decreases and the upper bound only ever increases.
 */
void tsdbWriterUpdVerRange(SVersionRange *range, int64_t minVer, int64_t maxVer) {
  if (minVer < range->minVer) {
    range->minVer = minVer;
  }
  if (maxVer > range->maxVer) {
    range->maxVer = maxVer;
  }
}
660,803✔
835

836
int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, uint32_t cmprAlg, int64_t *fileSize,
1,616✔
837
                               TBrinBlkArray *brinBlkArray, SBuffer *buffers, SVersionRange *range,
838
                               int32_t encryptAlgorithm, char *encryptKey) {
839
  if (brinBlock->numOfRecords == 0) {
1,616!
840
    return 0;
×
841
  }
842

843
  int32_t  code;
844
  SBuffer *buffer0 = buffers + 0;
1,616✔
845
  SBuffer *buffer1 = buffers + 1;
1,616✔
846
  SBuffer *assist = buffers + 2;
1,616✔
847

848
  SBrinBlk brinBlk = {
1,616✔
849
      .dp[0] =
850
          {
851
              .offset = *fileSize,
1,616✔
852
              .size = 0,
853
          },
854
      .numRec = brinBlock->numOfRecords,
1,616✔
855
      .numOfPKs = brinBlock->numOfPKs,
1,616✔
856
      .cmprAlg = cmprAlg,
857
  };
858
  for (int i = 0; i < brinBlock->numOfRecords; i++) {
64,315✔
859
    SBrinRecord record;
860

861
    TAOS_CHECK_RETURN(tBrinBlockGet(brinBlock, i, &record));
62,700!
862
    if (i == 0) {
62,699✔
863
      brinBlk.minTbid.suid = record.suid;
1,615✔
864
      brinBlk.minTbid.uid = record.uid;
1,615✔
865
      brinBlk.minVer = record.minVer;
1,615✔
866
      brinBlk.maxVer = record.maxVer;
1,615✔
867
    }
868
    if (i == brinBlock->numOfRecords - 1) {
62,699✔
869
      brinBlk.maxTbid.suid = record.suid;
1,615✔
870
      brinBlk.maxTbid.uid = record.uid;
1,615✔
871
    }
872
    if (record.minVer < brinBlk.minVer) {
62,699✔
873
      brinBlk.minVer = record.minVer;
177✔
874
    }
875
    if (record.maxVer > brinBlk.maxVer) {
62,699✔
876
      brinBlk.maxVer = record.maxVer;
36,816✔
877
    }
878
  }
879

880
  tsdbWriterUpdVerRange(range, brinBlk.minVer, brinBlk.maxVer);
1,615✔
881

882
  // write to file
883
  for (int32_t i = 0; i < 10; ++i) {
17,772✔
884
    SCompressInfo info = {
16,155✔
885
        .cmprAlg = cmprAlg,
886
        .dataType = TSDB_DATA_TYPE_BIGINT,
887
        .originalSize = brinBlock->buffers[i].size,
16,155✔
888
    };
889

890
    tBufferClear(buffer0);
891
    TAOS_CHECK_RETURN(tCompressDataToBuffer(brinBlock->buffers[i].data, &info, buffer0, assist));
16,155!
892
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptAlgorithm, encryptKey));
16,159!
893
    brinBlk.size[i] = info.compressedSize;
16,157✔
894
    brinBlk.dp->size += info.compressedSize;
16,157✔
895
    *fileSize += info.compressedSize;
16,157✔
896
  }
897
  for (int32_t i = 10; i < 15; ++i) {
9,697✔
898
    SCompressInfo info = {
8,079✔
899
        .cmprAlg = cmprAlg,
900
        .dataType = TSDB_DATA_TYPE_INT,
901
        .originalSize = brinBlock->buffers[i].size,
8,079✔
902
    };
903

904
    tBufferClear(buffer0);
905
    TAOS_CHECK_RETURN(tCompressDataToBuffer(brinBlock->buffers[i].data, &info, buffer0, assist));
8,079!
906
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptAlgorithm, encryptKey));
8,080!
907
    brinBlk.size[i] = info.compressedSize;
8,080✔
908
    brinBlk.dp->size += info.compressedSize;
8,080✔
909
    *fileSize += info.compressedSize;
8,080✔
910
  }
911

912
  // write primary keys to file
913
  if (brinBlock->numOfPKs > 0) {
1,618!
914
    tBufferClear(buffer0);
915
    tBufferClear(buffer1);
916

917
    // encode
UNCOV
918
    for (int i = 0; i < brinBlock->numOfPKs; i++) {
×
UNCOV
919
      SValueColumnCompressInfo info = {.cmprAlg = cmprAlg};
×
UNCOV
920
      TAOS_CHECK_RETURN(tValueColumnCompress(&brinBlock->firstKeyPKs[i], &info, buffer1, assist));
×
UNCOV
921
      TAOS_CHECK_RETURN(tValueColumnCompressInfoEncode(&info, buffer0));
×
922
    }
UNCOV
923
    for (int i = 0; i < brinBlock->numOfPKs; i++) {
×
UNCOV
924
      SValueColumnCompressInfo info = {.cmprAlg = cmprAlg};
×
UNCOV
925
      TAOS_CHECK_RETURN(tValueColumnCompress(&brinBlock->lastKeyPKs[i], &info, buffer1, assist));
×
UNCOV
926
      TAOS_CHECK_RETURN(tValueColumnCompressInfoEncode(&info, buffer0));
×
927
    }
928

929
    // write to file
UNCOV
930
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptAlgorithm, encryptKey));
×
UNCOV
931
    *fileSize += buffer0->size;
×
UNCOV
932
    brinBlk.dp->size += buffer0->size;
×
UNCOV
933
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer1->data, buffer1->size, encryptAlgorithm, encryptKey));
×
UNCOV
934
    *fileSize += buffer1->size;
×
UNCOV
935
    brinBlk.dp->size += buffer1->size;
×
936
  }
937

938
  // append to brinBlkArray
939
  TAOS_CHECK_RETURN(TARRAY2_APPEND_PTR(brinBlkArray, &brinBlk));
3,236!
940

941
  tBrinBlockClear(brinBlock);
1,618✔
942

943
  return 0;
1,616✔
944
}
945

946
/*
 * Flush the writer's pending brin block into the .head file.
 * Does nothing when the block holds no records. Returns 0 on success or the
 * error code from tsdbFileWriteBrinBlock(), which is also logged.
 */
static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) {
  if (writer->brinBlock->numOfRecords == 0) {
    return 0;
  }

  int32_t code = 0;
  int32_t lino = 0;

  STsdbFD *headFd = writer->fd[TSDB_FTYPE_HEAD];
  int64_t *headSize = &writer->files[TSDB_FTYPE_HEAD].size;
  int32_t  encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char    *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;

  TAOS_CHECK_GOTO(tsdbFileWriteBrinBlock(headFd, writer->brinBlock, writer->config->cmprAlg, headSize,
                                         writer->brinBlkArray, writer->buffers, &writer->ctx->range, encryptAlgorithm,
                                         encryptKey),
                  &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
969

970
/*
 * Append one brin record to the writer's pending brin block, flushing the
 * block to the .head file when needed.
 *
 * If tBrinBlockPut() rejects the record with TSDB_CODE_INVALID_PARA while the
 * block already holds records (records with different primary keys cannot
 * share a block), the block is flushed and the put is retried on the now
 * empty block. If the rejection happens on an empty block, flushing cannot
 * help, so the error is returned instead of retrying forever.
 *
 * The block is also flushed once it holds at least 256 records.
 *
 * Returns 0 on success, otherwise the error code (also logged).
 */
static int32_t tsdbDataFileWriteBrinRecord(SDataFileWriter *writer, const SBrinRecord *record) {
  int32_t code = 0;
  int32_t lino = 0;

  for (;;) {
    code = tBrinBlockPut(writer->brinBlock, record);
    if (code == TSDB_CODE_INVALID_PARA && writer->brinBlock->numOfRecords > 0) {
      // different records with different primary keys: flush and retry
      TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlock(writer), &lino, _exit);
      continue;
    }

    // any other failure, including INVALID_PARA on an empty block, is final
    TSDB_CHECK_CODE(code, lino, _exit);
    break;
  }

  if ((writer->brinBlock->numOfRecords) >= 256) {
    TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlock(writer), &lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
997

998
/*
 * Write one data block: the compressed block goes to the .data file, the
 * per-column aggregates to the .sma file, and a brin record describing both is
 * queued for the .head file. `bData` is cleared on success.
 *
 * An empty block is a no-op (returns 0); a non-empty block with uid == 0 is
 * rejected with TSDB_CODE_INVALID_PARA. Any other failure is logged and
 * returned.
 */
static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData *bData) {
  if (bData->nRow == 0) {
    return 0;
  }
  if (bData->uid == 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  code = 0;
  int32_t  lino = 0;
  SBuffer *bufs = writer->buffers;
  SBuffer *scratch = writer->buffers + 4;
  int32_t  encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char    *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;

  SColCompressInfo cmprInfo = {.pColCmpr = NULL, .defaultCmprAlg = writer->config->cmprAlg};

  // the record starts from row 0 and the current file tails; sizes are filled in below
  SBrinRecord rec = {
      .suid = bData->suid,
      .uid = bData->uid,
      .minVer = bData->aVersion[0],
      .maxVer = bData->aVersion[0],
      .blockOffset = writer->files[TSDB_FTYPE_DATA].size,
      .smaOffset = writer->files[TSDB_FTYPE_SMA].size,
      .blockSize = 0,
      .blockKeySize = 0,
      .smaSize = 0,
      .numRow = bData->nRow,
      .count = 1,
  };

  tsdbRowGetKey(&tsdbRowFromBlockData(bData, 0), &rec.firstKey);
  tsdbRowGetKey(&tsdbRowFromBlockData(bData, bData->nRow - 1), &rec.lastKey);

  // count rows whose key differs from the previous row, and track the version bounds
  for (int32_t r = 1; r < bData->nRow; ++r) {
    if (tsdbRowCompareWithoutVersion(&tsdbRowFromBlockData(bData, r - 1), &tsdbRowFromBlockData(bData, r)) != 0) {
      rec.count++;
    }
    if (bData->aVersion[r] < rec.minVer) {
      rec.minVer = bData->aVersion[r];
    }
    if (bData->aVersion[r] > rec.maxVer) {
      rec.maxVer = bData->aVersion[r];
    }
  }

  tsdbWriterUpdVerRange(&writer->ctx->range, rec.minVer, rec.maxVer);

  // a failed lookup is only warned about; compression proceeds with what cmprInfo holds
  code = metaGetColCmpr(writer->config->tsdb->pVnode->pMeta, bData->suid != 0 ? bData->suid : bData->uid,
                        &cmprInfo.pColCmpr);
  if (code) {
    tsdbWarn("vgId:%d failed to get column compress algrithm", TD_VID(writer->config->tsdb->pVnode));
  }

  TAOS_CHECK_GOTO(tBlockDataCompress(bData, &cmprInfo, bufs, scratch), &lino, _exit);

  rec.blockKeySize = bufs[0].size + bufs[1].size;
  rec.blockSize = rec.blockKeySize + bufs[2].size + bufs[3].size;

  // to .data file: the four compressed buffers back to back
  for (int part = 0; part < 4; part++) {
    TAOS_CHECK_GOTO(tsdbWriteFile(writer->fd[TSDB_FTYPE_DATA], writer->files[TSDB_FTYPE_DATA].size, bufs[part].data,
                                  bufs[part].size, encryptAlgorithm, encryptKey),
                    &lino, _exit);
    writer->files[TSDB_FTYPE_DATA].size += bufs[part].size;
  }

  // to .sma file: one aggregate per column that has SMA enabled and holds values
  tBufferClear(&bufs[0]);
  for (int32_t c = 0; c < bData->nColData; ++c) {
    SColData *colData = &bData->aColData[c];
    bool      smaOn = (colData->cflag & COL_SMA_ON) != 0;
    bool      hasValue = (colData->flag & HAS_VALUE) != 0;
    if (!(smaOn && hasValue)) {
      continue;
    }

    SColumnDataAgg agg = {.colId = colData->cid};
    tColDataCalcSMA[colData->type](colData, &agg);

    TAOS_CHECK_GOTO(tPutColumnDataAgg(&bufs[0], &agg), &lino, _exit);
  }
  rec.smaSize = bufs[0].size;

  if (rec.smaSize > 0) {
    TAOS_CHECK_GOTO(tsdbWriteFile(writer->fd[TSDB_FTYPE_SMA], rec.smaOffset, bufs[0].data, rec.smaSize,
                                  encryptAlgorithm, encryptKey),
                    &lino, _exit);
    writer->files[TSDB_FTYPE_SMA].size += rec.smaSize;
  }

  // append SBrinRecord
  TAOS_CHECK_GOTO(tsdbDataFileWriteBrinRecord(writer, &rec), &lino, _exit);

  tBlockDataClear(bData);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  taosHashCleanup(cmprInfo.pColCmpr);
  return code;
}
1098

1099
/*
 * Add one row to the writer's pending block data.
 *
 * The row is merged into the last buffered row when its version is not above
 * config->compactVersion and it has the same key (ignoring version) as that
 * last row. Otherwise it is appended, after flushing the block first if it
 * already holds config->maxRow rows.
 *
 * Returns 0 on success, otherwise the error code (also logged).
 */
static int32_t tsdbDataFileDoWriteTSRow(SDataFileWriter *writer, TSDBROW *row) {
  int32_t code = 0;
  int32_t lino = 0;

  // rows in row format may carry a different schema version: refresh the row schema
  if (row->type == TSDBROW_ROW_FMT) {
    TAOS_CHECK_GOTO(
        tsdbUpdateSkmRow(writer->config->tsdb, writer->ctx->tbid, TSDBROW_SVERSION(row), writer->config->skmRow), &lino,
        _exit);
  }

  // short-circuit order matters: the last row is only inspected when the block is non-empty
  bool mergeIntoLast =
      TSDBROW_VERSION(row) <= writer->config->compactVersion  //
      && writer->blockData->nRow > 0                          //
      && tsdbRowCompareWithoutVersion(row, &tsdbRowFromBlockData(writer->blockData, writer->blockData->nRow - 1)) == 0;

  if (mergeIntoLast) {
    // update
    TAOS_CHECK_GOTO(tBlockDataUpdateRow(writer->blockData, row, writer->config->skmRow->pTSchema), &lino, _exit);
    goto _exit;
  }

  // append: make room first when the block is full
  if (writer->blockData->nRow >= writer->config->maxRow) {
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, writer->blockData), &lino, _exit);
  }

  TAOS_CHECK_GOTO(
      tBlockDataAppendRow(writer->blockData, row, writer->config->skmRow->pTSchema, writer->ctx->tbid->uid), &lino,
      _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1133

1134
/*
 * Compare two row keys, treating a NULL key as larger than any real key.
 *
 * Returns 1 when key1 is NULL (even if key2 is NULL too), -1 when only key2 is
 * NULL, and the result of tsdbRowKeyCmpr() otherwise.
 */
static FORCE_INLINE int32_t tsdbRowKeyCmprNullAsLargest(const STsdbRowKey *key1, const STsdbRowKey *key2) {
  if (key1 == NULL) {
    return 1;
  }
  if (key2 == NULL) {
    return -1;
  }
  return tsdbRowKeyCmpr(key1, key2);
}
1143

1144
/*
 * Carry over the current table's pre-existing data that sorts before `key`.
 *
 * Old data is consumed at three levels, each resuming from an index kept in
 * writer->ctx:
 *   - rows of the currently loaded old block (ctx->blockData / blockDataIdx):
 *     every row with rowKey < key is re-written through
 *     tsdbDataFileDoWriteTSRow();
 *   - records of the currently loaded brin block (ctx->brinBlock /
 *     brinBlockIdx): a record entirely before `key` is forwarded as-is, after
 *     flushing pending rows; a record that may contain `key` has its block
 *     loaded so its rows can be merged one by one;
 *   - entries of the brin block index (ctx->brinBlkArray / brinBlkArrayIdx):
 *     the next brin block is loaded once the current one is exhausted.
 *
 * The function returns as soon as an old row or record not smaller than `key`
 * is reached, or when the old data of this table is exhausted, in which case
 * ctx->tbHasOldData is reset to false.
 *
 * @param key  upper bound (exclusive); NULL means "larger than every key", i.e.
 *             drain all remaining old data of the table.
 * @return 0 on success, otherwise the error code (also logged).
 */
static int32_t tsdbDataFileDoWriteTableOldData(SDataFileWriter *writer, const STsdbRowKey *key) {
  if (writer->ctx->tbHasOldData == false) {
    return 0;
  }

  int32_t     code = 0;
  int32_t     lino = 0;
  STsdbRowKey rowKey;

  for (;;) {
    for (;;) {
      // SBlockData
      for (; writer->ctx->blockDataIdx < writer->ctx->blockData->nRow; writer->ctx->blockDataIdx++) {
        TSDBROW row = tsdbRowFromBlockData(writer->ctx->blockData, writer->ctx->blockDataIdx);

        tsdbRowGetKey(&row, &rowKey);
        if (tsdbRowKeyCmprNullAsLargest(&rowKey, key) < 0) {  // rowKey < key
          TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSRow(writer, &row), &lino, _exit);
        } else {
          goto _exit;
        }
      }

      // SBrinBlock
      if (writer->ctx->brinBlockIdx >= writer->ctx->brinBlock->numOfRecords) {
        break;
      }

      for (; writer->ctx->brinBlockIdx < writer->ctx->brinBlock->numOfRecords; writer->ctx->brinBlockIdx++) {
        SBrinRecord record;
        code = tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, &record);
        TSDB_CHECK_CODE(code, lino, _exit);
        if (record.uid != writer->ctx->tbid->uid) {
          // reached another table's records: this table has no more old data
          writer->ctx->tbHasOldData = false;
          goto _exit;
        }

        if (tsdbRowKeyCmprNullAsLargest(key, &record.firstKey) < 0) {  // key < record.firstKey
          goto _exit;
        }

        if (tsdbRowKeyCmprNullAsLargest(key, &record.lastKey) > 0) {  // key > record.lastKey
          // the whole old block precedes key: keep it untouched, but flush
          // pending rows first
          if (writer->blockData->nRow > 0) {
            TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, writer->blockData), &lino, _exit);
          }

          TAOS_CHECK_GOTO(tsdbDataFileWriteBrinRecord(writer, &record), &lino, _exit);
        } else {
          // key falls inside [firstKey, lastKey]: load the block and merge row by row
          TAOS_CHECK_GOTO(tsdbDataFileReadBlockData(writer->ctx->reader, &record, writer->ctx->blockData), &lino,
                          _exit);

          writer->ctx->blockDataIdx = 0;
          writer->ctx->brinBlockIdx++;  // the break below skips the loop increment
          break;
        }
      }
    }

    // SBrinBlk
    if (writer->ctx->brinBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->brinBlkArray)) {
      writer->ctx->brinBlkArray = NULL;
      writer->ctx->tbHasOldData = false;
      goto _exit;
    } else {
      const SBrinBlk *brinBlk = TARRAY2_GET_PTR(writer->ctx->brinBlkArray, writer->ctx->brinBlkArrayIdx);

      if (brinBlk->minTbid.uid != writer->ctx->tbid->uid) {
        writer->ctx->tbHasOldData = false;
        goto _exit;
      }

      TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(writer->ctx->reader, brinBlk, writer->ctx->brinBlock), &lino, _exit);

      writer->ctx->brinBlockIdx = 0;
      writer->ctx->brinBlkArrayIdx++;
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1232

1233
/*
 * Write one new row for the current table. When the table still has old data
 * pending, the old data sorting before the row's key is carried over first.
 *
 * Returns 0 on success, otherwise the error code (also logged).
 */
static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row) {
  int32_t code = 0;
  int32_t lino = 0;

  if (writer->ctx->tbHasOldData) {
    STsdbRowKey newRowKey;

    tsdbRowGetKey(row, &newRowKey);
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTableOldData(writer, &newRowKey), &lino, _exit);
  }

  TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSRow(writer, row), &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1252

1253
/*
 * Finish the current table: drain any remaining old data of the table, then
 * flush the pending block data. A current table uid of 0 means there is
 * nothing to finish.
 *
 * Returns 0 on success, otherwise the error code (also logged).
 */
static int32_t tsdbDataFileWriteTableDataEnd(SDataFileWriter *writer) {
  if (writer->ctx->tbid->uid == 0) {
    return 0;
  }

  int32_t code = 0;
  int32_t lino = 0;

  if (writer->ctx->tbHasOldData) {
    // a NULL key compares as the largest key, so all remaining old rows are written
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTableOldData(writer, NULL /* as the largest key */), &lino, _exit);
  }

  TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, writer->blockData), &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1274

1275
/*
 * Start writing table `tbid`.
 *
 * While old brin records remain, every record that sorts before `tbid` (by
 * suid, then uid) is forwarded unchanged to the new file, except records of
 * tables for which metaGetInfo() fails, which are skipped. Scanning stops at
 * the first record of `tbid` itself (ctx->tbHasOldData becomes true) or of a
 * later table, or when the old index is exhausted (ctx->brinBlkArray becomes
 * NULL).
 *
 * Afterwards `tbid` becomes the current table. Unless its uid is INT64_MAX,
 * the table schema is refreshed and the pending block data is initialised
 * for it.
 *
 * Returns 0 on success, otherwise the error code (also logged).
 */
static int32_t tsdbDataFileWriteTableDataBegin(SDataFileWriter *writer, const TABLEID *tbid) {
  int32_t code = 0;
  int32_t lino = 0;

  SMetaInfo info;
  bool      lastWasDropped = false;  // true while skipping records of droppedTb
  TABLEID   droppedTb[1];            // only meaningful while lastWasDropped is true

  writer->ctx->tbHasOldData = false;

  while (writer->ctx->brinBlkArray) {  // skip data of previous table
    for (; writer->ctx->brinBlockIdx < writer->ctx->brinBlock->numOfRecords; writer->ctx->brinBlockIdx++) {
      SBrinRecord record;
      TAOS_CHECK_GOTO(tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, &record), &lino, _exit);

      // old data of the table being started
      if (record.uid == tbid->uid) {
        writer->ctx->tbHasOldData = true;
        goto _begin;
      }

      // old data of a later table
      if (record.suid > tbid->suid || (record.suid == tbid->suid && record.uid > tbid->uid)) {
        goto _begin;
      }

      // old data of an earlier table; check the table once, on its first record
      if (record.uid != writer->ctx->tbid->uid) {
        if (lastWasDropped && droppedTb->uid == record.uid) {
          continue;
        }

        if (metaGetInfo(writer->config->tsdb->pVnode->pMeta, record.uid, &info, NULL) != 0) {
          lastWasDropped = true;
          droppedTb->suid = record.suid;
          droppedTb->uid = record.uid;
          continue;
        }

        lastWasDropped = false;
        writer->ctx->tbid->suid = record.suid;
        writer->ctx->tbid->uid = record.uid;
      }

      TAOS_CHECK_GOTO(tsdbDataFileWriteBrinRecord(writer, &record), &lino, _exit);
    }

    // current brin block exhausted: load the next one, or stop when none is left
    if (writer->ctx->brinBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->brinBlkArray)) {
      writer->ctx->brinBlkArray = NULL;
      break;
    }

    const SBrinBlk *brinBlk = TARRAY2_GET_PTR(writer->ctx->brinBlkArray, writer->ctx->brinBlkArrayIdx);

    TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(writer->ctx->reader, brinBlk, writer->ctx->brinBlock), &lino, _exit);

    writer->ctx->brinBlockIdx = 0;
    writer->ctx->brinBlkArrayIdx++;
  }

_begin:
  writer->ctx->tbid[0] = *tbid;

  // uid INT64_MAX: no schema or block setup is done for this table id
  if (tbid->uid == INT64_MAX) {
    goto _exit;
  }

  TAOS_CHECK_GOTO(tsdbUpdateSkmTb(writer->config->tsdb, tbid, writer->config->skmTb), &lino, _exit);
  TAOS_CHECK_GOTO(tBlockDataInit(writer->blockData, writer->ctx->tbid, writer->config->skmTb->pTSchema, NULL, 0), &lino,
                  _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1344

1345
/*
 * Append the raw bytes of a head footer to `fd` at offset *fileSize and
 * advance *fileSize by sizeof(SHeadFooter).
 *
 * Returns 0 on success; on write failure the error is returned and *fileSize
 * is left unchanged.
 */
int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer, int32_t encryptAlgorithm,
                                char *encryptKey) {
  const uint8_t *footerBytes = (const uint8_t *)footer;

  TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, footerBytes, sizeof(*footer), encryptAlgorithm, encryptKey));

  *fileSize += sizeof(*footer);
  return 0;
}
1352

1353
int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
5,265✔
1354
                               TTombBlkArray *tombBlkArray, SBuffer *buffers, SVersionRange *range,
1355
                               int32_t encryptAlgorithm, char *encryptKey) {
1356
  int32_t code;
1357

1358
  if (TOMB_BLOCK_SIZE(tombBlock) == 0) {
5,265!
1359
    return 0;
×
1360
  }
1361

1362
  SBuffer *buffer0 = buffers + 0;
5,265✔
1363
  SBuffer *assist = buffers + 1;
5,265✔
1364

1365
  STombBlk tombBlk = {
5,265✔
1366
      .dp[0] =
1367
          {
1368
              .offset = *fileSize,
5,265✔
1369
              .size = 0,
1370
          },
1371
      .numRec = TOMB_BLOCK_SIZE(tombBlock),
5,265✔
1372
      .cmprAlg = cmprAlg,
1373
  };
1374
  for (int i = 0; i < TOMB_BLOCK_SIZE(tombBlock); i++) {
189,260✔
1375
    STombRecord record;
1376
    TAOS_CHECK_RETURN(tTombBlockGet(tombBlock, i, &record));
183,996!
1377

1378
    if (i == 0) {
183,995✔
1379
      tombBlk.minTbid.suid = record.suid;
5,265✔
1380
      tombBlk.minTbid.uid = record.uid;
5,265✔
1381
      tombBlk.minVer = record.version;
5,265✔
1382
      tombBlk.maxVer = record.version;
5,265✔
1383
    }
1384
    if (i == TOMB_BLOCK_SIZE(tombBlock) - 1) {
183,995✔
1385
      tombBlk.maxTbid.suid = record.suid;
5,265✔
1386
      tombBlk.maxTbid.uid = record.uid;
5,265✔
1387
    }
1388
    if (record.version < tombBlk.minVer) {
183,995✔
1389
      tombBlk.minVer = record.version;
2,309✔
1390
    }
1391
    if (record.version > tombBlk.maxVer) {
183,995✔
1392
      tombBlk.maxVer = record.version;
117,003✔
1393
    }
1394
  }
1395

1396
  tsdbWriterUpdVerRange(range, tombBlk.minVer, tombBlk.maxVer);
5,264✔
1397

1398
  for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->buffers); i++) {
31,590✔
1399
    tBufferClear(buffer0);
1400

1401
    SCompressInfo cinfo = {
26,325✔
1402
        .cmprAlg = cmprAlg,
1403
        .dataType = TSDB_DATA_TYPE_BIGINT,
1404
        .originalSize = tombBlock->buffers[i].size,
26,325✔
1405
    };
1406
    TAOS_CHECK_RETURN(tCompressDataToBuffer(tombBlock->buffers[i].data, &cinfo, buffer0, assist));
26,325!
1407
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptAlgorithm, encryptKey));
26,325!
1408

1409
    tombBlk.size[i] = cinfo.compressedSize;
26,325✔
1410
    tombBlk.dp->size += tombBlk.size[i];
26,325✔
1411
    *fileSize += tombBlk.size[i];
26,325✔
1412
  }
1413

1414
  TAOS_CHECK_RETURN(TARRAY2_APPEND_PTR(tombBlkArray, &tombBlk));
10,529!
1415

1416
  tTombBlockClear(tombBlock);
5,264✔
1417
  return 0;
5,264✔
1418
}
1419

1420
/*
 * Append the writer's head footer to the .head file, using the vnode's
 * encryption settings. Returns 0 on success, otherwise the error code
 * (also logged).
 */
static int32_t tsdbDataFileWriteHeadFooter(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdbFD *headFd = writer->fd[TSDB_FTYPE_HEAD];
  int64_t *headSize = &writer->files[TSDB_FTYPE_HEAD].size;
  int32_t  encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char    *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;

  TAOS_CHECK_GOTO(tsdbFileWriteHeadFooter(headFd, headSize, writer->headFooter, encryptAlgorithm, encryptKey), &lino,
                  _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1438

1439
/*
 * Flush the writer's pending tomb block into the .tomb file.
 * Does nothing when the block is empty. Returns 0 on success, otherwise the
 * error code (also logged).
 */
static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) {
  if (TOMB_BLOCK_SIZE(writer->tombBlock) == 0) {
    return 0;
  }

  int32_t code = 0;
  int32_t lino = 0;

  STsdbFD *tombFd = writer->fd[TSDB_FTYPE_TOMB];
  int64_t *tombSize = &writer->files[TSDB_FTYPE_TOMB].size;
  int32_t  encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char    *encryptKey = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;

  TAOS_CHECK_GOTO(tsdbFileWriteTombBlock(tombFd, writer->tombBlock, writer->config->cmprAlg, tombSize,
                                         writer->tombBlkArray, writer->buffers, &writer->ctx->tombRange,
                                         encryptAlgorithm, encryptKey),
                  &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1460

1461
/*
 * Append the tomb block index array to `fd` and describe its location in `ptr`.
 *
 * ptr->size is always set to the byte length of the array. When the array is
 * non-empty, ptr->offset is set to the current *fileSize, the array bytes are
 * written there and *fileSize is advanced. When it is empty, nothing is
 * written and ptr->offset is left untouched.
 *
 * Returns 0 on success, otherwise the write error.
 */
int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize,
                             int32_t encryptAlgorithm, char *encryptKey) {
  ptr->size = TARRAY2_DATA_LEN(tombBlkArray);
  if (ptr->size <= 0) {
    return 0;
  }

  ptr->offset = *fileSize;

  TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, (const uint8_t *)TARRAY2_DATA(tombBlkArray), ptr->size,
                                  encryptAlgorithm, encryptKey));

  *fileSize += ptr->size;
  return 0;
}
1474

1475
// Write the writer's tomb-blk index array to the tomb file and record its
// location in the tomb footer.
// Returns TSDB_CODE_INVALID_PARA when the array is empty, otherwise the code
// from tsdbFileWriteTombBlk() (logged when non-zero).
static int32_t tsdbDataFileDoWriteTombBlk(SDataFileWriter *writer) {
  if (TARRAY2_SIZE(writer->tombBlkArray) <= 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t lino = 0;
  int32_t code = 0;

  int32_t algo = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *key = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;

  TAOS_CHECK_GOTO(tsdbFileWriteTombBlk(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlkArray,
                                       writer->tombFooter->tombBlkPtr, &writer->files[TSDB_FTYPE_TOMB].size, algo,
                                       key),
                  &lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1498

1499
// Write `footer` verbatim (sizeof(*footer) bytes) at offset *fileSize of `fd`
// and advance *fileSize by the same amount.
// Returns 0 on success; a tsdbWriteFile() failure is returned through
// TAOS_CHECK_RETURN without touching *fileSize.
int32_t tsdbFileWriteTombFooter(STsdbFD *fd, const STombFooter *footer, int64_t *fileSize, int32_t encryptAlgorithm,
                                char *encryptKey) {
  const uint8_t *bytes = (const uint8_t *)footer;

  TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, bytes, sizeof(*footer), encryptAlgorithm, encryptKey));

  *fileSize += sizeof(*footer);
  return 0;
}
1506

1507
// Append the writer's tomb footer to the tomb file, updating the tracked
// size of that file. Returns the code from tsdbFileWriteTombFooter(), logging
// it when non-zero.
static int32_t tsdbDataFileWriteTombFooter(SDataFileWriter *writer) {
  int32_t lino = 0;
  int32_t code = 0;

  int32_t algo = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *key = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;

  TAOS_CHECK_GOTO(
      tsdbFileWriteTombFooter(writer->fd[TSDB_FTYPE_TOMB], writer->tombFooter, &writer->files[TSDB_FTYPE_TOMB].size,
                              algo, key),
      &lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1525

1526
// Merge one tomb record into the output tomb block.
//
// While old tomb data remains (ctx->hasOldTomb), old records are visited in
// order, block by block:
//   - tTombRecordCompare(record, old) > 0: the old record is copied to the
//     output block (flushing the block once it reaches config->maxRow rows);
//   - == 0: the pair is reported as a duplicate and the old record is skipped;
//   - <  0: scanning stops and `record` itself is written.
// When all old blocks are consumed, hasOldTomb is cleared.
//
// A record whose suid equals INT64_MAX is never written itself; passing one
// only drives the copy of the remaining old records.
static int32_t tsdbDataFileDoWriteTombRecord(SDataFileWriter *writer, const STombRecord *record) {
  int32_t code = 0;
  int32_t lino = 0;

  while (writer->ctx->hasOldTomb) {
    for (; writer->ctx->tombBlockIdx < TOMB_BLOCK_SIZE(writer->ctx->tombBlock); writer->ctx->tombBlockIdx++) {
      STombRecord old;
      TAOS_CHECK_GOTO(tTombBlockGet(writer->ctx->tombBlock, writer->ctx->tombBlockIdx, &old), &lino, _exit);

      int32_t cmp = tTombRecordCompare(record, &old);
      if (cmp < 0) {
        goto _write;
      }

      if (cmp == 0) {
        tsdbError("vgId:%d duplicate tomb record, cid:%" PRId64 ", suid:%" PRId64 ", uid:%" PRId64 ", version:%" PRId64,
                  TD_VID(writer->config->tsdb->pVnode), writer->config->cid, record->suid, record->uid,
                  record->version);
        continue;
      }

      // cmp > 0: carry the old record over to the new file.
      TAOS_CHECK_GOTO(tTombBlockPut(writer->tombBlock, &old), &lino, _exit);

      tsdbTrace("vgId:%d write tomb record to tomb file:%s, cid:%" PRId64 ", suid:%" PRId64 ", uid:%" PRId64
                ", version:%" PRId64,
                TD_VID(writer->config->tsdb->pVnode), writer->fd[TSDB_FTYPE_TOMB]->path, writer->config->cid,
                old.suid, old.uid, old.version);

      if (TOMB_BLOCK_SIZE(writer->tombBlock) >= writer->config->maxRow) {
        TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlock(writer), &lino, _exit);
      }
    }

    // Current old block exhausted: stop if it was the last one ...
    if (writer->ctx->tombBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->tombBlkArray)) {
      writer->ctx->hasOldTomb = false;
      break;
    }

    // ... otherwise load the next old block and restart the scan at its head.
    const STombBlk *nextBlk = TARRAY2_GET_PTR(writer->ctx->tombBlkArray, writer->ctx->tombBlkArrayIdx);

    TAOS_CHECK_GOTO(tsdbDataFileReadTombBlock(writer->ctx->reader, nextBlk, writer->ctx->tombBlock), &lino, _exit);

    writer->ctx->tombBlockIdx = 0;
    writer->ctx->tombBlkArrayIdx++;
  }

_write:
  if (record->suid == INT64_MAX) {
    goto _exit;
  }

  TAOS_CHECK_GOTO(tTombBlockPut(writer->tombBlock, record), &lino, _exit);

  tsdbTrace("vgId:%d write tomb record to tomb file:%s, cid:%" PRId64 ", suid:%" PRId64 ", uid:%" PRId64
            ", version:%" PRId64,
            TD_VID(writer->config->tsdb->pVnode), writer->fd[TSDB_FTYPE_TOMB]->path, writer->config->cid, record->suid,
            record->uid, record->version);

  if (TOMB_BLOCK_SIZE(writer->tombBlock) >= writer->config->maxRow) {
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlock(writer), &lino, _exit);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1592

1593
// Append the raw brin-blk array to `fd` at the current end of file.
//   ptr      - receives the offset and byte length of the written array
//   fileSize - in/out running file size; advanced by the bytes written
// Returns TSDB_CODE_INVALID_PARA for an empty array, 0 on success; a
// tsdbWriteFile() failure is returned through TAOS_CHECK_RETURN.
int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize,
                             int32_t encryptAlgorithm, char *encryptKey) {
  if (TARRAY2_SIZE(brinBlkArray) <= 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  ptr->offset = *fileSize;
  ptr->size = TARRAY2_DATA_LEN(brinBlkArray);

  uint8_t *payload = (uint8_t *)TARRAY2_DATA(brinBlkArray);
  TAOS_CHECK_RETURN(tsdbWriteFile(fd, ptr->offset, payload, ptr->size, encryptAlgorithm, encryptKey));

  *fileSize += ptr->size;
  return 0;
}
1607

1608
// Write the writer's brin-blk index array to the head file and record its
// location in the head footer. Returns the code from tsdbFileWriteBrinBlk(),
// logging it when non-zero.
static int32_t tsdbDataFileWriteBrinBlk(SDataFileWriter *writer) {
  int32_t lino = 0;
  int32_t code = 0;

  int32_t algo = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *key = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;

  TAOS_CHECK_GOTO(tsdbFileWriteBrinBlk(writer->fd[TSDB_FTYPE_HEAD], writer->brinBlkArray,
                                       writer->headFooter->brinBlkPtr, &writer->files[TSDB_FTYPE_HEAD].size, algo,
                                       key),
                  &lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1627

1628
// Widen the [minVer, maxVer] interval stored in `f` so that it also covers
// `range`: minVer can only decrease and maxVer can only increase.
void tsdbTFileUpdVerRange(STFile *f, SVersionRange range) {
  if (range.minVer < f->minVer) {
    f->minVer = range.minVer;
  }
  if (range.maxVer > f->maxVer) {
    f->maxVer = range.maxVer;
  }
}
347,951✔
1632

1633
// Finish all files the writer has open and append the resulting file
// operations to `opArr`.
//
// Head/data/sma (only when the head fd is open): pending table data and brin
// structures are flushed, the head footer is written, then
//   - .head: a REMOVE op for the previous file (if one existed) followed by a
//     CREATE op whose version range covers both the old file and ctx->range;
//   - .data / .sma: a CREATE op when no previous file existed, or a MODIFY op
//     when the size changed; nothing when the size is unchanged.
// Tomb (only when the tomb fd is open): remaining old tomb records are copied,
// the tomb block, blk index and footer are written, then REMOVE (if a previous
// file existed) and CREATE ops are appended.
// Finally every open fd is fsync'ed and closed.
//
// Returns 0 on success or the first failing step's code (logged). On failure
// the remaining steps are skipped and any fds not yet closed stay open.
static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArray *opArr) {
  int32_t code = 0;
  int32_t lino = 0;

  STFileOp op;
  int32_t  ftype;

  if (writer->fd[TSDB_FTYPE_HEAD]) {
    // NOTE(review): the INT64_MAX table id looks like an end-of-data sentinel
    // for tsdbDataFileWriteTableDataBegin() - confirm against that function.
    TABLEID tbid[1] = {{.suid = INT64_MAX, .uid = INT64_MAX}};

    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataEnd(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataBegin(writer, tbid), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlock(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlk(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteHeadFooter(writer), &lino, _exit);

    // Starts as an "empty" range so that merging it is a no-op unless an old
    // head file supplies real bounds.
    SVersionRange ofRange = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};

    // .head
    ftype = TSDB_FTYPE_HEAD;
    if (writer->config->files[ftype].exist) {
      op = (STFileOp){
          .optype = TSDB_FOP_REMOVE,
          .fid = writer->config->fid,
          .of = writer->config->files[ftype].file,
      };
      ofRange = (SVersionRange){.minVer = op.of.minVer, .maxVer = op.of.maxVer};
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
    }
    op = (STFileOp){
        .optype = TSDB_FOP_CREATE,
        .fid = writer->config->fid,
        .nf = writer->files[ftype],
    };
    tsdbTFileUpdVerRange(&op.nf, ofRange);
    tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
    TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);

    // .data, then .sma: both follow the same create-or-modify rule.
    const int32_t growTypes[] = {TSDB_FTYPE_DATA, TSDB_FTYPE_SMA};
    for (int32_t k = 0; k < (int32_t)(sizeof(growTypes) / sizeof(growTypes[0])); ++k) {
      ftype = growTypes[k];

      if (!writer->config->files[ftype].exist) {
        op = (STFileOp){
            .optype = TSDB_FOP_CREATE,
            .fid = writer->config->fid,
            .nf = writer->files[ftype],
        };
      } else if (writer->config->files[ftype].file.size != writer->files[ftype].size) {
        op = (STFileOp){
            .optype = TSDB_FOP_MODIFY,
            .fid = writer->config->fid,
            .of = writer->config->files[ftype].file,
            .nf = writer->files[ftype],
        };
      } else {
        continue;  // existing file with unchanged size: no operation
      }

      tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
    }
  }

  if (writer->fd[TSDB_FTYPE_TOMB]) {
    // A record with suid == INT64_MAX is not written itself; it only makes
    // tsdbDataFileDoWriteTombRecord() copy the remaining old tomb records.
    STombRecord record[1] = {{.suid = INT64_MAX, .uid = INT64_MAX, .version = INT64_MAX}};

    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombRecord(writer, record), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlock(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlk(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteTombFooter(writer), &lino, _exit);

    SVersionRange ofRange = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN};

    ftype = TSDB_FTYPE_TOMB;
    if (writer->config->files[ftype].exist) {
      op = (STFileOp){
          .optype = TSDB_FOP_REMOVE,
          .fid = writer->config->fid,
          .of = writer->config->files[ftype].file,
      };
      ofRange = (SVersionRange){.minVer = op.of.minVer, .maxVer = op.of.maxVer};
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
    }
    op = (STFileOp){
        .optype = TSDB_FOP_CREATE,
        .fid = writer->config->fid,
        .nf = writer->files[ftype],
    };
    tsdbTFileUpdVerRange(&op.nf, ofRange);
    tsdbTFileUpdVerRange(&op.nf, writer->ctx->tombRange);
    TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
  }

  // Make everything durable, then release the descriptors.
  int32_t algo = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *key = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;
  for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
    if (!writer->fd[i]) {
      continue;
    }
    TAOS_CHECK_GOTO(tsdbFsyncFile(writer->fd[i], algo, key), &lino, _exit);
    tsdbCloseFile(&writer->fd[i]);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1766

1767
// Open the head, data and sma files for read/write.
// A file whose tracked size is 0 is created/truncated and gets a zero-filled
// header of TSDB_FHDR_SIZE bytes, which is added to its tracked size.
// If the writer has a reader on the previous file set, the old brin-blk array
// is loaded into ctx->brinBlkArray afterwards.
// Returns 0 on success or the first failing call's code (logged).
static int32_t tsdbDataFileWriterOpenDataFD(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  int32_t ftypes[] = {TSDB_FTYPE_HEAD, TSDB_FTYPE_DATA, TSDB_FTYPE_SMA};

  for (int32_t idx = 0; idx < ARRAY_SIZE(ftypes); ++idx) {
    int32_t ftype = ftypes[idx];
    char    fname[TSDB_FILENAME_LEN];

    // Empty file => create it from scratch; otherwise open the existing one.
    int32_t flag = (writer->files[ftype].size == 0)
                       ? (TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC)
                       : (TD_FILE_READ | TD_FILE_WRITE);

    int32_t lcn = writer->files[ftype].lcn;
    tsdbTFileName(writer->config->tsdb, &writer->files[ftype], fname);
    TAOS_CHECK_GOTO(tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd[ftype], lcn), &lino, _exit);

    if (writer->files[ftype].size != 0) {
      continue;
    }

    // Fresh file: reserve the header area with zeros.
    uint8_t hdr[TSDB_FHDR_SIZE] = {0};
    int32_t algo = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
    char   *key = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;

    TAOS_CHECK_GOTO(tsdbWriteFile(writer->fd[ftype], 0, hdr, TSDB_FHDR_SIZE, algo, key), &lino, _exit);

    writer->files[ftype].size += TSDB_FHDR_SIZE;
  }

  if (writer->ctx->reader) {
    TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(writer->ctx->reader, &writer->ctx->brinBlkArray), &lino, _exit);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1811

1812
// Allocate a zero-initialised writer and copy `config` into it by value.
// No file is opened here. On success *writer holds the new object and 0 is
// returned; if the allocation fails *writer is NULL and terrno is returned.
int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer) {
  *writer = taosMemoryCalloc(1, sizeof(**writer));
  if (*writer == NULL) {
    return terrno;
  }

  (*writer)->config[0] = *config;
  return 0;
}
1821

1822
// Close a writer created by tsdbDataFileWriterOpen().
//   abort - true: discard via tsdbDataFileWriterCloseAbort();
//           false: finalize via tsdbDataFileWriterCloseCommit(), which appends
//           file operations to `opArr`
// A NULL *writer is accepted and returns 0. On success the writer is freed and
// *writer is set to NULL.
// NOTE: when the abort/commit step fails, the error is logged and returned
// but the writer is neither freed nor reset; *writer still points at it.
int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, TFileOpArray *opArr) {
  SDataFileWriter *self = *writer;
  if (self == NULL) {
    return 0;
  }

  int32_t code = 0;
  int32_t lino = 0;

  // Only a writer that actually opened something has work to undo/commit.
  if (self->ctx->opened) {
    if (!abort) {
      TAOS_CHECK_GOTO(tsdbDataFileWriterCloseCommit(self, opArr), &lino, _exit);
    } else {
      TAOS_CHECK_GOTO(tsdbDataFileWriterCloseAbort(self), &lino, _exit);
    }
    tsdbDataFileWriterDoClose(self);
  }

  taosMemoryFree(self);
  *writer = NULL;

_exit:
  if (code != 0) {
    // Reached with code != 0 only via the gotos above, i.e. before the free.
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(self->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1846

1847
// Write a single row through the writer.
// The writer and its head/data/sma files are opened on first use. When the
// row's uid differs from the table currently being written, the current table
// is ended and a new one is begun before the row is written.
// Returns 0 on success or the first failing step's code (logged).
int32_t tsdbDataFileWriteRow(SDataFileWriter *writer, SRowInfo *row) {
  int32_t lino = 0;
  int32_t code = 0;

  if (!writer->ctx->opened) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpen(writer), &lino, _exit);
  }

  if (writer->fd[TSDB_FTYPE_HEAD] == NULL) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterOpenDataFD(writer), &lino, _exit);
  }

  // Table switch: close out the previous table, start the row's table.
  if (writer->ctx->tbid->uid != row->uid) {
    // NOTE(review): the cast relies on SRowInfo starting with the TABLEID
    // fields - confirm against the struct definition.
    TABLEID *tbid = (TABLEID *)row;

    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataEnd(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataBegin(writer, tbid), &lino, _exit);
  }

  TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSData(writer, &row->row), &lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1873

1874
// Write every row of `bData` through the writer.
// Returns 0 without doing anything for an empty block and
// TSDB_CODE_INVALID_PARA when bData->uid is 0.
// The writer and its head/data/sma files are opened on first use, and a table
// switch is performed when bData->uid differs from the current table.
// If the table still has old data, old rows up to the block's first key are
// handled first via tsdbDataFileDoWriteTableOldData(). Afterwards the block is
// written as a whole when no old data remains and the writer's own block
// buffer is empty; otherwise its rows are written one at a time.
int32_t tsdbDataFileWriteBlockData(SDataFileWriter *writer, SBlockData *bData) {
  if (bData->nRow == 0) {
    return 0;
  }

  int32_t lino = 0;
  int32_t code = 0;

  if (!bData->uid) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (!writer->ctx->opened) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpen(writer), &lino, _exit);
  }

  if (writer->fd[TSDB_FTYPE_DATA] == NULL) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterOpenDataFD(writer), &lino, _exit);
  }

  if (writer->ctx->tbid->uid != bData->uid) {
    // NOTE(review): the cast relies on SBlockData starting with the TABLEID
    // fields - confirm against the struct definition.
    TABLEID *tbid = (TABLEID *)bData;

    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataEnd(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataBegin(writer, tbid), &lino, _exit);
  }

  if (writer->ctx->tbHasOldData) {
    STsdbRowKey firstKey;

    tsdbRowGetKey(&tsdbRowFromBlockData(bData, 0), &firstKey);
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTableOldData(writer, &firstKey), &lino, _exit);
  }

  if (writer->ctx->tbHasOldData || writer->blockData->nRow != 0) {
    // Slow path: rows must be interleaved with old or already-buffered rows.
    for (int32_t iRow = 0; iRow < bData->nRow; ++iRow) {
      TSDBROW row = tsdbRowFromBlockData(bData, iRow);
      TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSData(writer, &row), &lino, _exit);
    }
  } else {
    // Fast path: nothing pending, write the block as one unit.
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, bData), &lino, _exit);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1925

1926
// Write out the rows currently buffered in writer->blockData.
// Returns TSDB_CODE_INVALID_PARA if the writer has not been opened, and 0
// without writing when the buffer is empty or the current table still has old
// data; otherwise returns the result of tsdbDataFileDoWriteBlockData().
int32_t tsdbDataFileFlush(SDataFileWriter *writer) {
  if (!writer->ctx->opened) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (writer->blockData->nRow == 0 || writer->ctx->tbHasOldData) {
    return 0;
  }

  return tsdbDataFileDoWriteBlockData(writer, writer->blockData);
}
1936

1937
// Create (truncating if present) the tomb file, write a zero-filled header of
// TSDB_FHDR_SIZE bytes and add that to the tracked file size.
// If the writer has a reader on the previous file set, the old tomb-blk array
// is loaded, hasOldTomb is set when that array is non-empty, and the old-tomb
// cursors (block array index, block, in-block index) are reset.
// Returns 0 on success or the first failing call's code (logged).
static int32_t tsdbDataFileWriterOpenTombFD(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  const int32_t ftype = TSDB_FTYPE_TOMB;
  const int32_t flag = (TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  char          fname[TSDB_FILENAME_LEN];

  int32_t lcn = writer->files[ftype].lcn;
  tsdbTFileName(writer->config->tsdb, &writer->files[ftype], fname);

  TAOS_CHECK_GOTO(tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd[ftype], lcn), &lino, _exit);

  // Reserve the header area with zeros.
  uint8_t hdr[TSDB_FHDR_SIZE] = {0};
  int32_t algo = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
  char   *key = writer->config->tsdb->pVnode->config.tsdbCfg.encryptKey;

  TAOS_CHECK_GOTO(tsdbWriteFile(writer->fd[ftype], 0, hdr, TSDB_FHDR_SIZE, algo, key), &lino, _exit);
  writer->files[ftype].size += TSDB_FHDR_SIZE;

  if (writer->ctx->reader) {
    TAOS_CHECK_GOTO(tsdbDataFileReadTombBlk(writer->ctx->reader, &writer->ctx->tombBlkArray), &lino, _exit);

    if (TARRAY2_SIZE(writer->ctx->tombBlkArray) > 0) {
      writer->ctx->hasOldTomb = true;
    }

    // Position the old-tomb cursor before the first old block.
    writer->ctx->tombBlkArrayIdx = 0;
    tTombBlockClear(writer->ctx->tombBlock);
    writer->ctx->tombBlockIdx = 0;
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1977

1978
// Public entry point for writing one tomb record.
// Opens the writer and the tomb file on first use, then hands the record to
// tsdbDataFileDoWriteTombRecord().
// Returns 0 on success or the first failing step's code (logged).
int32_t tsdbDataFileWriteTombRecord(SDataFileWriter *writer, const STombRecord *record) {
  int32_t lino = 0;
  int32_t code = 0;

  if (!writer->ctx->opened) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpen(writer), &lino, _exit);
  }

  if (writer->fd[TSDB_FTYPE_TOMB] == NULL) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterOpenTombFD(writer), &lino, _exit);
  }

  TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombRecord(writer, record), &lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc