• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4844

09 Nov 2025 03:44PM UTC coverage: 63.058% (-0.5%) from 63.514%
#4844

push

travis-ci

web-flow
test: minor changes (#33510)

117164 of 185804 relevant lines covered (63.06%)

115657269.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.22
/source/dnode/vnode/src/tsdb/tsdbSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdb.h"
17
#include "tsdbDataFileRW.h"
18
#include "tsdbFS2.h"
19
#include "tsdbFSetRW.h"
20
#include "tsdbIter.h"
21
#include "tsdbSttFileRW.h"
22

23
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);
24

25
// STsdbSnapReader ========================================
26
struct STsdbSnapReader {
27
  STsdb*  tsdb;
28
  int64_t sver;
29
  int64_t ever;
30
  int8_t  type;
31

32
  SBuffer  buffers[10];
33
  SSkmInfo skmTb[1];
34

35
  TFileSetRangeArray* fsrArr;
36

37
  // context
38
  struct {
39
    int32_t         fsrArrIdx;
40
    STFileSetRange* fsr;
41
    bool            isDataDone;
42
    bool            isTombDone;
43
  } ctx[1];
44

45
  // reader
46
  SDataFileReader*    dataReader;
47
  TSttFileReaderArray sttReaderArr[1];
48

49
  // iter
50
  TTsdbIterArray dataIterArr[1];
51
  SIterMerger*   dataIterMerger;
52
  TTsdbIterArray tombIterArr[1];
53
  SIterMerger*   tombIterMerger;
54

55
  // data
56
  SBlockData blockData[1];
57
  STombBlock tombBlock[1];
58
};
59

60
static void tsdbSnapReadFileSetCloseReader(STsdbSnapReader* reader) {
923✔
61
  TARRAY2_CLEAR(reader->sttReaderArr, tsdbSttFileReaderClose);
923✔
62
  tsdbDataFileReaderClose(&reader->dataReader);
923✔
63
  return;
923✔
64
}
65

66
static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) {
923✔
67
  int32_t code = 0;
923✔
68
  int32_t lino = 0;
923✔
69

70
  // data
71
  SDataFileReaderConfig config = {
923✔
72
      .tsdb = reader->tsdb,
923✔
73
      .szPage = reader->tsdb->pVnode->config.tsdbPageSize,
923✔
74
      .buffers = reader->buffers,
923✔
75
  };
76
  bool hasDataFile = false;
923✔
77
  for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
4,615✔
78
    if (reader->ctx->fsr->fset->farr[ftype] != NULL) {
3,692✔
79
      hasDataFile = true;
2,769✔
80
      config.files[ftype].exist = true;
2,769✔
81
      config.files[ftype].file = reader->ctx->fsr->fset->farr[ftype]->f[0];
2,769✔
82
    }
83
  }
84

85
  if (hasDataFile) {
923✔
86
    code = tsdbDataFileReaderOpen(NULL, &config, &reader->dataReader);
923✔
87
    TSDB_CHECK_CODE(code, lino, _exit);
923✔
88
  }
89

90
  // stt
91
  SSttLvl* lvl;
92
  TARRAY2_FOREACH(reader->ctx->fsr->fset->lvlArr, lvl) {
923✔
93
    STFileObj* fobj;
94
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
×
95
      SSttFileReader*      sttReader;
×
96
      SSttFileReaderConfig config = {
×
97
          .tsdb = reader->tsdb,
×
98
          .szPage = reader->tsdb->pVnode->config.tsdbPageSize,
×
99
          .file = fobj->f[0],
100
          .buffers = reader->buffers,
×
101
      };
102

103
      code = tsdbSttFileReaderOpen(fobj->fname, &config, &sttReader);
×
104
      TSDB_CHECK_CODE(code, lino, _exit);
×
105

106
      if ((code = TARRAY2_APPEND(reader->sttReaderArr, sttReader))) {
×
107
        tsdbSttFileReaderClose(&sttReader);
×
108
        TSDB_CHECK_CODE(code, lino, _exit);
×
109
      }
110
    }
111
  }
112

113
_exit:
923✔
114
  if (code) {
923✔
115
    tsdbSnapReadFileSetCloseReader(reader);
×
116
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
×
117
  }
118
  return code;
923✔
119
}
120

121
static int32_t tsdbSnapReadFileSetOpenIter(STsdbSnapReader* reader) {
923✔
122
  int32_t code = 0;
923✔
123
  int32_t lino = 0;
923✔
124

125
  STsdbIter*      iter;
923✔
126
  STsdbIterConfig config = {
923✔
127
      .filterByVersion = true,
128
      .verRange[0] = reader->ctx->fsr->sver,
923✔
129
      .verRange[1] = reader->ctx->fsr->ever,
923✔
130
  };
131

132
  // data file
133
  if (reader->dataReader) {
923✔
134
    // data
135
    config.type = TSDB_ITER_TYPE_DATA;
923✔
136
    config.dataReader = reader->dataReader;
923✔
137

138
    code = tsdbIterOpen(&config, &iter);
923✔
139
    TSDB_CHECK_CODE(code, lino, _exit);
923✔
140

141
    code = TARRAY2_APPEND(reader->dataIterArr, iter);
923✔
142
    TSDB_CHECK_CODE(code, lino, _exit);
923✔
143

144
    // tomb
145
    config.type = TSDB_ITER_TYPE_DATA_TOMB;
923✔
146
    config.dataReader = reader->dataReader;
923✔
147

148
    code = tsdbIterOpen(&config, &iter);
923✔
149
    TSDB_CHECK_CODE(code, lino, _exit);
923✔
150

151
    code = TARRAY2_APPEND(reader->tombIterArr, iter);
923✔
152
    TSDB_CHECK_CODE(code, lino, _exit);
923✔
153
  }
154

155
  // stt file
156
  SSttFileReader* sttReader;
157
  TARRAY2_FOREACH(reader->sttReaderArr, sttReader) {
923✔
158
    // data
159
    config.type = TSDB_ITER_TYPE_STT;
×
160
    config.sttReader = sttReader;
×
161

162
    code = tsdbIterOpen(&config, &iter);
×
163
    TSDB_CHECK_CODE(code, lino, _exit);
×
164

165
    code = TARRAY2_APPEND(reader->dataIterArr, iter);
×
166
    TSDB_CHECK_CODE(code, lino, _exit);
×
167

168
    // tomb
169
    config.type = TSDB_ITER_TYPE_STT_TOMB;
×
170
    config.sttReader = sttReader;
×
171

172
    code = tsdbIterOpen(&config, &iter);
×
173
    TSDB_CHECK_CODE(code, lino, _exit);
×
174

175
    code = TARRAY2_APPEND(reader->tombIterArr, iter);
×
176
    TSDB_CHECK_CODE(code, lino, _exit);
×
177
  }
178

179
  // merger
180
  code = tsdbIterMergerOpen(reader->dataIterArr, &reader->dataIterMerger, false);
923✔
181
  TSDB_CHECK_CODE(code, lino, _exit);
923✔
182

183
  code = tsdbIterMergerOpen(reader->tombIterArr, &reader->tombIterMerger, true);
923✔
184
  TSDB_CHECK_CODE(code, lino, _exit);
923✔
185

186
_exit:
923✔
187
  if (code) {
923✔
188
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
×
189
  }
190
  return code;
923✔
191
}
192

193
static void tsdbSnapReadFileSetCloseIter(STsdbSnapReader* reader) {
923✔
194
  tsdbIterMergerClose(&reader->dataIterMerger);
923✔
195
  tsdbIterMergerClose(&reader->tombIterMerger);
923✔
196
  TARRAY2_CLEAR(reader->dataIterArr, tsdbIterClose);
1,846✔
197
  TARRAY2_CLEAR(reader->tombIterArr, tsdbIterClose);
1,846✔
198
  return;
923✔
199
}
200

201
static int32_t tsdbSnapReadRangeBegin(STsdbSnapReader* reader) {
1,846✔
202
  int32_t code = 0;
1,846✔
203
  int32_t lino = 0;
1,846✔
204

205
  if (reader->ctx->fsrArrIdx < TARRAY2_SIZE(reader->fsrArr)) {
1,846✔
206
    reader->ctx->fsr = TARRAY2_GET(reader->fsrArr, reader->ctx->fsrArrIdx++);
923✔
207
    reader->ctx->isDataDone = false;
923✔
208
    reader->ctx->isTombDone = false;
923✔
209

210
    code = tsdbSnapReadFileSetOpenReader(reader);
923✔
211
    TSDB_CHECK_CODE(code, lino, _exit);
923✔
212

213
    code = tsdbSnapReadFileSetOpenIter(reader);
923✔
214
    TSDB_CHECK_CODE(code, lino, _exit);
923✔
215
  }
216

217
_exit:
1,846✔
218
  if (code) {
1,846✔
219
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
×
220
  }
221
  return code;
1,846✔
222
}
223

224
static int32_t tsdbSnapReadRangeEnd(STsdbSnapReader* reader) {
923✔
225
  tsdbSnapReadFileSetCloseIter(reader);
923✔
226
  tsdbSnapReadFileSetCloseReader(reader);
923✔
227
  reader->ctx->fsr = NULL;
923✔
228
  return 0;
923✔
229
}
230

231
static int32_t tsdbSnapCmprData(STsdbSnapReader* reader, uint8_t** data) {
5,691✔
232
  int32_t code = 0;
5,691✔
233
  int32_t lino = 0;
5,691✔
234

235
  SColCompressInfo info;
236

237
  SColCompressInfo cmprInfo = {.pColCmpr = NULL, .defaultCmprAlg = NO_COMPRESSION};
5,691✔
238
  code = tBlockDataCompress(reader->blockData, (void*)&cmprInfo, reader->buffers, reader->buffers + 4);
5,691✔
239
  TSDB_CHECK_CODE(code, lino, _exit);
5,691✔
240

241
  int32_t size = 0;
5,691✔
242
  for (int i = 0; i < 4; i++) {
28,455✔
243
    size += reader->buffers[i].size;
22,764✔
244
  }
245
  *data = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
5,691✔
246
  if (*data == NULL) {
5,691✔
247
    code = terrno;
×
248
    TSDB_CHECK_CODE(code, lino, _exit);
×
249
  }
250

251
  SSnapDataHdr* pHdr = (SSnapDataHdr*)*data;
5,691✔
252
  uint8_t*      pBuf = pHdr->data;
5,691✔
253

254
  pHdr->type = reader->type;
5,691✔
255
  pHdr->size = size;
5,691✔
256
  for (int i = 0; i < 4; i++) {
28,455✔
257
    memcpy(pBuf, reader->buffers[i].data, reader->buffers[i].size);
22,764✔
258
    pBuf += reader->buffers[i].size;
22,764✔
259
  }
260

261
_exit:
5,691✔
262
  if (code) {
5,691✔
263
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), lino, code);
×
264
  }
265
  return code;
5,691✔
266
}
267

268
static int64_t tBlockDataSize(SBlockData* pBlockData) {
4,690,312✔
269
  int64_t nData = 0;
4,690,312✔
270
  for (int32_t iCol = 0; iCol < pBlockData->nColData; iCol++) {
50,615,620✔
271
    SColData* pColData = tBlockDataGetColDataByIdx(pBlockData, iCol);
45,925,308✔
272
    nData += pColData->nData;
45,925,308✔
273
  }
274
  return nData;
4,690,312✔
275
}
276

277
static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** data) {
6,614✔
278
  int32_t   code = 0;
6,614✔
279
  int32_t   lino = 0;
6,614✔
280
  SMetaInfo info;
6,614✔
281

282
  tBlockDataReset(reader->blockData);
6,614✔
283

284
  TABLEID tbid[1] = {0};
6,614✔
285
  for (SRowInfo* row; (row = tsdbIterMergerGetData(reader->dataIterMerger));) {
75,051,846✔
286
    // skip dropped table
287
    if (row->uid != tbid->uid) {
75,050,000✔
288
      tbid->suid = row->suid;
13,838✔
289
      tbid->uid = row->uid;
13,838✔
290
      if (metaGetInfo(reader->tsdb->pVnode->pMeta, tbid->uid, &info, NULL) != 0) {
13,838✔
291
        code = tsdbIterMergerSkipTableData(reader->dataIterMerger, tbid);
×
292
        TSDB_CHECK_CODE(code, lino, _exit);
×
293
        continue;
×
294
      }
295
    }
296

297
    if (reader->blockData->suid == 0 && reader->blockData->uid == 0) {
75,050,000✔
298
      code = tsdbUpdateSkmTb(reader->tsdb, (TABLEID*)row, reader->skmTb);
5,691✔
299
      TSDB_CHECK_CODE(code, lino, _exit);
5,691✔
300

301
      TABLEID tbid1 = {
5,691✔
302
          .suid = row->suid,
5,691✔
303
          .uid = row->suid ? 0 : row->uid,
5,691✔
304
      };
305
      code = tBlockDataInit(reader->blockData, &tbid1, reader->skmTb->pTSchema, NULL, 0);
5,691✔
306
      TSDB_CHECK_CODE(code, lino, _exit);
5,691✔
307
    }
308

309
    if (!TABLE_SAME_SCHEMA(reader->blockData->suid, reader->blockData->uid, row->suid, row->uid)) {
75,050,000✔
310
      break;
×
311
    }
312

313
    code = tBlockDataAppendRow(reader->blockData, &row->row, NULL, row->uid);
75,050,000✔
314
    TSDB_CHECK_CODE(code, lino, _exit);
75,050,000✔
315

316
    code = tsdbIterMergerNext(reader->dataIterMerger);
75,050,000✔
317
    TSDB_CHECK_CODE(code, lino, _exit);
75,050,000✔
318

319
    if (!(reader->blockData->nRow % 16)) {
75,050,000✔
320
      int64_t nData = tBlockDataSize(reader->blockData);
4,690,312✔
321
      if (nData >= TSDB_SNAP_DATA_PAYLOAD_SIZE) {
4,690,312✔
322
        break;
4,768✔
323
      }
324
    }
325
  }
326

327
  if (reader->blockData->nRow > 0) {
6,614✔
328
    code = tsdbSnapCmprData(reader, data);
5,691✔
329
    TSDB_CHECK_CODE(code, lino, _exit);
5,691✔
330
  }
331

332
_exit:
6,614✔
333
  if (code) {
6,614✔
334
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
×
335
  }
336
  return code;
6,614✔
337
}
338

339
static int32_t tsdbSnapCmprTombData(STsdbSnapReader* reader, uint8_t** data) {
×
340
  int32_t code = 0;
×
341
  int32_t lino = 0;
×
342

343
  int64_t size = 0;
×
344
  for (int32_t i = 0; i < ARRAY_SIZE(reader->tombBlock->buffers); i++) {
×
345
    size += reader->tombBlock->buffers[i].size;
×
346
  }
347

348
  data[0] = taosMemoryMalloc(size + sizeof(SSnapDataHdr));
×
349
  if (data[0] == NULL) {
×
350
    code = terrno;
×
351
    TSDB_CHECK_CODE(code, lino, _exit);
×
352
  }
353

354
  SSnapDataHdr* hdr = (SSnapDataHdr*)(data[0]);
×
355
  hdr->type = SNAP_DATA_DEL;
×
356
  hdr->size = size;
×
357

358
  uint8_t* tdata = hdr->data;
×
359
  for (int32_t i = 0; i < ARRAY_SIZE(reader->tombBlock->buffers); i++) {
×
360
    memcpy(tdata, reader->tombBlock->buffers[i].data, reader->tombBlock->buffers[i].size);
×
361
    tdata += reader->tombBlock->buffers[i].size;
×
362
  }
363

364
_exit:
×
365
  if (code) {
×
366
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
×
367
  }
368
  return code;
×
369
}
370

371
static int32_t tsdbSnapReadTombData(STsdbSnapReader* reader, uint8_t** data) {
923✔
372
  int32_t   code = 0;
923✔
373
  int32_t   lino = 0;
923✔
374
  SMetaInfo info;
923✔
375

376
  tTombBlockClear(reader->tombBlock);
923✔
377

378
  TABLEID tbid[1] = {0};
923✔
379
  for (STombRecord* record; (record = tsdbIterMergerGetTombRecord(reader->tombIterMerger)) != NULL;) {
923✔
380
    if (record->uid != tbid->uid) {
×
381
      tbid->suid = record->suid;
×
382
      tbid->uid = record->uid;
×
383
      if (metaGetInfo(reader->tsdb->pVnode->pMeta, tbid->uid, &info, NULL) != 0) {
×
384
        code = tsdbIterMergerSkipTableData(reader->tombIterMerger, tbid);
×
385
        TSDB_CHECK_CODE(code, lino, _exit);
×
386
        continue;
×
387
      }
388
    }
389

390
    code = tTombBlockPut(reader->tombBlock, record);
×
391
    TSDB_CHECK_CODE(code, lino, _exit);
×
392

393
    code = tsdbIterMergerNext(reader->tombIterMerger);
×
394
    TSDB_CHECK_CODE(code, lino, _exit);
×
395

396
    if (TOMB_BLOCK_SIZE(reader->tombBlock) >= 81920) {
×
397
      break;
×
398
    }
399
  }
400

401
  if (TOMB_BLOCK_SIZE(reader->tombBlock) > 0) {
923✔
402
    code = tsdbSnapCmprTombData(reader, data);
×
403
    TSDB_CHECK_CODE(code, lino, _exit);
×
404
  }
405

406
_exit:
923✔
407
  if (code) {
923✔
408
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
×
409
  }
410
  return code;
923✔
411
}
412

413
int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type, void* pRanges,
923✔
414
                           STsdbSnapReader** reader) {
415
  int32_t code = 0;
923✔
416
  int32_t lino = 0;
923✔
417

418
  reader[0] = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*reader[0]));
923✔
419
  if (reader[0] == NULL) return terrno;
923✔
420

421
  reader[0]->tsdb = tsdb;
923✔
422
  reader[0]->sver = sver;
923✔
423
  reader[0]->ever = ever;
923✔
424
  reader[0]->type = type;
923✔
425

426
  code = tsdbFSCreateRefRangedSnapshot(tsdb->pFS, sver, ever, (TFileSetRangeArray*)pRanges, &reader[0]->fsrArr);
923✔
427
  TSDB_CHECK_CODE(code, lino, _exit);
923✔
428

429
_exit:
923✔
430
  if (code) {
923✔
431
    tsdbError("vgId:%d %s failed at %s:%d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode),
×
432
              __func__, __FILE__, lino, tstrerror(code), sver, ever, type);
433
    tsdbTFileSetRangeArrayDestroy(&reader[0]->fsrArr);
×
434
    taosMemoryFree(reader[0]);
×
435
    reader[0] = NULL;
×
436
  } else {
437
    tsdbInfo("vgId:%d, tsdb snapshot incremental reader opened. sver:%" PRId64 " ever:%" PRId64 " type:%d",
923✔
438
             TD_VID(tsdb->pVnode), sver, ever, type);
439
  }
440
  return code;
923✔
441
}
442

443
void tsdbSnapReaderClose(STsdbSnapReader** reader) {
923✔
444
  if (reader[0] == NULL) {
923✔
445
    return;
×
446
  }
447

448
  int32_t code = 0;
923✔
449

450
  STsdb* tsdb = reader[0]->tsdb;
923✔
451

452
  tTombBlockDestroy(reader[0]->tombBlock);
923✔
453
  tBlockDataDestroy(reader[0]->blockData);
923✔
454

455
  tsdbIterMergerClose(&reader[0]->dataIterMerger);
923✔
456
  tsdbIterMergerClose(&reader[0]->tombIterMerger);
923✔
457
  TARRAY2_DESTROY(reader[0]->dataIterArr, tsdbIterClose);
923✔
458
  TARRAY2_DESTROY(reader[0]->tombIterArr, tsdbIterClose);
923✔
459
  TARRAY2_DESTROY(reader[0]->sttReaderArr, tsdbSttFileReaderClose);
923✔
460
  tsdbDataFileReaderClose(&reader[0]->dataReader);
923✔
461

462
  tsdbFSDestroyRefRangedSnapshot(&reader[0]->fsrArr);
923✔
463
  tDestroyTSchema(reader[0]->skmTb->pTSchema);
923✔
464

465
  for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->buffers); ++i) {
10,153✔
466
    tBufferDestroy(reader[0]->buffers + i);
9,230✔
467
  }
468

469
  taosMemoryFree(reader[0]);
923✔
470
  reader[0] = NULL;
923✔
471

472
  return;
923✔
473
}
474

475
int32_t tsdbSnapRead(STsdbSnapReader* reader, uint8_t** data) {
6,614✔
476
  int32_t code = 0;
6,614✔
477
  int32_t lino = 0;
6,614✔
478

479
  data[0] = NULL;
6,614✔
480

481
  for (;;) {
482
    if (reader->ctx->fsr == NULL) {
7,537✔
483
      code = tsdbSnapReadRangeBegin(reader);
1,846✔
484
      TSDB_CHECK_CODE(code, lino, _exit);
1,846✔
485

486
      if (reader->ctx->fsr == NULL) {
1,846✔
487
        break;
923✔
488
      }
489
    }
490

491
    if (!reader->ctx->isDataDone) {
6,614✔
492
      code = tsdbSnapReadTimeSeriesData(reader, data);
6,614✔
493
      TSDB_CHECK_CODE(code, lino, _exit);
6,614✔
494
      if (data[0]) {
6,614✔
495
        goto _exit;
5,691✔
496
      } else {
497
        reader->ctx->isDataDone = true;
923✔
498
      }
499
    }
500

501
    if (!reader->ctx->isTombDone) {
923✔
502
      code = tsdbSnapReadTombData(reader, data);
923✔
503
      TSDB_CHECK_CODE(code, lino, _exit);
923✔
504
      if (data[0]) {
923✔
505
        goto _exit;
×
506
      } else {
507
        reader->ctx->isTombDone = true;
923✔
508
      }
509
    }
510

511
    code = tsdbSnapReadRangeEnd(reader);
923✔
512
    TSDB_CHECK_CODE(code, lino, _exit);
923✔
513
  }
514

515
_exit:
6,614✔
516
  if (code) {
6,614✔
517
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
×
518
  } else {
519
    tsdbDebug("vgId:%d %s done", TD_VID(reader->tsdb->pVnode), __func__);
6,614✔
520
  }
521
  return code;
6,614✔
522
}
523

524
// STsdbSnapWriter ========================================
525
struct STsdbSnapWriter {
526
  STsdb*  tsdb;
527
  int64_t sver;
528
  int64_t ever;
529
  int32_t minutes;
530
  int8_t  precision;
531
  int32_t minRow;
532
  int32_t maxRow;
533
  int8_t  cmprAlg;
534
  int64_t commitID;
535
  int32_t szPage;
536
  int64_t compactVersion;
537
  int64_t now;
538
  SBuffer buffers[10];
539

540
  TFileSetArray* fsetArr;
541
  TFileOpArray   fopArr[1];
542

543
  struct {
544
    bool       fsetWriteBegin;
545
    int32_t    fid;
546
    STFileSet* fset;
547
    int32_t    expLevel;
548
    bool       hasData;  // if have time series data
549
    bool       hasTomb;  // if have tomb data
550

551
    // reader
552
    SDataFileReader*    dataReader;
553
    TSttFileReaderArray sttReaderArr[1];
554

555
    // iter/merger
556
    TTsdbIterArray dataIterArr[1];
557
    SIterMerger*   dataIterMerger;
558
    TTsdbIterArray tombIterArr[1];
559
    SIterMerger*   tombIterMerger;
560

561
    // writer
562
    bool         toSttOnly;
563
    SFSetWriter* fsetWriter;
564
  } ctx[1];
565
};
566

567
// APIs
568
static int32_t tsdbSnapWriteTimeSeriesRow(STsdbSnapWriter* writer, SRowInfo* row) {
35,068,226✔
569
  int32_t   code = 0;
35,068,226✔
570
  int32_t   lino = 0;
35,068,226✔
571
  TABLEID   tbid = {0};
35,068,226✔
572
  SMetaInfo info;
35,068,226✔
573

574
  while (writer->ctx->hasData) {
58,918,226✔
575
    SRowInfo* row1;
576
    for (;;) {
577
      row1 = tsdbIterMergerGetData(writer->ctx->dataIterMerger);
55,788,226✔
578
      if (row1 == NULL) {
55,788,226✔
579
        writer->ctx->hasData = false;
626✔
580
      } else if (row1->uid != tbid.uid) {
55,787,600✔
581
        tbid.suid = row1->suid;
31,942,802✔
582
        tbid.uid = row1->uid;
31,942,802✔
583
        if (metaGetInfo(writer->tsdb->pVnode->pMeta, tbid.uid, &info, NULL) != 0) {
31,942,802✔
584
          code = tsdbIterMergerSkipTableData(writer->ctx->dataIterMerger, &tbid);
×
585
          TSDB_CHECK_CODE(code, lino, _exit);
×
586
          continue;
×
587
        }
588
      }
589
      break;
55,788,226✔
590
    }
591

592
    if (writer->ctx->hasData == false) {
55,788,226✔
593
      break;
626✔
594
    }
595

596
    int32_t c = tRowInfoCmprFn(row1, row);
55,787,600✔
597
    if (c <= 0) {
55,787,600✔
598
      code = tsdbFSetWriteRow(writer->ctx->fsetWriter, row1);
23,850,000✔
599
      TSDB_CHECK_CODE(code, lino, _exit);
23,850,000✔
600

601
      code = tsdbIterMergerNext(writer->ctx->dataIterMerger);
23,850,000✔
602
      TSDB_CHECK_CODE(code, lino, _exit);
23,850,000✔
603
    } else {
604
      break;
31,937,600✔
605
    }
606
  }
607

608
  if (row->suid == INT64_MAX) {
35,068,226✔
609
    goto _exit;
626✔
610
  }
611

612
  code = tsdbFSetWriteRow(writer->ctx->fsetWriter, row);
35,067,600✔
613
  TSDB_CHECK_CODE(code, lino, _exit);
35,067,600✔
614

615
_exit:
35,068,226✔
616
  if (code) {
35,068,226✔
617
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
618
  }
619
  return code;
35,068,226✔
620
}
621

622
static int32_t tsdbSnapWriteFileSetOpenReader(STsdbSnapWriter* writer) {
922✔
623
  int32_t code = 0;
922✔
624
  int32_t lino = 0;
922✔
625

626
  writer->ctx->toSttOnly = false;
922✔
627
  if (writer->ctx->fset) {
922✔
628
#if 0
629
    // open data reader
630
    SDataFileReaderConfig dataFileReaderConfig = {
631
        .tsdb = writer->tsdb,
632
        .buffers = writer->buffers,
633
        .szPage = writer->szPage,
634
    };
635

636
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
637
      if (writer->ctx->fset->farr[ftype] == NULL) {
638
        continue;
639
      }
640

641
      dataFileReaderConfig.files[ftype].exist = true;
642
      dataFileReaderConfig.files[ftype].file = writer->ctx->fset->farr[ftype]->f[0];
643

644
      STFileOp fileOp = {
645
          .optype = TSDB_FOP_REMOVE,
646
          .fid = writer->ctx->fset->fid,
647
          .of = writer->ctx->fset->farr[ftype]->f[0],
648
      };
649

650
      code = TARRAY2_APPEND(writer->fopArr, fileOp);
651
      TSDB_CHECK_CODE(code, lino, _exit);
652
    }
653

654
    code = tsdbDataFileReaderOpen(NULL, &dataFileReaderConfig, &writer->ctx->dataReader);
655
    TSDB_CHECK_CODE(code, lino, _exit);
656
#endif
657

658
    // open stt reader array
659
    SSttLvl* lvl;
660
    TARRAY2_FOREACH(writer->ctx->fset->lvlArr, lvl) {
1,844✔
661
      if (lvl->level != 0) {
922✔
662
        if (TARRAY2_SIZE(lvl->fobjArr) > 0) {
×
663
          writer->ctx->toSttOnly = true;
×
664
        }
665

666
        continue;  // Only merge level 0
×
667
      }
668

669
      STFileObj* fobj;
670
      TARRAY2_FOREACH(lvl->fobjArr, fobj) {
1,844✔
671
        SSttFileReader*      reader;
922✔
672
        SSttFileReaderConfig sttFileReaderConfig = {
1,844✔
673
            .tsdb = writer->tsdb,
922✔
674
            .szPage = writer->szPage,
922✔
675
            .buffers = writer->buffers,
922✔
676
            .file = fobj->f[0],
677
        };
678

679
        code = tsdbSttFileReaderOpen(fobj->fname, &sttFileReaderConfig, &reader);
922✔
680
        TSDB_CHECK_CODE(code, lino, _exit);
922✔
681

682
        code = TARRAY2_APPEND(writer->ctx->sttReaderArr, reader);
922✔
683
        TSDB_CHECK_CODE(code, lino, _exit);
922✔
684

685
        STFileOp fileOp = {
1,844✔
686
            .optype = TSDB_FOP_REMOVE,
687
            .fid = fobj->f->fid,
922✔
688
            .of = fobj->f[0],
689
        };
690

691
        code = TARRAY2_APPEND(writer->fopArr, fileOp);
922✔
692
        TSDB_CHECK_CODE(code, lino, _exit);
922✔
693
      }
694
    }
695
  }
696

697
_exit:
922✔
698
  if (code) {
922✔
699
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
700
  }
701
  return code;
922✔
702
}
703

704
static int32_t tsdbSnapWriteFileSetCloseReader(STsdbSnapWriter* writer) {
922✔
705
  TARRAY2_CLEAR(writer->ctx->sttReaderArr, tsdbSttFileReaderClose);
1,844✔
706
  tsdbDataFileReaderClose(&writer->ctx->dataReader);
922✔
707
  return 0;
922✔
708
}
709

710
static int32_t tsdbSnapWriteFileSetOpenIter(STsdbSnapWriter* writer) {
922✔
711
  int32_t code = 0;
922✔
712
  int32_t lino = 0;
922✔
713

714
  // data ieter
715
  if (writer->ctx->dataReader) {
922✔
716
    STsdbIter*      iter;
×
717
    STsdbIterConfig config = {0};
×
718

719
    // data
720
    config.type = TSDB_ITER_TYPE_DATA;
×
721
    config.dataReader = writer->ctx->dataReader;
×
722

723
    code = tsdbIterOpen(&config, &iter);
×
724
    TSDB_CHECK_CODE(code, lino, _exit);
×
725

726
    code = TARRAY2_APPEND(writer->ctx->dataIterArr, iter);
×
727
    TSDB_CHECK_CODE(code, lino, _exit);
×
728

729
    // tome
730
    config.type = TSDB_ITER_TYPE_DATA_TOMB;
×
731
    config.dataReader = writer->ctx->dataReader;
×
732

733
    code = tsdbIterOpen(&config, &iter);
×
734
    TSDB_CHECK_CODE(code, lino, _exit);
×
735

736
    code = TARRAY2_APPEND(writer->ctx->tombIterArr, iter);
×
737
    TSDB_CHECK_CODE(code, lino, _exit);
×
738
  }
739

740
  // stt iter
741
  SSttFileReader* sttFileReader;
742
  TARRAY2_FOREACH(writer->ctx->sttReaderArr, sttFileReader) {
1,844✔
743
    STsdbIter*      iter;
922✔
744
    STsdbIterConfig config = {0};
922✔
745

746
    // data
747
    config.type = TSDB_ITER_TYPE_STT;
922✔
748
    config.sttReader = sttFileReader;
922✔
749

750
    code = tsdbIterOpen(&config, &iter);
922✔
751
    TSDB_CHECK_CODE(code, lino, _exit);
922✔
752

753
    code = TARRAY2_APPEND(writer->ctx->dataIterArr, iter);
922✔
754
    TSDB_CHECK_CODE(code, lino, _exit);
922✔
755

756
    // tomb
757
    config.type = TSDB_ITER_TYPE_STT_TOMB;
922✔
758
    config.sttReader = sttFileReader;
922✔
759

760
    code = tsdbIterOpen(&config, &iter);
922✔
761
    TSDB_CHECK_CODE(code, lino, _exit);
922✔
762

763
    code = TARRAY2_APPEND(writer->ctx->tombIterArr, iter);
922✔
764
    TSDB_CHECK_CODE(code, lino, _exit);
922✔
765
  }
766

767
  // open merger
768
  code = tsdbIterMergerOpen(writer->ctx->dataIterArr, &writer->ctx->dataIterMerger, false);
922✔
769
  TSDB_CHECK_CODE(code, lino, _exit);
922✔
770

771
  code = tsdbIterMergerOpen(writer->ctx->tombIterArr, &writer->ctx->tombIterMerger, true);
922✔
772
  TSDB_CHECK_CODE(code, lino, _exit);
922✔
773

774
_exit:
922✔
775
  if (code) {
922✔
776
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
777
  }
778
  return code;
922✔
779
}
780

781
static int32_t tsdbSnapWriteFileSetCloseIter(STsdbSnapWriter* writer) {
922✔
782
  tsdbIterMergerClose(&writer->ctx->dataIterMerger);
922✔
783
  tsdbIterMergerClose(&writer->ctx->tombIterMerger);
922✔
784
  TARRAY2_CLEAR(writer->ctx->dataIterArr, tsdbIterClose);
1,844✔
785
  TARRAY2_CLEAR(writer->ctx->tombIterArr, tsdbIterClose);
1,844✔
786
  return 0;
922✔
787
}
788

789
static int32_t tsdbSnapWriteFileSetOpenWriter(STsdbSnapWriter* writer) {
922✔
790
  int32_t code = 0;
922✔
791
  int32_t lino = 0;
922✔
792

793
  SFSetWriterConfig config = {
922✔
794
      .tsdb = writer->tsdb,
922✔
795
      .toSttOnly = writer->ctx->toSttOnly,
922✔
796
      .compactVersion = writer->compactVersion,
922✔
797
      .minRow = writer->minRow,
922✔
798
      .maxRow = writer->maxRow,
922✔
799
      .szPage = writer->szPage,
922✔
800
      .cmprAlg = writer->cmprAlg,
922✔
801
      .fid = writer->ctx->fid,
922✔
802
      .cid = writer->commitID,
922✔
803
      .expLevel = writer->ctx->expLevel,
922✔
804
      .level = writer->ctx->toSttOnly ? 1 : 0,
922✔
805
  };
806
  // merge stt files to either data or a new stt file
807
  if (writer->ctx->fset) {
922✔
808
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
4,610✔
809
      if (writer->ctx->fset->farr[ftype] != NULL) {
3,688✔
810
        config.files[ftype].exist = true;
×
811
        config.files[ftype].file = writer->ctx->fset->farr[ftype]->f[0];
×
812
      }
813
    }
814
  }
815

816
  code = tsdbFSetWriterOpen(&config, &writer->ctx->fsetWriter);
922✔
817
  TSDB_CHECK_CODE(code, lino, _exit);
922✔
818

819
_exit:
922✔
820
  if (code) {
922✔
821
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
822
  }
823
  return code;
922✔
824
}
825

826
static int32_t tsdbSnapWriteFileSetCloseWriter(STsdbSnapWriter* writer) {
922✔
827
  return tsdbFSetWriterClose(&writer->ctx->fsetWriter, 0, writer->fopArr);
922✔
828
}
829

830
static int32_t tsdbSnapWriteFileSetBegin(STsdbSnapWriter* writer, int32_t fid) {
922✔
831
  int32_t code = 0;
922✔
832
  int32_t lino = 0;
922✔
833

834
  STFileSet* fset = &(STFileSet){.fid = fid};
922✔
835

836
  writer->ctx->fid = fid;
922✔
837
  STFileSet** fsetPtr = TARRAY2_SEARCH(writer->fsetArr, &fset, tsdbTFileSetCmprFn, TD_EQ);
922✔
838
  writer->ctx->fset = (fsetPtr == NULL) ? NULL : *fsetPtr;
922✔
839

840
  writer->ctx->expLevel = tsdbFidLevel(fid, &writer->tsdb->keepCfg, taosGetTimestampSec());
922✔
841

842
  writer->ctx->hasData = true;
922✔
843
  writer->ctx->hasTomb = true;
922✔
844

845
  code = tsdbSnapWriteFileSetOpenReader(writer);
922✔
846
  TSDB_CHECK_CODE(code, lino, _exit);
922✔
847

848
  code = tsdbSnapWriteFileSetOpenIter(writer);
922✔
849
  TSDB_CHECK_CODE(code, lino, _exit);
922✔
850

851
  code = tsdbSnapWriteFileSetOpenWriter(writer);
922✔
852
  TSDB_CHECK_CODE(code, lino, _exit);
922✔
853

854
  writer->ctx->fsetWriteBegin = true;
922✔
855

856
_exit:
922✔
857
  if (code) {
922✔
858
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
859
  } else {
860
    tsdbInfo("vgId:%d %s succeeded, fid:%d", TD_VID(writer->tsdb->pVnode), __func__, fid);
922✔
861
  }
862
  return code;
922✔
863
}
864

865
static int32_t tsdbSnapWriteTombRecord(STsdbSnapWriter* writer, const STombRecord* record) {
626✔
866
  int32_t code = 0;
626✔
867
  int32_t lino = 0;
626✔
868

869
  while (writer->ctx->hasTomb) {
626✔
870
    STombRecord* record1 = tsdbIterMergerGetTombRecord(writer->ctx->tombIterMerger);
626✔
871
    if (record1 == NULL) {
626✔
872
      writer->ctx->hasTomb = false;
626✔
873
      break;
626✔
874
    }
875

876
    int32_t c = tTombRecordCompare(record1, record);
×
877
    if (c <= 0) {
×
878
      code = tsdbFSetWriteTombRecord(writer->ctx->fsetWriter, record1);
×
879
      TSDB_CHECK_CODE(code, lino, _exit);
×
880
    } else {
881
      break;
×
882
    }
883

884
    code = tsdbIterMergerNext(writer->ctx->tombIterMerger);
×
885
    TSDB_CHECK_CODE(code, lino, _exit);
×
886
  }
887

888
  if (record->suid == INT64_MAX) {
626✔
889
    goto _exit;
626✔
890
  }
891

892
  code = tsdbFSetWriteTombRecord(writer->ctx->fsetWriter, record);
×
893
  TSDB_CHECK_CODE(code, lino, _exit);
×
894

895
_exit:
×
896
  if (code) {
626✔
897
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
898
  }
899
  return code;
626✔
900
}
901

902
static int32_t tsdbSnapWriteFileSetEnd(STsdbSnapWriter* writer) {
1,548✔
903
  if (!writer->ctx->fsetWriteBegin) return 0;
1,548✔
904

905
  int32_t code = 0;
626✔
906
  int32_t lino = 0;
626✔
907

908
  // end timeseries data write
909
  SRowInfo row = {
626✔
910
      .suid = INT64_MAX,
911
      .uid = INT64_MAX,
912
  };
913

914
  code = tsdbSnapWriteTimeSeriesRow(writer, &row);
626✔
915
  TSDB_CHECK_CODE(code, lino, _exit);
626✔
916

917
  // end tombstone data write
918
  STombRecord record = {
626✔
919
      .suid = INT64_MAX,
920
      .uid = INT64_MAX,
921
  };
922

923
  code = tsdbSnapWriteTombRecord(writer, &record);
626✔
924
  TSDB_CHECK_CODE(code, lino, _exit);
626✔
925

926
  // close write
927
  code = tsdbSnapWriteFileSetCloseWriter(writer);
626✔
928
  TSDB_CHECK_CODE(code, lino, _exit);
626✔
929

930
  code = tsdbSnapWriteFileSetCloseIter(writer);
626✔
931
  TSDB_CHECK_CODE(code, lino, _exit);
626✔
932

933
  code = tsdbSnapWriteFileSetCloseReader(writer);
626✔
934
  TSDB_CHECK_CODE(code, lino, _exit);
626✔
935

936
  writer->ctx->fsetWriteBegin = false;
626✔
937

938
_exit:
626✔
939
  if (code) {
626✔
940
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
941
  } else {
942
    tsdbInfo("vgId:%d %s succeeded, fid:%d", TD_VID(writer->tsdb->pVnode), __func__, writer->ctx->fid);
626✔
943
  }
944
  return code;
626✔
945
}
946

947
static int32_t tsdbSnapWriteFileSetAbort(STsdbSnapWriter* writer) {
296✔
948
  if (!writer->ctx->fsetWriteBegin) return 0;
296✔
949

950
  int32_t code = 0;
296✔
951
  int32_t lino = 0;
296✔
952

953
  // close write
954
  code = tsdbSnapWriteFileSetCloseWriter(writer);
296✔
955
  TSDB_CHECK_CODE(code, lino, _exit);
296✔
956

957
  code = tsdbSnapWriteFileSetCloseIter(writer);
296✔
958
  TSDB_CHECK_CODE(code, lino, _exit);
296✔
959

960
  code = tsdbSnapWriteFileSetCloseReader(writer);
296✔
961
  TSDB_CHECK_CODE(code, lino, _exit);
296✔
962

963
  writer->ctx->fsetWriteBegin = false;
296✔
964

965
_exit:
296✔
966
  if (code) {
296✔
967
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
968
  }
969
  return code;
296✔
970
}
971

972
static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
2,419✔
973
  int32_t code = 0;
2,419✔
974
  int32_t lino = 0;
2,419✔
975

976
  SBlockData blockData[1] = {0};
2,419✔
977

978
  SBuffer buffer = {
2,419✔
979
      .capacity = hdr->size,
2,419✔
980
      .data = hdr->data,
2,419✔
981
      .size = hdr->size,
2,419✔
982
  };
983
  SBufferReader br = BUFFER_READER_INITIALIZER(0, &buffer);
2,419✔
984

985
  code = tBlockDataDecompress(&br, blockData, &writer->buffers[0]);
2,419✔
986
  TSDB_CHECK_CODE(code, lino, _exit);
2,419✔
987

988
  int32_t fid = tsdbKeyFid(blockData->aTSKEY[0], writer->minutes, writer->precision);
2,419✔
989
  if (!writer->ctx->fsetWriteBegin || fid != writer->ctx->fid) {
2,419✔
990
    code = tsdbSnapWriteFileSetEnd(writer);
922✔
991
    TSDB_CHECK_CODE(code, lino, _exit);
922✔
992

993
    code = tsdbSnapWriteFileSetBegin(writer, fid);
922✔
994
    TSDB_CHECK_CODE(code, lino, _exit);
922✔
995
  }
996

997
  for (int32_t i = 0; i < blockData->nRow; ++i) {
35,070,019✔
998
    SRowInfo rowInfo = {
35,067,600✔
999
        .suid = blockData->suid,
35,067,600✔
1000
        .uid = blockData->uid ? blockData->uid : blockData->aUid[i],
35,067,600✔
1001
        .row = tsdbRowFromBlockData(blockData, i),
1002
    };
1003

1004
    code = tsdbSnapWriteTimeSeriesRow(writer, &rowInfo);
35,067,600✔
1005
    TSDB_CHECK_CODE(code, lino, _exit);
35,067,600✔
1006
  }
1007

1008
_exit:
2,419✔
1009
  if (code) {
2,419✔
1010
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
1011
  } else {
1012
    tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(writer->tsdb->pVnode), __func__,
2,419✔
1013
              blockData->suid, blockData->uid, blockData->nRow);
1014
  }
1015
  tBlockDataDestroy(blockData);
2,419✔
1016
  return code;
2,419✔
1017
}
1018

1019
static int32_t tsdbSnapWriteDecmprTombBlock(SSnapDataHdr* hdr, STombBlock* tombBlock) {
×
1020
  int32_t code = 0;
×
1021
  int32_t lino = 0;
×
1022

1023
  tTombBlockClear(tombBlock);
×
1024

1025
  int64_t size = hdr->size;
×
1026
  size = size / TOMB_RECORD_ELEM_NUM;
×
1027
  tombBlock->numOfRecords = size / sizeof(int64_t);
×
1028

1029
  // int64_t* data = (int64_t*)hdr->data;
1030
  uint8_t* data = hdr->data;
×
1031
  for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) {
×
1032
    code = tBufferPut(tombBlock->buffers + i, data, size);
×
1033
    TSDB_CHECK_CODE(code, lino, _exit);
×
1034
    data += size;
×
1035
  }
1036

1037
_exit:
×
1038
  return code;
×
1039
}
1040

1041
static int32_t tsdbSnapWriteTombData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
×
1042
  int32_t code = 0;
×
1043
  int32_t lino = 0;
×
1044

1045
  STombRecord record;
×
1046
  STombBlock  tombBlock[1] = {0};
×
1047

1048
  code = tsdbSnapWriteDecmprTombBlock(hdr, tombBlock);
×
1049
  TSDB_CHECK_CODE(code, lino, _exit);
×
1050

1051
  code = tTombBlockGet(tombBlock, 0, &record);
×
1052
  TSDB_CHECK_CODE(code, lino, _exit);
×
1053
  int32_t fid = tsdbKeyFid(record.skey, writer->minutes, writer->precision);
×
1054
  if (!writer->ctx->fsetWriteBegin || fid != writer->ctx->fid) {
×
1055
    code = tsdbSnapWriteFileSetEnd(writer);
×
1056
    TSDB_CHECK_CODE(code, lino, _exit);
×
1057

1058
    code = tsdbSnapWriteFileSetBegin(writer, fid);
×
1059
    TSDB_CHECK_CODE(code, lino, _exit);
×
1060
  }
1061

1062
  if (writer->ctx->hasData) {
×
1063
    SRowInfo row = {
×
1064
        .suid = INT64_MAX,
1065
        .uid = INT64_MAX,
1066
    };
1067

1068
    code = tsdbSnapWriteTimeSeriesRow(writer, &row);
×
1069
    TSDB_CHECK_CODE(code, lino, _exit);
×
1070
  }
1071

1072
  for (int32_t i = 0; i < TOMB_BLOCK_SIZE(tombBlock); ++i) {
×
1073
    code = tTombBlockGet(tombBlock, i, &record);
×
1074
    TSDB_CHECK_CODE(code, lino, _exit);
×
1075

1076
    code = tsdbSnapWriteTombRecord(writer, &record);
×
1077
    TSDB_CHECK_CODE(code, lino, _exit);
×
1078
  }
1079

1080
  tTombBlockDestroy(tombBlock);
×
1081

1082
_exit:
×
1083
  if (code) {
×
1084
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
1085
  }
1086
  return code;
×
1087
}
1088

1089
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRanges, STsdbSnapWriter** writer) {
922✔
1090
  int32_t code = 0;
922✔
1091
  int32_t lino = 0;
922✔
1092

1093
  // start to write
1094
  writer[0] = taosMemoryCalloc(1, sizeof(*writer[0]));
922✔
1095
  if (writer[0] == NULL) return terrno;
922✔
1096

1097
  writer[0]->tsdb = pTsdb;
922✔
1098
  writer[0]->sver = sver;
922✔
1099
  writer[0]->ever = ever;
922✔
1100
  writer[0]->minutes = pTsdb->keepCfg.days;
922✔
1101
  writer[0]->precision = pTsdb->keepCfg.precision;
922✔
1102
  writer[0]->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
922✔
1103
  writer[0]->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
922✔
1104
  writer[0]->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
922✔
1105
  writer[0]->commitID = tsdbFSAllocEid(pTsdb->pFS);
922✔
1106
  writer[0]->szPage = pTsdb->pVnode->config.tsdbPageSize;
922✔
1107
  writer[0]->compactVersion = INT64_MAX;
922✔
1108
  writer[0]->now = taosGetTimestampMs();
1,844✔
1109

1110
  code =
1111
      tsdbFSCreateCopyRangedSnapshot(pTsdb->pFS, (TFileSetRangeArray*)pRanges, &writer[0]->fsetArr, writer[0]->fopArr);
922✔
1112
  TSDB_CHECK_CODE(code, lino, _exit);
922✔
1113

1114
_exit:
922✔
1115
  if (code) {
922✔
1116
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
×
1117
  } else {
1118
    tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64, TD_VID(pTsdb->pVnode), __func__, sver, ever);
922✔
1119
  }
1120
  return code;
922✔
1121
}
1122

1123
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* writer, bool rollback) {
922✔
1124
  int32_t code = 0;
922✔
1125
  int32_t lino = 0;
922✔
1126

1127
  if (!rollback) {
922✔
1128
    code = tsdbSnapWriteFileSetEnd(writer);
626✔
1129
    TSDB_CHECK_CODE(code, lino, _exit);
626✔
1130

1131
    code = tsdbFSEditBegin(writer->tsdb->pFS, writer->fopArr, TSDB_FEDIT_COMMIT);
626✔
1132
    TSDB_CHECK_CODE(code, lino, _exit);
626✔
1133
  } else {
1134
    code = tsdbSnapWriteFileSetAbort(writer);
296✔
1135
    TSDB_CHECK_CODE(code, lino, _exit);
296✔
1136

1137
    code = tsdbFSEditBegin(writer->tsdb->pFS, writer->fopArr, TSDB_FEDIT_COMMIT);
296✔
1138
    TSDB_CHECK_CODE(code, lino, _exit);
296✔
1139
  }
1140

1141
_exit:
296✔
1142
  if (code) {
922✔
1143
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
1144
  } else {
1145
    tsdbDebug("vgId:%d %s done", TD_VID(writer->tsdb->pVnode), __func__);
922✔
1146
  }
1147
  return code;
922✔
1148
}
1149

1150
int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) {
922✔
1151
  if (writer[0] == NULL) return 0;
922✔
1152

1153
  int32_t code = 0;
922✔
1154
  int32_t lino = 0;
922✔
1155

1156
  STsdb* tsdb = writer[0]->tsdb;
922✔
1157

1158
  if (rollback) {
922✔
1159
    code = tsdbFSEditAbort(writer[0]->tsdb->pFS);
296✔
1160
    TSDB_CHECK_CODE(code, lino, _exit);
296✔
1161
  } else {
1162
    (void)taosThreadMutexLock(&writer[0]->tsdb->mutex);
626✔
1163

1164
    code = tsdbFSEditCommit(writer[0]->tsdb->pFS);
626✔
1165
    if (code) {
626✔
1166
      (void)taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
×
1167
      TSDB_CHECK_CODE(code, lino, _exit);
×
1168
    }
1169

1170
    writer[0]->tsdb->pFS->fsstate = TSDB_FS_STATE_NORMAL;
626✔
1171

1172
    (void)taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
626✔
1173
  }
1174

1175
  tsdbIterMergerClose(&writer[0]->ctx->tombIterMerger);
922✔
1176
  tsdbIterMergerClose(&writer[0]->ctx->dataIterMerger);
922✔
1177
  TARRAY2_DESTROY(writer[0]->ctx->tombIterArr, tsdbIterClose);
922✔
1178
  TARRAY2_DESTROY(writer[0]->ctx->dataIterArr, tsdbIterClose);
922✔
1179
  TARRAY2_DESTROY(writer[0]->ctx->sttReaderArr, tsdbSttFileReaderClose);
922✔
1180
  tsdbDataFileReaderClose(&writer[0]->ctx->dataReader);
922✔
1181

1182
  TARRAY2_DESTROY(writer[0]->fopArr, NULL);
922✔
1183
  tsdbFSDestroyCopyRangedSnapshot(&writer[0]->fsetArr);
922✔
1184

1185
  for (int32_t i = 0; i < ARRAY_SIZE(writer[0]->buffers); ++i) {
10,142✔
1186
    tBufferDestroy(writer[0]->buffers + i);
9,220✔
1187
  }
1188

1189
  taosMemoryFree(writer[0]);
922✔
1190
  writer[0] = NULL;
922✔
1191

1192
_exit:
922✔
1193
  if (code) {
922✔
1194
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
×
1195
  } else {
1196
    tsdbInfo("vgId:%d %s done, rollback:%d", TD_VID(tsdb->pVnode), __func__, rollback);
922✔
1197
  }
1198
  return code;
922✔
1199
}
1200

1201
int32_t tsdbSnapWrite(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
2,419✔
1202
  int32_t code = 0;
2,419✔
1203
  int32_t lino = 0;
2,419✔
1204

1205
  if (hdr->type == SNAP_DATA_TSDB) {
2,419✔
1206
    code = tsdbSnapWriteTimeSeriesData(writer, hdr);
2,419✔
1207
    TSDB_CHECK_CODE(code, lino, _exit);
2,419✔
1208
  } else if (hdr->type == SNAP_DATA_DEL) {
×
1209
    code = tsdbSnapWriteTombData(writer, hdr);
×
1210
    TSDB_CHECK_CODE(code, lino, _exit);
×
1211
  } else {
1212
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _exit);
×
1213
  }
1214

1215
_exit:
2,419✔
1216
  if (code) {
2,419✔
1217
    tsdbError("vgId:%d %s failed at line %d since %s, type:%d index:%" PRId64 " size:%" PRId64,
×
1218
              TD_VID(writer->tsdb->pVnode), __func__, lino, tstrerror(code), hdr->type, hdr->index, hdr->size);
1219
  } else {
1220
    tsdbDebug("vgId:%d %s done, type:%d index:%" PRId64 " size:%" PRId64, TD_VID(writer->tsdb->pVnode), __func__,
2,419✔
1221
              hdr->type, hdr->index, hdr->size);
1222
  }
1223
  return code;
2,419✔
1224
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc