• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5079

17 May 2026 01:15AM UTC coverage: 73.395% (-0.05%) from 73.443%
#5079

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281685 of 383795 relevant lines covered (73.39%)

137851703.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.55
/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdbDataFileRW.h"
17
#include "meta.h"
18

19
// SDataFileReader =============================================
20
// State of one open data-file-set reader (one STsdbFD slot per file type).
struct SDataFileReader {
  // By-value copy of the configuration handed to tsdbDataFileReaderOpen().
  SDataFileReaderConfig config[1];

  // Scratch buffers owned by the reader: initialized on open, destroyed on close.
  SBuffer  local[10];
  // Scratch buffers actually used by the read routines: config->buffers when
  // the caller supplied one, otherwise `local`.
  SBuffer *buffers;

  // Lazy-load flags; each is set once the corresponding member below has been
  // populated so repeated calls do not re-read the file.
  struct {
    bool headFooterLoaded;
    bool tombFooterLoaded;
    bool brinBlkLoaded;
    bool tombBlkLoaded;
  } ctx[1];

  // Open file handles indexed by file type; NULL when that file was not opened.
  STsdbFD *fd[TSDB_FTYPE_MAX];

  SHeadFooter   headFooter[1];    // footer read from the tail of the head file
  STombFooter   tombFooter[1];    // footer read from the tail of the tomb file
  TBrinBlkArray brinBlkArray[1];  // SBrinBlk index located via headFooter->brinBlkPtr
  TTombBlkArray tombBlkArray[1];  // STombBlk index located via tombFooter->tombBlkPtr
};
40

41
/**
 * Load the head-file footer into reader->headFooter, at most once per reader.
 *
 * When no head file is open the footer is left as-is (zeroed by the reader's
 * calloc) and the call still succeeds, so callers see an empty brin index.
 *
 * @param reader  open data file reader
 * @return 0 on success, TSDB_CODE_FILE_CORRUPTED when the recorded file size
 *         cannot hold a footer, or the error code reported by tsdbReadFile()
 */
static int32_t tsdbDataFileReadHeadFooter(SDataFileReader *reader) {
  if (reader->ctx->headFooterLoaded) {
    return 0;
  }

  int32_t code = 0;
  int32_t lino = 0;

  int32_t ftype = TSDB_FTYPE_HEAD;
  if (reader->fd[ftype]) {
    SEncryptData *pEncryptData = &(reader->config->tsdb->pVnode->config.tsdbCfg.encryptData);
    int64_t       fsize = reader->config->files[ftype].file.size;

    // The footer lives in the last sizeof(SHeadFooter) bytes of the file. A
    // shorter file would turn the subtraction below into a bogus offset.
    if (fsize < (int64_t)sizeof(SHeadFooter)) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
    }

    TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[ftype], fsize - sizeof(SHeadFooter), (uint8_t *)reader->headFooter,
                                 sizeof(SHeadFooter), 0, pEncryptData),
                    &lino, _exit);
  }

  reader->ctx->headFooterLoaded = true;

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
82

83
/**
 * Load the tomb-file footer into reader->tombFooter, at most once per reader.
 *
 * When no tomb file is open the footer is left as-is (zeroed by the reader's
 * calloc) and the call still succeeds.
 *
 * @param reader  open data file reader
 * @return 0 on success, TSDB_CODE_FILE_CORRUPTED when the recorded file size
 *         cannot hold a footer, or the error code reported by tsdbReadFile()
 */
static int32_t tsdbDataFileReadTombFooter(SDataFileReader *reader) {
  if (reader->ctx->tombFooterLoaded) {
    return 0;
  }

  int32_t code = 0;
  int32_t lino = 0;

  int32_t ftype = TSDB_FTYPE_TOMB;
  if (reader->fd[ftype]) {
    SEncryptData *pEncryptData = &(reader->config->tsdb->pVnode->config.tsdbCfg.encryptData);
    int64_t       fsize = reader->config->files[ftype].file.size;

    // The footer lives in the last sizeof(STombFooter) bytes of the file. A
    // shorter file would turn the subtraction below into a bogus offset.
    if (fsize < (int64_t)sizeof(STombFooter)) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
    }

    TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[ftype], fsize - sizeof(STombFooter), (uint8_t *)reader->tombFooter,
                                 sizeof(STombFooter), 0, pEncryptData),
                    &lino, _exit);
  }
  reader->ctx->tombFooterLoaded = true;

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
108

109
/**
 * Allocate a data file reader and open the files of one file set for reading.
 *
 * @param fname   optional array of TSDB_FTYPE_MAX file names; a NULL entry
 *                skips that file type. When `fname` itself is NULL, names are
 *                derived from config->files[i] for every entry marked `exist`.
 * @param config  reader configuration; copied by value into the reader
 * @param reader  out: the new reader on success. On failure every partially
 *                acquired resource is released and *reader is set to NULL, so
 *                a later tsdbDataFileReaderClose() by the caller is a no-op.
 * @return 0 on success, otherwise the allocation or tsdbOpenFile() error code
 */
int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig *config, SDataFileReader **reader) {
  int32_t code = 0;
  int32_t lino = 0;

  if ((*reader = taosMemoryCalloc(1, sizeof(**reader))) == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->local); i++) {
    tBufferInit(reader[0]->local + i);
  }

  reader[0]->config[0] = config[0];
  // Use the caller's scratch buffers when provided, else the reader's own.
  reader[0]->buffers = config->buffers;
  if (reader[0]->buffers == NULL) {
    reader[0]->buffers = reader[0]->local;
  }

  if (fname) {
    for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
      if (fname[i]) {
        int32_t lcn = config->files[i].file.lcn;
        TAOS_CHECK_GOTO(tsdbOpenFile(fname[i], config->tsdb, TD_FILE_READ, &reader[0]->fd[i], lcn), &lino, _exit);
      }
    }
  } else {
    for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
      if (config->files[i].exist) {
        char fname1[TSDB_FILENAME_LEN];
        tsdbTFileName(config->tsdb, &config->files[i].file, fname1);
        int32_t lcn = config->files[i].file.lcn;
        TAOS_CHECK_GOTO(tsdbOpenFile(fname1, config->tsdb, TD_FILE_READ, &reader[0]->fd[i], lcn), &lino, _exit);
      }
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
    // Do not hand back a half-open reader: close the fds opened so far and
    // free the reader. Close tolerates *reader == NULL (allocation failure)
    // and resets *reader to NULL.
    // NOTE(review): relies on tsdbDataFileReaderClose() being declared in
    // tsdbDataFileRW.h, since its definition follows this function - confirm.
    tsdbDataFileReaderClose(reader);
  }
  return code;
}
152

153
/**
 * Release everything owned by a reader and reset the caller's handle to NULL.
 * A NULL handle is accepted and ignored.
 *
 * @param reader  in/out: address of the reader pointer to close
 */
void tsdbDataFileReaderClose(SDataFileReader **reader) {
  SDataFileReader *self = reader[0];
  if (self == NULL) {
    return;
  }

  // block index arrays
  TARRAY2_DESTROY(self->tombBlkArray, NULL);
  TARRAY2_DESTROY(self->brinBlkArray, NULL);

  // file handles that were actually opened
  for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
    if (self->fd[ftype] != NULL) {
      tsdbCloseFile(&self->fd[ftype]);
    }
  }

  // reader-owned scratch buffers
  for (int32_t idx = 0; idx < ARRAY_SIZE(self->local); ++idx) {
    tBufferDestroy(&self->local[idx]);
  }

  taosMemoryFree(self);
  reader[0] = NULL;
}
174

175
/**
 * Return the SBrinBlk index array of the head file, loading it on first use.
 *
 * The array storage is read from the location recorded in the head footer and
 * handed over to reader->brinBlkArray; it is released when the reader closes.
 *
 * @param reader        open data file reader
 * @param brinBlkArray  out: points at the reader-owned array on success
 * @return 0 on success, otherwise the footer/allocation/read error code
 */
int32_t tsdbDataFileReadBrinBlk(SDataFileReader *reader, const TBrinBlkArray **brinBlkArray) {
  int32_t code = 0;
  int32_t lino = 0;
  void   *raw = NULL;

  // Fast path: index already cached on the reader.
  if (reader->ctx->brinBlkLoaded) {
    brinBlkArray[0] = reader->brinBlkArray;
    return code;
  }

  TAOS_CHECK_GOTO(tsdbDataFileReadHeadFooter(reader), &lino, _exit);

  if (!(reader->headFooter->brinBlkPtr->size > 0)) {
    // No index recorded: expose an empty array.
    TARRAY2_INIT(reader->brinBlkArray);
  } else {
    raw = taosMemoryMalloc(reader->headFooter->brinBlkPtr->size);
    if (raw == NULL) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }

    SEncryptData *pEncryptData = &(reader->config->tsdb->pVnode->config.tsdbCfg.encryptData);

    TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], reader->headFooter->brinBlkPtr->offset, raw,
                                 reader->headFooter->brinBlkPtr->size, 0, pEncryptData),
                    &lino, _exit);

    // The array adopts `raw`; element count doubles as capacity.
    int32_t numBlk = reader->headFooter->brinBlkPtr->size / sizeof(SBrinBlk);
    TARRAY2_INIT_EX(reader->brinBlkArray, numBlk, numBlk, raw);
  }

  reader->ctx->brinBlkLoaded = true;
  brinBlkArray[0] = reader->brinBlkArray;

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
    taosMemoryFree(raw);
  }
  return code;
}
213

214
/**
 * Read one brin block from the head file and decompress it into `brinBlock`.
 *
 * The on-disk block consists of 10 int64 columns followed by 5 int32 columns,
 * each compressed separately with sizes given by brinBlk->size[], and - when
 * brinBlk->numOfPKs > 0 - the compress infos and payloads of the first/last
 * primary-key columns. The decoder must consume the buffer exactly; any
 * leftover or overrun is reported as file corruption.
 *
 * Uses reader->buffers[0] as the raw read buffer and reader->buffers[1] as the
 * decompression scratch buffer.
 *
 * @param reader     open data file reader
 * @param brinBlk    index entry describing the block to read
 * @param brinBlock  out: decoded block (cleared before being filled)
 * @return 0 on success, TSDB_CODE_FILE_CORRUPTED on inconsistent metadata, or
 *         the read/decompress error code
 */
int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinBlk, SBrinBlock *brinBlock) {
  int32_t code = 0;
  int32_t lino = 0;

  // numOfPKs sizes the stack arrays below, so reject out-of-range values first.
  if (brinBlk->numOfPKs > TD_MAX_PK_COLS) {
    tsdbError("vgId:%d %s failed at %s:%d since brin block numOfPKs %d exceeds max %d (file corrupted)",
              TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, __LINE__, (int)brinBlk->numOfPKs,
              (int)TD_MAX_PK_COLS);
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

  SBuffer *buffer = reader->buffers + 0;
  SBuffer *assist = reader->buffers + 1;

  SEncryptData *pEncryptData = &(reader->config->tsdb->pVnode->config.tsdbCfg.encryptData);

  // load data
  tBufferClear(buffer);
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, brinBlk->dp->size, buffer, 0,
                                       pEncryptData),
                  &lino, _exit);

  // decode brin block
  SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer);
  tBrinBlockClear(brinBlock);
  brinBlock->numOfPKs = brinBlk->numOfPKs;
  brinBlock->numOfRecords = brinBlk->numRec;
  for (int32_t i = 0; i < 10; i++) {  // int64_t

    SCompressInfo cinfo = {
        .cmprAlg = brinBlk->cmprAlg,
        .dataType = TSDB_DATA_TYPE_BIGINT,
        .compressedSize = brinBlk->size[i],
        .originalSize = brinBlk->numRec * sizeof(int64_t),
    };
    TAOS_CHECK_GOTO(tDecompressDataToBuffer(BR_PTR(&br), &cinfo, brinBlock->buffers + i, assist), &lino, _exit);
    br.offset += brinBlk->size[i];
  }

  for (int32_t i = 10; i < 15; i++) {  // int32_t
    SCompressInfo cinfo = {
        .cmprAlg = brinBlk->cmprAlg,
        .dataType = TSDB_DATA_TYPE_INT,
        .compressedSize = brinBlk->size[i],
        .originalSize = brinBlk->numRec * sizeof(int32_t),
    };
    TAOS_CHECK_GOTO(tDecompressDataToBuffer(BR_PTR(&br), &cinfo, brinBlock->buffers + i, assist), &lino, _exit);
    br.offset += brinBlk->size[i];
  }

  // primary keys
  if (brinBlk->numOfPKs > 0) {  // decode the primary keys
    SValueColumnCompressInfo firstInfos[TD_MAX_PK_COLS];
    SValueColumnCompressInfo lastInfos[TD_MAX_PK_COLS];

    // All compress infos come first (first keys, then last keys) ...
    for (int32_t i = 0; i < brinBlk->numOfPKs; i++) {
      TAOS_CHECK_GOTO(tValueColumnCompressInfoDecode(&br, firstInfos + i), &lino, _exit);
    }
    for (int32_t i = 0; i < brinBlk->numOfPKs; i++) {
      TAOS_CHECK_GOTO(tValueColumnCompressInfoDecode(&br, lastInfos + i), &lino, _exit);
    }

    // ... followed by the payloads in the same order.
    for (int32_t i = 0; i < brinBlk->numOfPKs; i++) {
      SValueColumnCompressInfo *info = firstInfos + i;

      TAOS_CHECK_GOTO(tValueColumnDecompress(BR_PTR(&br), info, brinBlock->firstKeyPKs + i, assist), &lino, _exit);
      br.offset += (info->offsetCompressedSize + info->dataCompressedSize);
    }

    for (int32_t i = 0; i < brinBlk->numOfPKs; i++) {
      SValueColumnCompressInfo *info = lastInfos + i;

      TAOS_CHECK_GOTO(tValueColumnDecompress(BR_PTR(&br), info, brinBlock->lastKeyPKs + i, assist), &lino, _exit);
      br.offset += (info->offsetCompressedSize + info->dataCompressedSize);
    }
  }

  if (br.offset != br.buffer->size) {
    // `lino` is still 0 on this path, so report the actual source line.
    tsdbError("vgId:%d %s failed at %s:%d since brin block size mismatch, expected: %u, actual: %u, fname:%s",
              TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, __LINE__, br.buffer->size, br.offset,
              reader->fd[TSDB_FTYPE_HEAD]->path);
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
305

306
extern int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *assist);
307

308
/**
 * Read a whole data block (all columns) from the data file and decompress it.
 *
 * Uses reader->buffers[0] for the raw bytes and reader->buffers[1] as the
 * decompression scratch buffer. The decoder must consume the raw buffer
 * exactly, otherwise the file is reported as corrupted.
 *
 * @param reader  open data file reader
 * @param record  brin record giving the block's offset and size
 * @param bData   out: decompressed block data
 * @return 0 on success, TSDB_CODE_FILE_CORRUPTED on size mismatch, or the
 *         read/decompress error code
 */
int32_t tsdbDataFileReadBlockData(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData) {
  int32_t lino = 0;
  int32_t code = 0;
  int32_t fid = reader->config->files[TSDB_FTYPE_DATA].file.fid;

  SBuffer      *raw = &reader->buffers[0];
  SBuffer      *scratch = &reader->buffers[1];
  SEncryptData *pEncryptData = &(reader->config->tsdb->pVnode->config.tsdbCfg.encryptData);

  // fetch the compressed block
  tBufferClear(raw);
  TAOS_CHECK_GOTO(
      tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, record->blockSize, raw, 0, pEncryptData),
      &lino, _exit);

  // decode it
  SBufferReader rd = BUFFER_READER_INITIALIZER(0, raw);
  TAOS_CHECK_GOTO(tBlockDataDecompress(&rd, bData, scratch), &lino, _exit);

  // every byte read must have been consumed
  if (rd.offset != raw->size) {
    tsdbError("vgId:%d %s failed at %s:%d since block data size mismatch, expected: %u, actual: %u, fname:%s",
              TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, __LINE__, raw->size, rd.offset,
              reader->fd[TSDB_FTYPE_DATA]->path);
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s fid %d failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, fid,
              __FILE__, lino, tstrerror(code));
  }
  return code;
}
342

343
int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData,
96,615,540✔
344
                                          STSchema *pTSchema, int16_t cids[], int32_t ncid) {
345
  int32_t code = 0;
96,615,540✔
346
  int32_t lino = 0;
96,615,540✔
347
  int32_t fid = reader->config->files[TSDB_FTYPE_DATA].file.fid;
96,625,175✔
348

349
  SDiskDataHdr hdr;
96,633,963✔
350
  SBuffer     *buffer0 = reader->buffers + 0;
96,642,357✔
351
  SBuffer     *buffer1 = reader->buffers + 1;
96,641,007✔
352
  SBuffer     *assist = reader->buffers + 2;
96,641,222✔
353

354
  SEncryptData *pEncryptData = &(reader->config->tsdb->pVnode->config.tsdbCfg.encryptData);
96,636,362✔
355

356
  // load key part
357
  tBufferClear(buffer0);
358
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, record->blockKeySize, buffer0,
96,621,723✔
359
                                       0, pEncryptData),
360
                  &lino, _exit);
361

362
  // SDiskDataHdr
363
  SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0);
96,626,725✔
364
  TAOS_CHECK_GOTO(tGetDiskDataHdr(&br, &hdr), &lino, _exit);
96,632,274✔
365

366
  if (hdr.delimiter != TSDB_FILE_DLMT) {
96,593,178✔
367
    tsdbError("vgId:%d %s failed at %s:%d since disk data header delimiter is invalid, fname:%s",
×
368
              TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, __LINE__, reader->fd[TSDB_FTYPE_DATA]->path);
369
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
370
  }
371

372
  tBlockDataReset(bData);
96,593,178✔
373
  bData->suid = hdr.suid;
96,608,526✔
374
  bData->uid = hdr.uid;
96,638,280✔
375
  bData->nRow = hdr.nRow;
96,626,828✔
376

377
  // Key part
378
  TAOS_CHECK_GOTO(tBlockDataDecompressKeyPart(&hdr, &br, bData, assist), &lino, _exit);
96,630,964✔
379
  if (br.offset != buffer0->size) {
96,626,709✔
380
    tsdbError("vgId:%d %s failed at %s:%d since key part size mismatch, expected: %u, actual: %u, fname:%s",
10,683✔
381
              TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, __LINE__, buffer0->size, br.offset,
382
              reader->fd[TSDB_FTYPE_DATA]->path);
383
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
384
  }
385

386
  int extraColIdx = -1;
96,610,233✔
387
  for (int i = 0; i < ncid; i++) {
96,627,031✔
388
    if (tBlockDataGetColData(bData, cids[i]) == NULL) {
94,175,242✔
389
      extraColIdx = i;
94,157,457✔
390
      break;
94,157,457✔
391
    }
392
  }
393

394
  if (extraColIdx < 0) {
96,609,246✔
395
    goto _exit;
2,446,479✔
396
  }
397

398
  // load SBlockCol part
399
  tBufferClear(buffer0);
400
  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset + record->blockKeySize,
94,157,993✔
401
                                       hdr.szBlkCol, buffer0, 0, pEncryptData),
402
                  &lino, _exit);
403

404
  // calc szHint
405
  int64_t szHint = 0;
94,185,489✔
406
  int     extraCols = 1;
94,185,489✔
407
  for (int i = extraColIdx + 1; i < ncid; ++i) {
94,185,489✔
408
    if (tBlockDataGetColData(bData, cids[i]) == NULL) {
50,502,176✔
409
      ++extraCols;
50,506,502✔
410
      break;
50,506,502✔
411
    }
412
  }
413

414
  if (extraCols >= 2) {
94,189,815✔
415
    br = BUFFER_READER_INITIALIZER(0, buffer0);
50,508,553✔
416

417
    SBlockCol blockCol = {.cid = 0};
50,508,553✔
418
    for (int32_t i = extraColIdx; i < ncid; ++i) {
50,509,917✔
419
      int16_t extraColCid = cids[i];
50,494,576✔
420

421
      while (extraColCid > blockCol.cid) {
167,947,078✔
422
        if (br.offset >= buffer0->size) {
117,426,815✔
423
          blockCol.cid = INT16_MAX;
×
424
          break;
×
425
        }
426

427
        TAOS_CHECK_GOTO(tGetBlockCol(&br, &blockCol, hdr.fmtVer, hdr.cmprAlg), &lino, _exit);
117,443,885✔
428
      }
429

430
      if (extraColCid == blockCol.cid || blockCol.cid == INT16_MAX) {
50,520,263✔
431
        extraColIdx = i;
50,520,263✔
432
        break;
50,520,263✔
433
      }
434
    }
435

436
    if (blockCol.cid > 0 && blockCol.cid < INT16_MAX /*&& blockCol->flag == HAS_VALUE*/) {
50,535,604✔
437
      int64_t   offset = blockCol.offset;
50,521,871✔
438
      SBlockCol lastNonNoneBlockCol = {.cid = 0};
50,521,871✔
439

440
      for (int32_t i = extraColIdx; i < ncid; ++i) {
274,372,168✔
441
        int16_t extraColCid = cids[i];
223,840,742✔
442

443
        while (extraColCid > blockCol.cid) {
613,641,423✔
444
          if (br.offset >= buffer0->size) {
389,791,126✔
445
            blockCol.cid = INT16_MAX;
×
446
            break;
×
447
          }
448

449
          TAOS_CHECK_GOTO(tGetBlockCol(&br, &blockCol, hdr.fmtVer, hdr.cmprAlg), &lino, _exit);
389,795,422✔
450
        }
451

452
        if (extraColCid == blockCol.cid) {
223,850,297✔
453
          lastNonNoneBlockCol = blockCol;
223,850,297✔
454
          continue;
223,850,297✔
455
        }
456

457
        if (blockCol.cid == INT16_MAX) {
×
458
          break;
×
459
        }
460
      }
461

462
      if (lastNonNoneBlockCol.cid > 0) {
50,531,426✔
463
        szHint = lastNonNoneBlockCol.offset + lastNonNoneBlockCol.szBitmap + lastNonNoneBlockCol.szOffset +
50,523,620✔
464
                 lastNonNoneBlockCol.szValue - offset;
50,523,620✔
465
      }
466
    }
467
  }
468

469
  // load each column
470
  SBlockCol blockCol = {
94,202,305✔
471
      .cid = 0,
472
  };
473
  bool firstRead = true;
94,186,537✔
474
  br = BUFFER_READER_INITIALIZER(0, buffer0);
94,186,537✔
475
  for (int32_t i = 0; i < ncid; i++) {
361,702,917✔
476
    int16_t cid = cids[i];
267,497,700✔
477

478
    if (tBlockDataGetColData(bData, cid)) {  // already loaded
267,502,134✔
479
      continue;
1,556✔
480
    }
481

482
    while (cid > blockCol.cid) {
946,382,835✔
483
      if (br.offset >= buffer0->size) {
678,870,737✔
484
        blockCol.cid = INT16_MAX;
×
485
        break;
×
486
      }
487

488
      TAOS_CHECK_GOTO(tGetBlockCol(&br, &blockCol, hdr.fmtVer, hdr.cmprAlg), &lino, _exit);
678,882,004✔
489
    }
490

491
    if (cid < blockCol.cid) {
267,512,098✔
492
      const STColumn *tcol = tTSchemaSearchColumn(pTSchema, cid);
×
493
      TSDB_CHECK_NULL(tcol, code, lino, _exit, TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER);
×
494
      SBlockCol none = {
×
495
          .cid = cid,
496
          .type = tcol->type,
×
497
          .cflag = tcol->flags,
×
498
          .flag = HAS_NONE,
499
          .szOrigin = 0,
500
          .szBitmap = 0,
501
          .szOffset = 0,
502
          .szValue = 0,
503
          .offset = 0,
504
      };
505
      TAOS_CHECK_GOTO(tBlockDataDecompressColData(&hdr, &none, &br, bData, assist), &lino, _exit);
×
506
    } else if (cid == blockCol.cid) {
267,512,098✔
507
      SEncryptData *pEncryptData = &(reader->config->tsdb->pVnode->config.tsdbCfg.encryptData);
267,512,406✔
508

509
      // load from file
510
      tBufferClear(buffer1);
511
      TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA],
267,546,024✔
512
                                           record->blockOffset + record->blockKeySize + hdr.szBlkCol + blockCol.offset,
513
                                           blockCol.szBitmap + blockCol.szOffset + blockCol.szValue, buffer1,
514
                                           firstRead ? szHint : 0, pEncryptData),
515
                      &lino, _exit);
516

517
      firstRead = false;
267,524,812✔
518

519
      // decode the buffer
520
      SBufferReader br1 = BUFFER_READER_INITIALIZER(0, buffer1);
267,524,812✔
521
      TAOS_CHECK_GOTO(tBlockDataDecompressColData(&hdr, &blockCol, &br1, bData, assist), &lino, _exit);
267,529,299✔
522
    }
523
  }
524

525
_exit:
96,638,901✔
526
  if (code) {
96,646,449✔
527
    tsdbError("vgId:%d %s fid:%d failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, fid,
×
528
              __FILE__, lino, tstrerror(code));
529
  }
530
  return code;
96,646,449✔
531
}
532

533
int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader, const SBrinRecord *record,
36,484,393✔
534
                                 TColumnDataAggArray *columnDataAggArray) {
535
  int32_t  code = 0;
36,484,393✔
536
  int32_t  lino = 0;
36,484,393✔
537
  SBuffer *buffer = reader->buffers + 0;
36,520,958✔
538

539
  TARRAY2_CLEAR(columnDataAggArray, NULL);
36,582,179✔
540
  if (record->smaSize > 0) {
36,584,244✔
541
    tBufferClear(buffer);
542
    SEncryptData *pEncryptData = &(reader->config->tsdb->pVnode->config.tsdbCfg.encryptData);
36,585,602✔
543

544
    TAOS_CHECK_GOTO(
36,587,651✔
545
        tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_SMA], record->smaOffset, record->smaSize, buffer, 0, pEncryptData),
546
        &lino, _exit);
547

548
    // decode sma data
549
    SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer);
36,513,018✔
550
    while (br.offset < record->smaSize) {
455,071,307✔
551
      SColumnDataAgg sma[1];
418,471,054✔
552

553
      TAOS_CHECK_GOTO(tGetColumnDataAgg(&br, sma), &lino, _exit);
418,472,261✔
554
      TAOS_CHECK_GOTO(TARRAY2_APPEND_PTR(columnDataAggArray, sma), &lino, _exit);
836,250,659✔
555
    }
556
    if (br.offset != record->smaSize) {
36,588,901✔
557
      tsdbError("vgId:%d %s failed at %s:%d since sma data size mismatch, expected: %u, actual: %u, fname:%s",
×
558
                TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, __LINE__, record->smaSize, br.offset,
559
                reader->fd[TSDB_FTYPE_SMA]->path);
560
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
561
    }
562
  }
563

564
_exit:
36,570,192✔
565
  if (code) {
36,588,909✔
566
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
×
567
              tstrerror(code));
568
  }
569
  return code;
36,588,909✔
570
}
571

572
/**
 * Return the STombBlk index array of the tomb file, loading it on first use.
 *
 * The array storage is read from the location recorded in the tomb footer and
 * handed over to reader->tombBlkArray; it is released when the reader closes.
 *
 * @param reader        open data file reader
 * @param tombBlkArray  out: points at the reader-owned array on success
 * @return 0 on success, otherwise the footer/allocation/read error code
 */
int32_t tsdbDataFileReadTombBlk(SDataFileReader *reader, const TTombBlkArray **tombBlkArray) {
  int32_t code = 0;
  int32_t lino = 0;
  void   *raw = NULL;

  // Fast path: index already cached on the reader.
  if (reader->ctx->tombBlkLoaded) {
    tombBlkArray[0] = reader->tombBlkArray;
    return code;
  }

  TAOS_CHECK_GOTO(tsdbDataFileReadTombFooter(reader), &lino, _exit);

  if (!(reader->tombFooter->tombBlkPtr->size > 0)) {
    // No index recorded: expose an empty array.
    TARRAY2_INIT(reader->tombBlkArray);
  } else {
    raw = taosMemoryMalloc(reader->tombFooter->tombBlkPtr->size);
    if (raw == NULL) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }

    SEncryptData *pEncryptData = &(reader->config->tsdb->pVnode->config.tsdbCfg.encryptData);

    TAOS_CHECK_GOTO(tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], reader->tombFooter->tombBlkPtr->offset, raw,
                                 reader->tombFooter->tombBlkPtr->size, 0, pEncryptData),
                    &lino, _exit);

    // The array adopts `raw`; element count doubles as capacity.
    int32_t numBlk = reader->tombFooter->tombBlkPtr->size / sizeof(STombBlk);
    TARRAY2_INIT_EX(reader->tombBlkArray, numBlk, numBlk, raw);
  }

  reader->ctx->tombBlkLoaded = true;
  tombBlkArray[0] = reader->tombBlkArray;

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
    taosMemoryFree(raw);
  }
  return code;
}
609

610
/**
 * Read one tomb block from the tomb file and decompress it into `tData`.
 *
 * Every column of the block is decoded as int64 (TSDB_DATA_TYPE_BIGINT) with
 * tombBlk->numRec entries; the compressed size of column i is
 * tombBlk->size[i]. Uses reader->buffers[0] for the raw bytes and
 * reader->buffers[1] as the decompression scratch buffer.
 *
 * @param reader   open data file reader
 * @param tombBlk  index entry describing the block to read
 * @param tData    out: decoded tomb block (cleared before being filled)
 * @return 0 on success, otherwise the read/decompress error code
 */
int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombBlk, STombBlock *tData) {
  int32_t code = 0;
  int32_t lino = 0;

  SBuffer *buffer0 = reader->buffers + 0;
  SBuffer *assist = reader->buffers + 1;

  tBufferClear(buffer0);
  SEncryptData *pEncryptData = &(reader->config->tsdb->pVnode->config.tsdbCfg.encryptData);

  TAOS_CHECK_GOTO(tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_TOMB], tombBlk->dp->offset, tombBlk->dp->size, buffer0, 0,
                                       pEncryptData),
                  &lino, _exit);

  SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0);
  tTombBlockClear(tData);
  tData->numOfRecords = tombBlk->numRec;
  for (int32_t i = 0; i < ARRAY_SIZE(tData->buffers); ++i) {
    SCompressInfo cinfo = {
        .cmprAlg = tombBlk->cmprAlg,
        .dataType = TSDB_DATA_TYPE_BIGINT,
        .originalSize = tombBlk->numRec * sizeof(int64_t),
        .compressedSize = tombBlk->size[i],
    };
    TAOS_CHECK_GOTO(tDecompressDataToBuffer(BR_PTR(&br), &cinfo, tData->buffers + i, assist), &lino, _exit);
    // columns are stored back to back in the raw buffer
    br.offset += tombBlk->size[i];
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
646

647
// SDataFileWriter =============================================
648
// State of one data-file-set writer.
struct SDataFileWriter {
  SDataFileWriterConfig config[1];  // writer configuration (tsdb, szPage, files[], ...)

  // Schema caches; their pTSchema members are destroyed when the writer closes.
  SSkmInfo skmTb[1];
  SSkmInfo skmRow[1];
  SBuffer  local[10];  // writer-owned scratch buffers, destroyed when the writer closes
  SBuffer *buffers;    // scratch buffers in use; also handed to the reader opened for the old files

  struct {
    bool             opened;
    SDataFileReader *reader;  // reader over the existing file set, if any file exists; closed with the writer

    // for ts data
    TABLEID tbid[1];
    bool    tbHasOldData;

    const TBrinBlkArray *brinBlkArray;
    int32_t              brinBlkArrayIdx;
    SBrinBlock           brinBlock[1];
    int32_t              brinBlockIdx;
    SBlockData           blockData[1];
    int32_t              blockDataIdx;
    // for tomb data
    bool                 hasOldTomb;
    const TTombBlkArray *tombBlkArray;
    int32_t              tombBlkArrayIdx;
    STombBlock           tombBlock[1];
    int32_t              tombBlockIdx;
    // range
    SVersionRange range;
    SVersionRange tombRange;
  } ctx[1];

  // Per-file-type file descriptors and handles.
  STFile   files[TSDB_FTYPE_MAX];
  STsdbFD *fd[TSDB_FTYPE_MAX];

  SHeadFooter headFooter[1];
  STombFooter tombFooter[1];

  // Writer-side ts data state (destroyed when the writer closes).
  TBrinBlkArray brinBlkArray[1];
  SBrinBlock    brinBlock[1];
  SBlockData    blockData[1];

  // Writer-side tomb data state (destroyed when the writer closes).
  TTombBlkArray tombBlkArray[1];
  STombBlock    tombBlock[1];
};
694

695
/**
 * Abort path of the writer close. Not implemented: it only logs that fact and
 * reports success.
 *
 * @param writer  writer being aborted (used for the vnode id in the log line)
 * @return always 0
 */
static int32_t tsdbDataFileWriterCloseAbort(SDataFileWriter *writer) {
  const char *reason = "not implemented";

  tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, __LINE__,
            reason);
  return 0;
}
700

701
// Release everything the writer owns: the reader over the old files (if one
// was opened), the output-side blocks and arrays, the read-cursor blocks in
// ctx, the local scratch buffers and the writer-owned schemas.
static void tsdbDataFileWriterDoClose(SDataFileWriter *writer) {
  if (writer->ctx->reader != NULL) {
    tsdbDataFileReaderClose(&writer->ctx->reader);
  }

  // output side
  tTombBlockDestroy(writer->tombBlock);
  TARRAY2_DESTROY(writer->tombBlkArray, NULL);
  tBlockDataDestroy(writer->blockData);
  tBrinBlockDestroy(writer->brinBlock);
  TARRAY2_DESTROY(writer->brinBlkArray, NULL);

  // read cursor over the old files
  tTombBlockDestroy(writer->ctx->tombBlock);
  tBlockDataDestroy(writer->ctx->blockData);
  tBrinBlockDestroy(writer->ctx->brinBlock);

  // writer-owned scratch buffers
  int32_t nLocal = ARRAY_SIZE(writer->local);
  for (int32_t iBuf = 0; iBuf < nLocal; ++iBuf) {
    tBufferDestroy(&writer->local[iBuf]);
  }

  // writer-owned schemas
  tDestroyTSchema(writer->skmRow->pTSchema);
  tDestroyTSchema(writer->skmTb->pTSchema);
}
894,958✔
723

724
/*
 * Open a reader over the files that already exist for this writer.
 *
 * When none of writer->config->files[] is marked as existing, nothing is
 * opened and writer->ctx->reader is left untouched. Otherwise a single reader
 * is opened with the writer's tsdb, page size and scratch buffers, and with
 * the descriptor of every existing file copied into its configuration.
 *
 * Returns 0 on success, otherwise the code returned by tsdbDataFileReaderOpen().
 */
static int32_t tsdbDataFileWriterDoOpenReader(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  // a reader is needed as soon as one file of the set exists
  bool hasExistingFile = false;
  for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
    if (writer->config->files[ftype].exist) {
      hasExistingFile = true;
      break;
    }
  }

  if (hasExistingFile) {
    SDataFileReaderConfig config[1] = {{
        .tsdb = writer->config->tsdb,
        .szPage = writer->config->szPage,
        .buffers = writer->buffers,
    }};

    // hand every existing file over to the reader (no shadowed loop index)
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
      config->files[ftype].exist = writer->config->files[ftype].exist;
      if (config->files[ftype].exist) {
        config->files[ftype].file = writer->config->files[ftype].file;
      }
    }

    TAOS_CHECK_GOTO(tsdbDataFileReaderOpen(NULL, config, &writer->ctx->reader), &lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
755

756
/*
 * Finish opening the writer:
 *   - default the schema holders and scratch buffers to the writer-owned ones,
 *   - open a reader over the existing files,
 *   - prepare the file descriptors: .head and .tomb are always new files, .data
 *     and .sma reuse the existing descriptor when the file already exists,
 *   - reset both version ranges to "empty" and mark the writer as opened.
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t ftype;
  SDiskID diskId = {0};

  if (writer->config->skmTb == NULL) {
    writer->config->skmTb = writer->skmTb;
  }
  if (writer->config->skmRow == NULL) {
    writer->config->skmRow = writer->skmRow;
  }
  writer->buffers = (writer->config->buffers != NULL) ? writer->config->buffers : writer->local;

  // open reader
  TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpenReader(writer), &lino, _exit);

  // .head: always a new file
  ftype = TSDB_FTYPE_HEAD;
  code = tsdbAllocateDisk(writer->config->tsdb, tsdbFTypeLabel(ftype), writer->config->expLevel, &diskId);
  TSDB_CHECK_CODE(code, lino, _exit);
  writer->files[ftype] = (STFile){
      .type = ftype,
      .did = diskId,
      .fid = writer->config->fid,
      .cid = writer->config->cid,
      .size = 0,
      .minVer = VERSION_MAX,
      .maxVer = VERSION_MIN,
  };

  // .data: new file only when none exists yet
  ftype = TSDB_FTYPE_DATA;
  if (!writer->config->files[ftype].exist) {
    code = tsdbAllocateDisk(writer->config->tsdb, tsdbFTypeLabel(ftype), writer->config->expLevel, &diskId);
    TSDB_CHECK_CODE(code, lino, _exit);
    writer->files[ftype] = (STFile){
        .type = ftype,
        .did = diskId,
        .fid = writer->config->fid,
        .cid = writer->config->cid,
        .size = 0,
        .lcn = writer->config->lcn == 0 ? -1 : 0,
        .minVer = VERSION_MAX,
        .maxVer = VERSION_MIN,
    };
  } else {
    writer->files[ftype] = writer->config->files[ftype].file;
  }

  // .sma: new file only when none exists yet
  ftype = TSDB_FTYPE_SMA;
  if (!writer->config->files[ftype].exist) {
    code = tsdbAllocateDisk(writer->config->tsdb, tsdbFTypeLabel(ftype), writer->config->expLevel, &diskId);
    TSDB_CHECK_CODE(code, lino, _exit);
    writer->files[ftype] = (STFile){
        .type = ftype,
        .did = diskId,
        .fid = writer->config->fid,
        .cid = writer->config->cid,
        .size = 0,
        .minVer = VERSION_MAX,
        .maxVer = VERSION_MIN,
    };
  } else {
    writer->files[ftype] = writer->config->files[ftype].file;
  }

  // .tomb: always a new file
  ftype = TSDB_FTYPE_TOMB;
  code = tsdbAllocateDisk(writer->config->tsdb, tsdbFTypeLabel(ftype), writer->config->expLevel, &diskId);
  TSDB_CHECK_CODE(code, lino, _exit);
  writer->files[ftype] = (STFile){
      .type = ftype,
      .did = diskId,
      .fid = writer->config->fid,
      .cid = writer->config->cid,
      .size = 0,
      .minVer = VERSION_MAX,
      .maxVer = VERSION_MIN,
  };

  // start with empty version ranges (min > max)
  writer->ctx->range = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
  writer->ctx->tombRange = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN};

  writer->ctx->opened = true;

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
850

851
// Widen *range so that it also covers [minVer, maxVer].
void tsdbWriterUpdVerRange(SVersionRange *range, int64_t minVer, int64_t maxVer) {
  if (!(range->minVer < minVer)) {
    range->minVer = minVer;
  }
  if (!(range->maxVer > maxVer)) {
    range->maxVer = maxVer;
  }
}
95,717,449✔
855

856
int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, uint32_t cmprAlg, int64_t *fileSize,
872,092✔
857
                               TBrinBlkArray *brinBlkArray, SBuffer *buffers, SVersionRange *range,
858
                               SEncryptData *encryptData) {
859
  if (brinBlock->numOfRecords == 0) {
872,092✔
860
    return 0;
×
861
  }
862

863
  int32_t  code;
864
  SBuffer *buffer0 = buffers + 0;
872,092✔
865
  SBuffer *buffer1 = buffers + 1;
872,092✔
866
  SBuffer *assist = buffers + 2;
872,092✔
867

868
  SBrinBlk brinBlk = {
872,092✔
869
      .dp[0] =
870
          {
871
              .offset = *fileSize,
872,092✔
872
              .size = 0,
873
          },
874
      .numRec = brinBlock->numOfRecords,
872,092✔
875
      .numOfPKs = brinBlock->numOfPKs,
872,092✔
876
      .cmprAlg = cmprAlg,
877
  };
878
  for (int i = 0; i < brinBlock->numOfRecords; i++) {
60,615,744✔
879
    SBrinRecord record;
59,743,652✔
880

881
    TAOS_CHECK_RETURN(tBrinBlockGet(brinBlock, i, &record));
59,743,652✔
882
    if (i == 0) {
59,743,652✔
883
      brinBlk.minTbid.suid = record.suid;
872,092✔
884
      brinBlk.minTbid.uid = record.uid;
872,092✔
885
      brinBlk.minVer = record.minVer;
872,092✔
886
      brinBlk.maxVer = record.maxVer;
872,092✔
887
    }
888
    if (i == brinBlock->numOfRecords - 1) {
59,743,652✔
889
      brinBlk.maxTbid.suid = record.suid;
872,092✔
890
      brinBlk.maxTbid.uid = record.uid;
872,092✔
891
    }
892
    if (record.minVer < brinBlk.minVer) {
59,743,652✔
893
      brinBlk.minVer = record.minVer;
581,729✔
894
    }
895
    if (record.maxVer > brinBlk.maxVer) {
59,743,652✔
896
      brinBlk.maxVer = record.maxVer;
21,357,204✔
897
    }
898
  }
899

900
  tsdbWriterUpdVerRange(range, brinBlk.minVer, brinBlk.maxVer);
872,092✔
901

902
  // write to file
903
  for (int32_t i = 0; i < 10; ++i) {
9,593,012✔
904
    SCompressInfo info = {
8,720,920✔
905
        .cmprAlg = cmprAlg,
906
        .dataType = TSDB_DATA_TYPE_BIGINT,
907
        .originalSize = brinBlock->buffers[i].size,
8,720,920✔
908
    };
909

910
    tBufferClear(buffer0);
911
    TAOS_CHECK_RETURN(tCompressDataToBuffer(brinBlock->buffers[i].data, &info, buffer0, assist));
8,720,920✔
912
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptData));
8,720,384✔
913
    brinBlk.size[i] = info.compressedSize;
8,720,920✔
914
    brinBlk.dp->size += info.compressedSize;
8,720,920✔
915
    *fileSize += info.compressedSize;
8,720,920✔
916
  }
917
  for (int32_t i = 10; i < 15; ++i) {
5,232,552✔
918
    SCompressInfo info = {
4,360,460✔
919
        .cmprAlg = cmprAlg,
920
        .dataType = TSDB_DATA_TYPE_INT,
921
        .originalSize = brinBlock->buffers[i].size,
4,360,460✔
922
    };
923

924
    tBufferClear(buffer0);
925
    TAOS_CHECK_RETURN(tCompressDataToBuffer(brinBlock->buffers[i].data, &info, buffer0, assist));
4,360,460✔
926
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptData));
4,359,924✔
927
    brinBlk.size[i] = info.compressedSize;
4,360,460✔
928
    brinBlk.dp->size += info.compressedSize;
4,360,460✔
929
    *fileSize += info.compressedSize;
4,360,460✔
930
  }
931

932
  // write primary keys to file
933
  if (brinBlock->numOfPKs > 0) {
872,092✔
934
    tBufferClear(buffer0);
935
    tBufferClear(buffer1);
936

937
    // encode
938
    for (int i = 0; i < brinBlock->numOfPKs; i++) {
9,348✔
939
      SValueColumnCompressInfo info = {.cmprAlg = cmprAlg};
4,674✔
940
      TAOS_CHECK_RETURN(tValueColumnCompress(&brinBlock->firstKeyPKs[i], &info, buffer1, assist));
4,674✔
941
      TAOS_CHECK_RETURN(tValueColumnCompressInfoEncode(&info, buffer0));
4,674✔
942
    }
943
    for (int i = 0; i < brinBlock->numOfPKs; i++) {
9,348✔
944
      SValueColumnCompressInfo info = {.cmprAlg = cmprAlg};
4,674✔
945
      TAOS_CHECK_RETURN(tValueColumnCompress(&brinBlock->lastKeyPKs[i], &info, buffer1, assist));
4,674✔
946
      TAOS_CHECK_RETURN(tValueColumnCompressInfoEncode(&info, buffer0));
4,674✔
947
    }
948

949
    // write to file
950
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptData));
4,674✔
951
    *fileSize += buffer0->size;
4,674✔
952
    brinBlk.dp->size += buffer0->size;
4,674✔
953
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer1->data, buffer1->size, encryptData));
4,674✔
954
    *fileSize += buffer1->size;
4,674✔
955
    brinBlk.dp->size += buffer1->size;
4,674✔
956
  }
957

958
  // append to brinBlkArray
959
  TAOS_CHECK_RETURN(TARRAY2_APPEND_PTR(brinBlkArray, &brinBlk));
1,744,184✔
960

961
  tBrinBlockClear(brinBlock);
872,092✔
962

963
  return 0;
872,092✔
964
}
965

966
// Flush the writer's pending brin block to the .head file; the head file size,
// the output brin block array and ctx->range are updated by the callee.
// Does nothing and returns 0 when no record is pending.
static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  if (writer->brinBlock->numOfRecords == 0) {
    return code;
  }

  STsdbFD      *headFd = writer->fd[TSDB_FTYPE_HEAD];
  STFile       *headFile = &writer->files[TSDB_FTYPE_HEAD];
  SEncryptData *pEncryptData = &(writer->config->tsdb->pVnode->config.tsdbCfg.encryptData);

  TAOS_CHECK_GOTO(tsdbFileWriteBrinBlock(headFd, writer->brinBlock, writer->config->cmprAlg, &headFile->size,
                                         writer->brinBlkArray, writer->buffers, &writer->ctx->range, pEncryptData),
                  &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
988

989
// Append one brin record to the writer's pending brin block. When the block
// rejects the record with TSDB_CODE_INVALID_PARA (different records with
// different primary keys), the block is flushed and the put is retried. The
// block is also flushed once it holds 256 records or more.
static int32_t tsdbDataFileWriteBrinRecord(SDataFileWriter *writer, const SBrinRecord *record) {
  int32_t code = 0;
  int32_t lino = 0;

  while ((code = tBrinBlockPut(writer->brinBlock, record)) == TSDB_CODE_INVALID_PARA) {
    // different records with different primary keys
    TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlock(writer), &lino, _exit);
  }
  TSDB_CHECK_CODE(code, lino, _exit);

  if (writer->brinBlock->numOfRecords >= 256) {
    TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlock(writer), &lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1016

1017
/*
 * Write one block of rows to the .data file, its per-column aggregates to the
 * .sma file, and append the describing brin record; bData is cleared on success.
 *
 * Returns 0 when bData holds no row, TSDB_CODE_INVALID_PARA when bData->uid is
 * 0, otherwise 0 on success or the first error code reported by a callee.
 */
static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData *bData) {
  if (bData->nRow == 0) {
    return 0;
  }
  if (!bData->uid) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  code = 0;
  int32_t  lino = 0;
  SBuffer *buffers = writer->buffers;      // buffers[0..3] receive the compressed block
  SBuffer *assist = writer->buffers + 4;   // helper buffer for compression
  STFile  *dataFile = &writer->files[TSDB_FTYPE_DATA];
  STFile  *smaFile = &writer->files[TSDB_FTYPE_SMA];

  SColCompressInfo cmprInfo = {.pColCmpr = NULL, .defaultCmprAlg = writer->config->cmprAlg};

  SBrinRecord record[1] = {{
      .suid = bData->suid,
      .uid = bData->uid,
      .minVer = bData->aVersion[0],
      .maxVer = bData->aVersion[0],
      .blockOffset = dataFile->size,
      .smaOffset = smaFile->size,
      .blockSize = 0,
      .blockKeySize = 0,
      .smaSize = 0,
      .numRow = bData->nRow,
      .count = 1,
  }};

  tsdbRowGetKey(&tsdbRowFromBlockData(bData, 0), &record->firstKey);
  tsdbRowGetKey(&tsdbRowFromBlockData(bData, bData->nRow - 1), &record->lastKey);

  // count rows that differ from their predecessor ignoring the version, and
  // collect the version bounds of the block
  for (int32_t iRow = 1; iRow < bData->nRow; ++iRow) {
    if (tsdbRowCompareWithoutVersion(&tsdbRowFromBlockData(bData, iRow - 1), &tsdbRowFromBlockData(bData, iRow)) !=
        0) {
      record->count++;
    }
    if (bData->aVersion[iRow] < record->minVer) {
      record->minVer = bData->aVersion[iRow];
    }
    if (bData->aVersion[iRow] > record->maxVer) {
      record->maxVer = bData->aVersion[iRow];
    }
  }

  tsdbWriterUpdVerRange(&writer->ctx->range, record->minVer, record->maxVer);

  // a failure here is only logged; compression goes on with what cmprInfo holds
  code = metaGetColCmpr(writer->config->tsdb->pVnode->pMeta, bData->suid != 0 ? bData->suid : bData->uid,
                        &cmprInfo.pColCmpr);
  if (code) {
    tsdbWarn("vgId:%d failed to get column compress algrithm", TD_VID(writer->config->tsdb->pVnode));
  }

  TAOS_CHECK_GOTO(tBlockDataCompress(bData, &cmprInfo, buffers, assist), &lino, _exit);

  record->blockKeySize = buffers[0].size + buffers[1].size;
  record->blockSize = record->blockKeySize + buffers[2].size + buffers[3].size;

  SEncryptData *pEncryptData = &(writer->config->tsdb->pVnode->config.tsdbCfg.encryptData);

  // to .data file: the four compressed parts, back to back
  for (int iPart = 0; iPart < 4; iPart++) {
    TAOS_CHECK_GOTO(tsdbWriteFile(writer->fd[TSDB_FTYPE_DATA], dataFile->size, buffers[iPart].data,
                                  buffers[iPart].size, pEncryptData),
                    &lino, _exit);
    dataFile->size += buffers[iPart].size;
  }

  // to .sma file: one aggregate per column that has SMA enabled and holds values
  tBufferClear(&buffers[0]);
  for (int32_t iCol = 0; iCol < bData->nColData; ++iCol) {
    SColData *colData = bData->aColData + iCol;
    bool      smaEnabled = (colData->cflag & COL_SMA_ON) != 0;
    bool      hasValue = (colData->flag & HAS_VALUE) != 0;
    if (!smaEnabled || !hasValue) {
      continue;
    }

    SColumnDataAgg sma[1] = {{.colId = colData->cid}};
    tColDataCalcSMA[colData->type](colData, sma);

    TAOS_CHECK_GOTO(tPutColumnDataAgg(&buffers[0], sma), &lino, _exit);
  }
  record->smaSize = buffers[0].size;

  if (record->smaSize > 0) {
    TAOS_CHECK_GOTO(
        tsdbWriteFile(writer->fd[TSDB_FTYPE_SMA], record->smaOffset, buffers[0].data, record->smaSize, pEncryptData),
        &lino, _exit);
    smaFile->size += record->smaSize;
  }

  // append SBrinRecord
  TAOS_CHECK_GOTO(tsdbDataFileWriteBrinRecord(writer, record), &lino, _exit);

  tBlockDataClear(bData);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  taosHashCleanup(cmprInfo.pColCmpr);
  return code;
}
1117

1118
/*
 * Put one row into the writer's pending block data.
 *
 * A row whose version is <= config->compactVersion and that equals the last
 * pending row (ignoring the version) updates that last row in place. Any other
 * row is appended, after flushing the pending block when it already holds
 * config->maxRow rows.
 */
static int32_t tsdbDataFileDoWriteTSRow(SDataFileWriter *writer, TSDBROW *row) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SBlockData *pending = writer->blockData;

  // row-format rows need the row schema matching their schema version
  if (row->type == TSDBROW_ROW_FMT) {
    TAOS_CHECK_GOTO(
        tsdbUpdateSkmRow(writer->config->tsdb, writer->ctx->tbid, TSDBROW_SVERSION(row), writer->config->skmRow), &lino,
        _exit);
  }

  bool updateLast = false;
  if (TSDBROW_VERSION(row) <= writer->config->compactVersion && pending->nRow > 0) {
    updateLast = (tsdbRowCompareWithoutVersion(row, &tsdbRowFromBlockData(pending, pending->nRow - 1)) == 0);
  }

  if (updateLast) {
    TAOS_CHECK_GOTO(tBlockDataUpdateRow(pending, row, writer->config->skmRow->pTSchema), &lino, _exit);
    goto _exit;
  }

  if (pending->nRow >= writer->config->maxRow) {
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, pending), &lino, _exit);
  }

  TAOS_CHECK_GOTO(tBlockDataAppendRow(pending, row, writer->config->skmRow->pTSchema, writer->ctx->tbid->uid), &lino,
                  _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1152

1153
// Compare two row keys, treating a NULL key as larger than any other key.
// key1 is checked first, so two NULL keys compare as 1.
static FORCE_INLINE int32_t tsdbRowKeyCmprNullAsLargest(const STsdbRowKey *key1, const STsdbRowKey *key2) {
  if (key1 == NULL) {
    return 1;
  }
  if (key2 == NULL) {
    return -1;
  }
  return tsdbRowKeyCmpr(key1, key2);
}
1162

1163
/*
 * Copy the current table's old data that sorts before `key` into the output.
 *
 * The old file is walked through the read cursor in writer->ctx (brin block
 * array -> brin block -> block data):
 *   - rows of the loaded old block data with a key smaller than `key` are
 *     written row by row;
 *   - an old block whose last key is smaller than `key` is carried over by its
 *     brin record alone, after flushing any pending rows;
 *   - an old block whose key range reaches `key` is loaded and then handled
 *     row by row.
 * ctx->tbHasOldData is cleared once the old data of the table is exhausted.
 *
 * key  upper bound (exclusive); NULL is treated as the largest key, so all
 *      remaining old data of the table is copied.
 *
 * Returns 0 on success or the first error code reported by a callee.
 */
static int32_t tsdbDataFileDoWriteTableOldData(SDataFileWriter *writer, const STsdbRowKey *key) {
  if (writer->ctx->tbHasOldData == false) {
    return 0;
  }

  int32_t     code = 0;
  int32_t     lino = 0;
  STsdbRowKey rowKey;

  for (;;) {
    for (;;) {
      // SBlockData
      for (; writer->ctx->blockDataIdx < writer->ctx->blockData->nRow; writer->ctx->blockDataIdx++) {
        TSDBROW row = tsdbRowFromBlockData(writer->ctx->blockData, writer->ctx->blockDataIdx);

        tsdbRowGetKey(&row, &rowKey);
        if (tsdbRowKeyCmprNullAsLargest(&rowKey, key) < 0) {  // rowKey < key
          TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSRow(writer, &row), &lino, _exit);
        } else {
          goto _exit;
        }
      }

      // SBrinBlock
      if (writer->ctx->brinBlockIdx >= writer->ctx->brinBlock->numOfRecords) {
        break;
      }

      for (; writer->ctx->brinBlockIdx < writer->ctx->brinBlock->numOfRecords; writer->ctx->brinBlockIdx++) {
        SBrinRecord record;
        code = tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, &record);
        TSDB_CHECK_CODE(code, lino, _exit);
        if (record.uid != writer->ctx->tbid->uid) {
          // the next old record belongs to another table
          writer->ctx->tbHasOldData = false;
          goto _exit;
        }

        if (tsdbRowKeyCmprNullAsLargest(key, &record.firstKey) < 0) {  // key < record.firstKey
          goto _exit;
        }

        if (tsdbRowKeyCmprNullAsLargest(key, &record.lastKey) > 0) {  // key > record.lastKey
          // whole old block sorts before key: keep it as is, but flush pending rows first
          if (writer->blockData->nRow > 0) {
            TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, writer->blockData), &lino, _exit);
          }

          TAOS_CHECK_GOTO(tsdbDataFileWriteBrinRecord(writer, &record), &lino, _exit);
        } else {
          // key falls inside the old block: load it and go on row by row
          TAOS_CHECK_GOTO(tsdbDataFileReadBlockData(writer->ctx->reader, &record, writer->ctx->blockData), &lino,
                          _exit);

          writer->ctx->blockDataIdx = 0;
          writer->ctx->brinBlockIdx++;
          break;
        }
      }
    }

    // SBrinBlk
    if (writer->ctx->brinBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->brinBlkArray)) {
      writer->ctx->brinBlkArray = NULL;
      writer->ctx->tbHasOldData = false;
      goto _exit;
    } else {
      const SBrinBlk *brinBlk = TARRAY2_GET_PTR(writer->ctx->brinBlkArray, writer->ctx->brinBlkArrayIdx);

      if (brinBlk->minTbid.uid != writer->ctx->tbid->uid) {
        writer->ctx->tbHasOldData = false;
        goto _exit;
      }

      TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(writer->ctx->reader, brinBlk, writer->ctx->brinBlock), &lino, _exit);

      writer->ctx->brinBlockIdx = 0;
      writer->ctx->brinBlkArrayIdx++;
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1251

1252
// Write one new row of the current table. While the table still has old data,
// the old data sorting before the new row's key is copied first.
static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row) {
  int32_t code = 0;
  int32_t lino = 0;

  if (writer->ctx->tbHasOldData) {
    STsdbRowKey newRowKey;

    tsdbRowGetKey(row, &newRowKey);
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTableOldData(writer, &newRowKey), &lino, _exit);
  }

  TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSRow(writer, row), &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1271

1272
// Finish the current table: copy all of its remaining old data, then flush the
// pending block data. Nothing is done when no table is current (uid == 0).
static int32_t tsdbDataFileWriteTableDataEnd(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  if (writer->ctx->tbid->uid == 0) {
    return code;
  }

  if (writer->ctx->tbHasOldData) {
    // NULL key: treated as the largest key
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTableOldData(writer, NULL /* as the largest key */), &lino, _exit);
  }

  TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, writer->blockData), &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1293

1294
/*
 * Start writing table `tbid`.
 *
 * Old brin records positioned before `tbid` are carried over to the output
 * one by one, except records of tables for which metaGetInfo() fails; those
 * are skipped. The walk stops at the first old record that belongs to `tbid`
 * (ctx->tbHasOldData is then set) or that sorts after it by (suid, uid), or
 * when the old brin index is exhausted.
 *
 * Afterwards `tbid` becomes the current table and, unless tbid->uid is
 * INT64_MAX, the table schema is refreshed and the pending block data is
 * initialised for it.
 */
static int32_t tsdbDataFileWriteTableDataBegin(SDataFileWriter *writer, const TABLEID *tbid) {
  int32_t code = 0;
  int32_t lino = 0;

  SMetaInfo metaInfo;
  bool      lastWasDropped = false;  // true while droppedTbid names the table skipped last
  TABLEID   droppedTbid = {0};

  writer->ctx->tbHasOldData = false;
  while (writer->ctx->brinBlkArray) {  // skip data of previous table
    for (; writer->ctx->brinBlockIdx < writer->ctx->brinBlock->numOfRecords; writer->ctx->brinBlockIdx++) {
      SBrinRecord record;
      TAOS_CHECK_GOTO(tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, &record), &lino, _exit);

      if (record.uid == tbid->uid) {
        writer->ctx->tbHasOldData = true;
        goto _begin;
      }
      if (record.suid > tbid->suid || (record.suid == tbid->suid && record.uid > tbid->uid)) {
        goto _begin;
      }

      // the record belongs to a table positioned before tbid
      if (record.uid != writer->ctx->tbid->uid) {
        if (lastWasDropped && droppedTbid.uid == record.uid) {
          continue;
        }
        if (metaGetInfo(writer->config->tsdb->pVnode->pMeta, record.uid, &metaInfo, NULL) != 0) {
          lastWasDropped = true;
          droppedTbid.suid = record.suid;
          droppedTbid.uid = record.uid;
          continue;
        }
        lastWasDropped = false;
        writer->ctx->tbid->suid = record.suid;
        writer->ctx->tbid->uid = record.uid;
      }

      TAOS_CHECK_GOTO(tsdbDataFileWriteBrinRecord(writer, &record), &lino, _exit);
    }

    if (writer->ctx->brinBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->brinBlkArray)) {
      // old brin index exhausted
      writer->ctx->brinBlkArray = NULL;
      break;
    }

    const SBrinBlk *brinBlk = TARRAY2_GET_PTR(writer->ctx->brinBlkArray, writer->ctx->brinBlkArrayIdx);

    TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(writer->ctx->reader, brinBlk, writer->ctx->brinBlock), &lino, _exit);

    writer->ctx->brinBlockIdx = 0;
    writer->ctx->brinBlkArrayIdx++;
  }

_begin:
  writer->ctx->tbid[0] = *tbid;

  if (tbid->uid == INT64_MAX) {
    goto _exit;
  }

  TAOS_CHECK_GOTO(tsdbUpdateSkmTb(writer->config->tsdb, tbid, writer->config->skmTb), &lino, _exit);
  TAOS_CHECK_GOTO(tBlockDataInit(writer->blockData, writer->ctx->tbid, writer->config->skmTb->pTSchema, NULL, 0), &lino,
                  _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1363

1364
// Write the raw bytes of *footer to fd at offset *fileSize and advance
// *fileSize by sizeof(*footer). Returns 0 or the error from tsdbWriteFile().
int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer, SEncryptData *encryptData) {
  const uint8_t *footerBytes = (const uint8_t *)footer;

  TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, footerBytes, sizeof(*footer), encryptData));

  *fileSize += sizeof(*footer);
  return 0;
}
1369

1370
int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
1,875,118✔
1371
                               TTombBlkArray *tombBlkArray, SBuffer *buffers, SVersionRange *range,
1372
                               SEncryptData *encryptData) {
1373
  int32_t code;
1374

1375
  if (TOMB_BLOCK_SIZE(tombBlock) == 0) {
1,875,118✔
1376
    return 0;
×
1377
  }
1378

1379
  SBuffer *buffer0 = buffers + 0;
1,875,118✔
1380
  SBuffer *assist = buffers + 1;
1,875,118✔
1381

1382
  STombBlk tombBlk = {
1,875,118✔
1383
      .dp[0] =
1384
          {
1385
              .offset = *fileSize,
1,875,118✔
1386
              .size = 0,
1387
          },
1388
      .numRec = TOMB_BLOCK_SIZE(tombBlock),
1,875,118✔
1389
      .cmprAlg = cmprAlg,
1390
  };
1391
  for (int i = 0; i < TOMB_BLOCK_SIZE(tombBlock); i++) {
45,933,288✔
1392
    STombRecord record;
44,058,170✔
1393
    TAOS_CHECK_RETURN(tTombBlockGet(tombBlock, i, &record));
44,058,170✔
1394

1395
    if (i == 0) {
44,058,170✔
1396
      tombBlk.minTbid.suid = record.suid;
1,875,118✔
1397
      tombBlk.minTbid.uid = record.uid;
1,875,118✔
1398
      tombBlk.minVer = record.version;
1,875,118✔
1399
      tombBlk.maxVer = record.version;
1,875,118✔
1400
    }
1401
    if (i == TOMB_BLOCK_SIZE(tombBlock) - 1) {
44,058,170✔
1402
      tombBlk.maxTbid.suid = record.suid;
1,875,118✔
1403
      tombBlk.maxTbid.uid = record.uid;
1,875,118✔
1404
    }
1405
    if (record.version < tombBlk.minVer) {
44,058,170✔
1406
      tombBlk.minVer = record.version;
1,429,141✔
1407
    }
1408
    if (record.version > tombBlk.maxVer) {
44,058,170✔
1409
      tombBlk.maxVer = record.version;
4,148,253✔
1410
    }
1411
  }
1412

1413
  tsdbWriterUpdVerRange(range, tombBlk.minVer, tombBlk.maxVer);
1,875,118✔
1414

1415
  for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->buffers); i++) {
11,250,708✔
1416
    tBufferClear(buffer0);
1417

1418
    SCompressInfo cinfo = {
9,375,590✔
1419
        .cmprAlg = cmprAlg,
1420
        .dataType = TSDB_DATA_TYPE_BIGINT,
1421
        .originalSize = tombBlock->buffers[i].size,
9,375,590✔
1422
    };
1423
    TAOS_CHECK_RETURN(tCompressDataToBuffer(tombBlock->buffers[i].data, &cinfo, buffer0, assist));
9,375,590✔
1424
    TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size, encryptData));
9,375,590✔
1425

1426
    tombBlk.size[i] = cinfo.compressedSize;
9,375,590✔
1427
    tombBlk.dp->size += tombBlk.size[i];
9,375,590✔
1428
    *fileSize += tombBlk.size[i];
9,375,590✔
1429
  }
1430

1431
  TAOS_CHECK_RETURN(TARRAY2_APPEND_PTR(tombBlkArray, &tombBlk));
3,750,236✔
1432

1433
  tTombBlockClear(tombBlock);
1,875,118✔
1434
  return 0;
1,875,118✔
1435
}
1436

1437
// Append the writer's head footer to the .head file and grow the recorded
// head file size accordingly.
static int32_t tsdbDataFileWriteHeadFooter(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdbFD      *headFd = writer->fd[TSDB_FTYPE_HEAD];
  STFile       *headFile = &writer->files[TSDB_FTYPE_HEAD];
  SEncryptData *pEncryptData = &(writer->config->tsdb->pVnode->config.tsdbCfg.encryptData);

  TAOS_CHECK_GOTO(tsdbFileWriteHeadFooter(headFd, &headFile->size, writer->headFooter, pEncryptData), &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1454

1455
/**
 * Flush the writer's in-memory tomb block to the .tomb file.
 *
 * Does nothing when the pending tomb block holds no records. Otherwise hands
 * the block to tsdbFileWriteTombBlock() together with the writer's compression
 * algorithm, .tomb file size, tomb block index array, scratch buffers and tomb
 * version range.
 *
 * @param writer data file writer whose tombBlock is flushed
 * @return 0 on success (or when there is nothing to write), otherwise the
 *         code returned by tsdbFileWriteTombBlock()
 */
static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) {
  if (TOMB_BLOCK_SIZE(writer->tombBlock) == 0) {
    return 0;
  }

  int32_t       code = 0;
  int32_t       lino = 0;
  SEncryptData *encryptData = &(writer->config->tsdb->pVnode->config.tsdbCfg.encryptData);
  STsdbFD      *tombFd = writer->fd[TSDB_FTYPE_TOMB];
  int64_t      *tombSize = &writer->files[TSDB_FTYPE_TOMB].size;

  TAOS_CHECK_GOTO(tsdbFileWriteTombBlock(tombFd, writer->tombBlock, writer->config->cmprAlg, tombSize,
                                         writer->tombBlkArray, writer->buffers, &writer->ctx->tombRange, encryptData),
                  &lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1475

1476
int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize,
20,646,413✔
1477
                             SEncryptData *encryptData) {
1478
  ptr->size = TARRAY2_DATA_LEN(tombBlkArray);
20,646,413✔
1479
  if (ptr->size > 0) {
20,672,043✔
1480
    ptr->offset = *fileSize;
1,875,118✔
1481

1482
    TAOS_CHECK_RETURN(
1,875,118✔
1483
        tsdbWriteFile(fd, *fileSize, (const uint8_t *)TARRAY2_DATA(tombBlkArray), ptr->size, encryptData));
1484

1485
    *fileSize += ptr->size;
1,875,118✔
1486
  }
1487
  return 0;
20,667,721✔
1488
}
1489

1490
/**
 * Persist the writer's tomb block index array into the .tomb file.
 *
 * The location of the written array is stored in writer->tombFooter->tombBlkPtr.
 * An empty index array is rejected as an invalid argument.
 *
 * @param writer data file writer whose tombBlkArray is written
 * @return 0 on success, TSDB_CODE_INVALID_PARA when the array is empty,
 *         otherwise the code returned by tsdbFileWriteTombBlk()
 */
static int32_t tsdbDataFileDoWriteTombBlk(SDataFileWriter *writer) {
  if (TARRAY2_SIZE(writer->tombBlkArray) <= 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t       code = 0;
  int32_t       lino = 0;
  SEncryptData *encryptData = &(writer->config->tsdb->pVnode->config.tsdbCfg.encryptData);
  STsdbFD      *tombFd = writer->fd[TSDB_FTYPE_TOMB];
  int64_t      *tombSize = &writer->files[TSDB_FTYPE_TOMB].size;

  TAOS_CHECK_GOTO(
      tsdbFileWriteTombBlk(tombFd, writer->tombBlkArray, writer->tombFooter->tombBlkPtr, tombSize, encryptData),
      &lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1512

1513
/**
 * Write a tomb footer structure at the current end of a file.
 *
 * The footer is written as its raw in-memory bytes at offset *fileSize, and
 * *fileSize is then advanced by sizeof(*footer).
 *
 * @param fd          destination file
 * @param footer      footer to write
 * @param fileSize    in/out: current end-of-file position
 * @param encryptData encryption settings forwarded to tsdbWriteFile()
 * @return 0 on success, otherwise the error from tsdbWriteFile()
 */
int32_t tsdbFileWriteTombFooter(STsdbFD *fd, const STombFooter *footer, int64_t *fileSize, SEncryptData *encryptData) {
  const uint8_t *bytes = (const uint8_t *)footer;

  TAOS_CHECK_RETURN(tsdbWriteFile(fd, *fileSize, bytes, sizeof(*footer), encryptData));

  *fileSize += sizeof(*footer);
  return 0;
}
1518

1519
/**
 * Write writer->tombFooter to the writer's .tomb file.
 *
 * Thin wrapper around tsdbFileWriteTombFooter() that supplies the .tomb file
 * descriptor, a pointer to the .tomb file size and the vnode's encryption
 * settings, and logs any failure.
 *
 * @param writer data file writer owning the TSDB_FTYPE_TOMB descriptor
 * @return 0 on success, otherwise the code returned by tsdbFileWriteTombFooter()
 */
static int32_t tsdbDataFileWriteTombFooter(SDataFileWriter *writer) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SEncryptData *encryptData = &(writer->config->tsdb->pVnode->config.tsdbCfg.encryptData);
  STsdbFD      *tombFd = writer->fd[TSDB_FTYPE_TOMB];
  int64_t      *tombSize = &writer->files[TSDB_FTYPE_TOMB].size;

  TAOS_CHECK_GOTO(tsdbFileWriteTombFooter(tombFd, writer->tombFooter, tombSize, encryptData), &lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1536

1537
/**
 * Merge one new tomb record into the output tomb block, carrying over any
 * previously stored tomb records that compare lower than it.
 *
 * While old tomb data remains (writer->ctx->hasOldTomb), old records are read
 * block by block. Each old record that compares lower than `record` is copied
 * to the output block first; the scan stops at the first old record that
 * compares higher. An old record that compares equal is reported as a
 * duplicate and skipped. Afterwards `record` itself is appended, unless its
 * suid is INT64_MAX, in which case nothing is appended for it. The output
 * block is flushed whenever it reaches writer->config->maxRow records.
 *
 * @param writer data file writer
 * @param record record to merge in
 * @return 0 on success, otherwise the first error encountered
 */
static int32_t tsdbDataFileDoWriteTombRecord(SDataFileWriter *writer, const STombRecord *record) {
  int32_t code = 0;
  int32_t lino = 0;

  while (writer->ctx->hasOldTomb) {
    // Drain the currently loaded old tomb block up to the position of `record`.
    while (writer->ctx->tombBlockIdx < TOMB_BLOCK_SIZE(writer->ctx->tombBlock)) {
      STombRecord oldRecord;
      TAOS_CHECK_GOTO(tTombBlockGet(writer->ctx->tombBlock, writer->ctx->tombBlockIdx, &oldRecord), &lino, _exit);

      int32_t cmp = tTombRecordCompare(record, &oldRecord);
      if (cmp < 0) {
        // The new record comes first; the old record stays pending for a later call.
        goto _write;
      }

      if (cmp == 0) {
        tsdbError("vgId:%d duplicate tomb record, cid:%" PRId64 ", suid:%" PRId64 ", uid:%" PRId64 ", version:%" PRId64,
                  TD_VID(writer->config->tsdb->pVnode), writer->config->cid, record->suid, record->uid,
                  record->version);
      } else {
        TAOS_CHECK_GOTO(tTombBlockPut(writer->tombBlock, &oldRecord), &lino, _exit);

        tsdbTrace("vgId:%d write tomb record to tomb file:%s, cid:%" PRId64 ", suid:%" PRId64 ", uid:%" PRId64
                  ", version:%" PRId64,
                  TD_VID(writer->config->tsdb->pVnode), writer->fd[TSDB_FTYPE_TOMB]->path, writer->config->cid,
                  oldRecord.suid, oldRecord.uid, oldRecord.version);

        if (TOMB_BLOCK_SIZE(writer->tombBlock) >= writer->config->maxRow) {
          TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlock(writer), &lino, _exit);
        }
      }

      writer->ctx->tombBlockIdx++;
    }

    // Current old block is exhausted: stop if there are no more, else load the next one.
    if (writer->ctx->tombBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->tombBlkArray)) {
      writer->ctx->hasOldTomb = false;
      break;
    }

    const STombBlk *nextBlk = TARRAY2_GET_PTR(writer->ctx->tombBlkArray, writer->ctx->tombBlkArrayIdx);

    TAOS_CHECK_GOTO(tsdbDataFileReadTombBlock(writer->ctx->reader, nextBlk, writer->ctx->tombBlock), &lino, _exit);

    writer->ctx->tombBlockIdx = 0;
    writer->ctx->tombBlkArrayIdx++;
  }

_write:
  // A record with suid == INT64_MAX only drives the merge above and is never stored.
  if (record->suid != INT64_MAX) {
    TAOS_CHECK_GOTO(tTombBlockPut(writer->tombBlock, record), &lino, _exit);

    tsdbTrace("vgId:%d write tomb record to tomb file:%s, cid:%" PRId64 ", suid:%" PRId64 ", uid:%" PRId64
              ", version:%" PRId64,
              TD_VID(writer->config->tsdb->pVnode), writer->fd[TSDB_FTYPE_TOMB]->path, writer->config->cid, record->suid,
              record->uid, record->version);

    if (TOMB_BLOCK_SIZE(writer->tombBlock) >= writer->config->maxRow) {
      TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlock(writer), &lino, _exit);
    }
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1603

1604
int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize,
741,605✔
1605
                             SEncryptData *encryptData) {
1606
  if (TARRAY2_SIZE(brinBlkArray) <= 0) {
741,605✔
1607
    return TSDB_CODE_INVALID_PARA;
×
1608
  }
1609
  ptr->offset = *fileSize;
741,605✔
1610
  ptr->size = TARRAY2_DATA_LEN(brinBlkArray);
741,605✔
1611

1612
  TAOS_CHECK_RETURN(tsdbWriteFile(fd, ptr->offset, (uint8_t *)TARRAY2_DATA(brinBlkArray), ptr->size, encryptData));
741,605✔
1613

1614
  *fileSize += ptr->size;
741,605✔
1615
  return 0;
741,605✔
1616
}
1617

1618
/**
 * Persist the writer's brin block index array into the .head file.
 *
 * The location of the written array is stored in writer->headFooter->brinBlkPtr.
 *
 * @param writer data file writer whose brinBlkArray is written
 * @return 0 on success, otherwise the code returned by tsdbFileWriteBrinBlk()
 */
static int32_t tsdbDataFileWriteBrinBlk(SDataFileWriter *writer) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SEncryptData *encryptData = &(writer->config->tsdb->pVnode->config.tsdbCfg.encryptData);
  STsdbFD      *headFd = writer->fd[TSDB_FTYPE_HEAD];
  int64_t      *headSize = &writer->files[TSDB_FTYPE_HEAD].size;

  TAOS_CHECK_GOTO(
      tsdbFileWriteBrinBlk(headFd, writer->brinBlkArray, writer->headFooter->brinBlkPtr, headSize, encryptData),
      &lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1636

1637
/**
 * Widen a file's version range so that it also covers `range`.
 *
 * f->minVer is lowered to range.minVer if that is smaller, and f->maxVer is
 * raised to range.maxVer if that is larger.
 *
 * @param f     file descriptor record to update in place
 * @param range version range to merge in
 */
void tsdbTFileUpdVerRange(STFile *f, SVersionRange range) {
  if (range.minVer < f->minVer) {
    f->minVer = range.minVer;
  }
  if (range.maxVer > f->maxVer) {
    f->maxVer = range.maxVer;
  }
}
23,755,322✔
1641

1642
/**
 * Finish all open files of a writer and emit the file operations that
 * describe the result.
 *
 * If the .head file is open: the pending table data is ended, the brin block,
 * brin block index and head footer are written, and operations are appended
 * to opArr in this order:
 *   - .head: REMOVE of the existing file (if any), then CREATE of the new one,
 *     whose version range covers the removed file's range and the writer's range;
 *   - .data, then .sma: CREATE if the file did not exist, MODIFY if it existed
 *     and its size changed, nothing otherwise.
 *
 * If the .tomb file is open: remaining old tomb records are merged in, the
 * pending tomb block, tomb block index and tomb footer are written, and a
 * REMOVE (if a file existed) plus CREATE operation are appended.
 *
 * Finally every open descriptor is fsync'ed and closed.
 *
 * @param writer data file writer being committed
 * @param opArr  output array receiving the file operations
 * @return 0 on success, otherwise the first error encountered
 */
static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArray *opArr) {
  int32_t       code = 0;
  int32_t       lino = 0;
  STFileOp      op;
  SEncryptData *encryptData = &(writer->config->tsdb->pVnode->config.tsdbCfg.encryptData);

  if (writer->fd[TSDB_FTYPE_HEAD]) {
    // Switching to a table id of INT64_MAX after ending the current table's data.
    TABLEID endTbid = {
        .suid = INT64_MAX,
        .uid = INT64_MAX,
    };

    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataEnd(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataBegin(writer, &endTbid), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlock(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlk(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteHeadFooter(writer), &lino, _exit);

    // .head: always replaced by a newly created file.
    SVersionRange oldHeadRange = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};

    if (writer->config->files[TSDB_FTYPE_HEAD].exist) {
      op = (STFileOp){
          .optype = TSDB_FOP_REMOVE,
          .fid = writer->config->fid,
          .of = writer->config->files[TSDB_FTYPE_HEAD].file,
      };
      oldHeadRange = (SVersionRange){.minVer = op.of.minVer, .maxVer = op.of.maxVer};
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
    }

    op = (STFileOp){
        .optype = TSDB_FOP_CREATE,
        .fid = writer->config->fid,
        .nf = writer->files[TSDB_FTYPE_HEAD],
    };
    tsdbTFileUpdVerRange(&op.nf, oldHeadRange);
    tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
    TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);

    // .data and .sma: created when new, modified when the size changed.
    const int32_t growFiles[] = {TSDB_FTYPE_DATA, TSDB_FTYPE_SMA};

    for (int32_t k = 0; k < ARRAY_SIZE(growFiles); ++k) {
      int32_t ftype = growFiles[k];

      if (!writer->config->files[ftype].exist) {
        op = (STFileOp){
            .optype = TSDB_FOP_CREATE,
            .fid = writer->config->fid,
            .nf = writer->files[ftype],
        };
      } else if (writer->config->files[ftype].file.size != writer->files[ftype].size) {
        op = (STFileOp){
            .optype = TSDB_FOP_MODIFY,
            .fid = writer->config->fid,
            .of = writer->config->files[ftype].file,
            .nf = writer->files[ftype],
        };
      } else {
        continue;  // file exists and is unchanged: no operation needed
      }

      tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
    }
  }

  if (writer->fd[TSDB_FTYPE_TOMB]) {
    // Record with all keys at INT64_MAX: merges in remaining old records, is not stored itself.
    STombRecord lastRecord = {
        .suid = INT64_MAX,
        .uid = INT64_MAX,
        .version = INT64_MAX,
    };

    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombRecord(writer, &lastRecord), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlock(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombBlk(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteTombFooter(writer), &lino, _exit);

    SVersionRange oldTombRange = (SVersionRange){.minVer = VERSION_MAX, .maxVer = VERSION_MIN};

    if (writer->config->files[TSDB_FTYPE_TOMB].exist) {
      op = (STFileOp){
          .optype = TSDB_FOP_REMOVE,
          .fid = writer->config->fid,
          .of = writer->config->files[TSDB_FTYPE_TOMB].file,
      };
      oldTombRange = (SVersionRange){.minVer = op.of.minVer, .maxVer = op.of.maxVer};
      TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
    }

    op = (STFileOp){
        .optype = TSDB_FOP_CREATE,
        .fid = writer->config->fid,
        .nf = writer->files[TSDB_FTYPE_TOMB],
    };
    tsdbTFileUpdVerRange(&op.nf, oldTombRange);
    tsdbTFileUpdVerRange(&op.nf, writer->ctx->tombRange);
    TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
  }

  // Make everything durable before the descriptors are released.
  for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
    if (writer->fd[i] == NULL) {
      continue;
    }
    TAOS_CHECK_GOTO(tsdbFsyncFile(writer->fd[i], encryptData), &lino, _exit);
    tsdbCloseFile(&writer->fd[i]);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1774

1775
/**
 * Open the .head, .data and .sma files of a writer for reading and writing.
 *
 * A file whose recorded size is 0 is created/truncated and gets a zero-filled
 * header of TSDB_FHDR_SIZE bytes, after which its recorded size is advanced
 * by TSDB_FHDR_SIZE. If the writer has a reader attached, the existing brin
 * block index is loaded into writer->ctx->brinBlkArray.
 *
 * @param writer data file writer whose descriptors are opened
 * @return 0 on success, otherwise the first error encountered
 */
static int32_t tsdbDataFileWriterOpenDataFD(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  const int32_t dataFtypes[] = {TSDB_FTYPE_HEAD, TSDB_FTYPE_DATA, TSDB_FTYPE_SMA};

  for (int32_t k = 0; k < ARRAY_SIZE(dataFtypes); ++k) {
    int32_t ftype = dataFtypes[k];
    char    fname[TSDB_FILENAME_LEN];

    // An empty file is (re)created from scratch; otherwise it is opened as is.
    int32_t flag = (writer->files[ftype].size == 0)
                       ? (TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC)
                       : (TD_FILE_READ | TD_FILE_WRITE);

    int32_t lcn = writer->files[ftype].lcn;
    tsdbTFileName(writer->config->tsdb, &writer->files[ftype], fname);
    TAOS_CHECK_GOTO(tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd[ftype], lcn), &lino, _exit);

    if (writer->files[ftype].size != 0) {
      continue;
    }

    // Reserve the file header with zero bytes.
    uint8_t       hdr[TSDB_FHDR_SIZE] = {0};
    SEncryptData *encryptData = &(writer->config->tsdb->pVnode->config.tsdbCfg.encryptData);

    TAOS_CHECK_GOTO(tsdbWriteFile(writer->fd[ftype], 0, hdr, TSDB_FHDR_SIZE, encryptData), &lino, _exit);

    writer->files[ftype].size += TSDB_FHDR_SIZE;
  }

  if (writer->ctx->reader) {
    TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(writer->ctx->reader, &writer->ctx->brinBlkArray), &lino, _exit);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1817

1818
/**
 * Allocate a zero-initialized data file writer and copy the configuration
 * into it. No file is opened here.
 *
 * @param config configuration copied by value into the new writer
 * @param writer output: receives the new writer, or NULL if allocation failed
 * @return 0 on success, terrno if the allocation failed
 */
int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer) {
  SDataFileWriter *created = taosMemoryCalloc(1, sizeof(*created));

  *writer = created;
  if (created == NULL) {
    return terrno;
  }

  created->config[0] = *config;
  return 0;
}
1827

1828
/**
 * Close a data file writer and release it.
 *
 * If the writer was opened, it is either aborted or committed (committing
 * appends the resulting file operations to opArr) and then closed. On success
 * the writer is freed and *writer is set to NULL.
 *
 * If the abort/commit step fails, the error is logged and returned; in that
 * case the writer is NOT closed or freed and *writer is left unchanged.
 *
 * @param writer in/out: writer handle; a NULL handle or NULL *writer is a no-op
 * @param abort  true to abort, false to commit
 * @param opArr  output array for file operations, used when committing
 * @return 0 on success, otherwise the error from the abort/commit step
 */
int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, TFileOpArray *opArr) {
  if (writer == NULL || *writer == NULL) {
    return 0;
  }

  int32_t          code = 0;
  int32_t          lino = 0;
  SDataFileWriter *self = *writer;

  if (self->ctx->opened) {
    if (!abort) {
      TAOS_CHECK_GOTO(tsdbDataFileWriterCloseCommit(self, opArr), &lino, _exit);
    } else {
      TAOS_CHECK_GOTO(tsdbDataFileWriterCloseAbort(self), &lino, _exit);
    }
    tsdbDataFileWriterDoClose(self);
  }

  taosMemoryFree(self);
  *writer = NULL;

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer[0]->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1852

1853
/**
 * Write a single row through the data file writer.
 *
 * The writer and its .head/.data/.sma files are opened lazily on first use.
 * When the row's uid differs from the table currently being written, the
 * current table's data is ended and a new table is begun before the row is
 * written.
 *
 * @param writer data file writer
 * @param row    row to write; its leading fields are used as the table id
 * @return 0 on success, otherwise the first error encountered
 */
int32_t tsdbDataFileWriteRow(SDataFileWriter *writer, SRowInfo *row) {
  int32_t code = 0;
  int32_t lino = 0;

  // Lazy initialization of the writer and its data descriptors.
  if (!writer->ctx->opened) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpen(writer), &lino, _exit);
  }
  if (writer->fd[TSDB_FTYPE_HEAD] == NULL) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterOpenDataFD(writer), &lino, _exit);
  }

  // Table switch: close out the previous table before starting the next one.
  if (writer->ctx->tbid->uid != row->uid) {
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataEnd(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataBegin(writer, (TABLEID *)row), &lino, _exit);
  }

  TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSData(writer, &row->row), &lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1879

1880
/**
 * Write a whole block of rows through the data file writer.
 *
 * An empty block is a no-op and a block with uid 0 is rejected. The writer
 * and its data files are opened lazily. When the block belongs to a different
 * table than the one currently being written, the current table is ended and
 * the new one begun. If the table still has old data, old rows up to the
 * block's first key are handled first.
 *
 * The block is written in one piece only when the table has no old data left
 * and the writer's own block buffer is empty; otherwise its rows are written
 * one by one.
 *
 * @param writer data file writer
 * @param bData  block to write; its leading fields are used as the table id
 * @return 0 on success, TSDB_CODE_INVALID_PARA if bData->uid is 0,
 *         otherwise the first error encountered
 */
int32_t tsdbDataFileWriteBlockData(SDataFileWriter *writer, SBlockData *bData) {
  if (bData->nRow == 0) {
    return 0;
  }

  int32_t code = 0;
  int32_t lino = 0;

  if (!bData->uid) {
    return TSDB_CODE_INVALID_PARA;
  }

  // Lazy initialization of the writer and its data descriptors.
  if (!writer->ctx->opened) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpen(writer), &lino, _exit);
  }
  if (writer->fd[TSDB_FTYPE_DATA] == NULL) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterOpenDataFD(writer), &lino, _exit);
  }

  // Table switch: close out the previous table before starting the next one.
  if (writer->ctx->tbid->uid != bData->uid) {
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataEnd(writer), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbDataFileWriteTableDataBegin(writer, (TABLEID *)bData), &lino, _exit);
  }

  if (writer->ctx->tbHasOldData) {
    STsdbRowKey firstKey;

    tsdbRowGetKey(&tsdbRowFromBlockData(bData, 0), &firstKey);
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteTableOldData(writer, &firstKey), &lino, _exit);
  }

  if (writer->ctx->tbHasOldData || writer->blockData->nRow != 0) {
    // Slow path: rows are written individually.
    for (int32_t iRow = 0; iRow < bData->nRow; ++iRow) {
      TSDBROW row = tsdbRowFromBlockData(bData, iRow);
      TAOS_CHECK_GOTO(tsdbDataFileDoWriteTSData(writer, &row), &lino, _exit);
    }
  } else {
    // Fast path: the block is written as a unit.
    TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, bData), &lino, _exit);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1931

1932
/**
 * Write out the rows buffered in the writer's block buffer.
 *
 * Nothing is written when the buffer is empty or while the current table
 * still has old data pending.
 *
 * @param writer data file writer; must already be opened
 * @return TSDB_CODE_INVALID_PARA if the writer is not opened, 0 when there is
 *         nothing to flush, otherwise the result of tsdbDataFileDoWriteBlockData()
 */
int32_t tsdbDataFileFlush(SDataFileWriter *writer) {
  if (!writer->ctx->opened) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (writer->blockData->nRow == 0 || writer->ctx->tbHasOldData) {
    return 0;
  }

  return tsdbDataFileDoWriteBlockData(writer, writer->blockData);
}
1942

1943
/**
 * Create the writer's .tomb file and prepare merging of existing tomb data.
 *
 * The file is always created/truncated and gets a zero-filled header of
 * TSDB_FHDR_SIZE bytes, after which its recorded size is advanced by
 * TSDB_FHDR_SIZE. If the writer has a reader attached, the existing tomb
 * block index is loaded; hasOldTomb is set when that index is non-empty, and
 * the old-tomb cursor (block index position, loaded block, in-block position)
 * is reset.
 *
 * @param writer data file writer whose TSDB_FTYPE_TOMB descriptor is opened
 * @return 0 on success, otherwise the first error encountered
 */
static int32_t tsdbDataFileWriterOpenTombFD(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  const int32_t ftype = TSDB_FTYPE_TOMB;
  const int32_t flag = (TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  char          fname[TSDB_FILENAME_LEN];
  uint8_t       hdr[TSDB_FHDR_SIZE] = {0};
  SEncryptData *encryptData = &(writer->config->tsdb->pVnode->config.tsdbCfg.encryptData);

  int32_t lcn = writer->files[ftype].lcn;
  tsdbTFileName(writer->config->tsdb, writer->files + ftype, fname);

  TAOS_CHECK_GOTO(tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd[ftype], lcn), &lino, _exit);

  // Reserve the file header with zero bytes.
  TAOS_CHECK_GOTO(tsdbWriteFile(writer->fd[ftype], 0, hdr, TSDB_FHDR_SIZE, encryptData), &lino, _exit);
  writer->files[ftype].size += TSDB_FHDR_SIZE;

  if (writer->ctx->reader) {
    TAOS_CHECK_GOTO(tsdbDataFileReadTombBlk(writer->ctx->reader, &writer->ctx->tombBlkArray), &lino, _exit);

    if (TARRAY2_SIZE(writer->ctx->tombBlkArray) > 0) {
      writer->ctx->hasOldTomb = true;
    }

    // Start scanning old tomb data from the very beginning.
    writer->ctx->tombBlkArrayIdx = 0;
    tTombBlockClear(writer->ctx->tombBlock);
    writer->ctx->tombBlockIdx = 0;
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
1982

1983
/**
 * Write one tomb record through the data file writer.
 *
 * The writer and its .tomb file are opened lazily on first use; the record is
 * then merged in via tsdbDataFileDoWriteTombRecord().
 *
 * @param writer data file writer
 * @param record tomb record to write
 * @return 0 on success, otherwise the first error encountered
 */
int32_t tsdbDataFileWriteTombRecord(SDataFileWriter *writer, const STombRecord *record) {
  int32_t code = 0;
  int32_t lino = 0;

  // Lazy initialization of the writer and its tomb descriptor.
  if (!writer->ctx->opened) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterDoOpen(writer), &lino, _exit);
  }
  if (writer->fd[TSDB_FTYPE_TOMB] == NULL) {
    TAOS_CHECK_GOTO(tsdbDataFileWriterOpenTombFD(writer), &lino, _exit);
  }

  TAOS_CHECK_GOTO(tsdbDataFileDoWriteTombRecord(writer, record), &lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc