• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3599

08 Feb 2025 11:23AM UTC coverage: 1.77% (-61.6%) from 63.396%
#3599

push

travis-ci

web-flow
Merge pull request #29712 from taosdata/fix/TD-33652-3.0

fix: reduce write rows from 30w to 3w

3776 of 278949 branches covered (1.35%)

Branch coverage included in aggregate %.

6012 of 274147 relevant lines covered (2.19%)

1642.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/tsdb/tsdbSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdb.h"
17
#include "tsdbDataFileRW.h"
18
#include "tsdbFS2.h"
19
#include "tsdbFSetRW.h"
20
#include "tsdbIter.h"
21
#include "tsdbSttFileRW.h"
22

23
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);
24

25
// STsdbSnapReader ========================================
26
struct STsdbSnapReader {
27
  STsdb*  tsdb;
28
  int64_t sver;
29
  int64_t ever;
30
  int8_t  type;
31

32
  SBuffer  buffers[10];
33
  SSkmInfo skmTb[1];
34

35
  TFileSetRangeArray* fsrArr;
36

37
  // context
38
  struct {
39
    int32_t         fsrArrIdx;
40
    STFileSetRange* fsr;
41
    bool            isDataDone;
42
    bool            isTombDone;
43
  } ctx[1];
44

45
  // reader
46
  SDataFileReader*    dataReader;
47
  TSttFileReaderArray sttReaderArr[1];
48

49
  // iter
50
  TTsdbIterArray dataIterArr[1];
51
  SIterMerger*   dataIterMerger;
52
  TTsdbIterArray tombIterArr[1];
53
  SIterMerger*   tombIterMerger;
54

55
  // data
56
  SBlockData blockData[1];
57
  STombBlock tombBlock[1];
58
};
59

60
/* Close every stt-file reader and the data-file reader of the current range. */
static void tsdbSnapReadFileSetCloseReader(STsdbSnapReader* reader) {
  // stt readers first, then the data-file reader
  TARRAY2_CLEAR(reader->sttReaderArr, tsdbSttFileReaderClose);
  tsdbDataFileReaderClose(&reader->dataReader);
}
65

66
/*
 * Open the readers for the file set of the current range (reader->ctx->fsr):
 *   - one data-file reader, only if the file set has at least one data file;
 *   - one stt-file reader per file object of every stt level.
 *
 * Returns 0 on success. On failure every reader opened so far is closed again
 * and the error code is returned.
 */
static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb*  tsdb = reader->tsdb;

  // data
  SDataFileReaderConfig dataConfig = {
      .tsdb = tsdb,
      .szPage = tsdb->pVnode->config.tsdbPageSize,
      .buffers = reader->buffers,
  };
  bool anyDataFile = false;
  for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
    if (reader->ctx->fsr->fset->farr[ftype] == NULL) {
      continue;
    }
    anyDataFile = true;
    dataConfig.files[ftype].exist = true;
    dataConfig.files[ftype].file = reader->ctx->fsr->fset->farr[ftype]->f[0];
  }

  if (anyDataFile) {
    code = tsdbDataFileReaderOpen(NULL, &dataConfig, &reader->dataReader);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // stt
  SSttLvl* lvl;
  TARRAY2_FOREACH(reader->ctx->fsr->fset->lvlArr, lvl) {
    STFileObj* fobj;
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      SSttFileReader*      sttReader;
      SSttFileReaderConfig sttConfig = {
          .tsdb = tsdb,
          .szPage = tsdb->pVnode->config.tsdbPageSize,
          .file = fobj->f[0],
          .buffers = reader->buffers,
      };

      code = tsdbSttFileReaderOpen(fobj->fname, &sttConfig, &sttReader);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = TARRAY2_APPEND(reader->sttReaderArr, sttReader);
      if (code) {
        // the array did not take the reader, so it has to be closed here
        tsdbSttFileReaderClose(&sttReader);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
  }

_exit:
  if (code) {
    tsdbSnapReadFileSetCloseReader(reader);
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), code, lino);
  }
  return code;
}
120

121
/*
 * Build the iterators and mergers for the current range.
 *
 * For the data-file reader (if one is open) and for every stt-file reader, a
 * data iterator and a tomb iterator are opened, both filtered by the version
 * range [fsr->sver, fsr->ever]. The data iterators feed dataIterMerger and the
 * tomb iterators feed tombIterMerger.
 *
 * An iterator that was opened but could not be appended to its array is closed
 * immediately, so it is not leaked (same handling as for stt readers in
 * tsdbSnapReadFileSetOpenReader).
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 * Iterators already stored in the arrays are left for the caller to release.
 */
static int32_t tsdbSnapReadFileSetOpenIter(STsdbSnapReader* reader) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdbIter*      iter;
  STsdbIterConfig config = {
      .filterByVersion = true,
      .verRange[0] = reader->ctx->fsr->sver,
      .verRange[1] = reader->ctx->fsr->ever,
  };

  // data file
  if (reader->dataReader) {
    // data
    config.type = TSDB_ITER_TYPE_DATA;
    config.dataReader = reader->dataReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    if ((code = TARRAY2_APPEND(reader->dataIterArr, iter))) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // tomb
    config.type = TSDB_ITER_TYPE_DATA_TOMB;
    config.dataReader = reader->dataReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    if ((code = TARRAY2_APPEND(reader->tombIterArr, iter))) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // stt file
  SSttFileReader* sttReader;
  TARRAY2_FOREACH(reader->sttReaderArr, sttReader) {
    // data
    config.type = TSDB_ITER_TYPE_STT;
    config.sttReader = sttReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    if ((code = TARRAY2_APPEND(reader->dataIterArr, iter))) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // tomb
    config.type = TSDB_ITER_TYPE_STT_TOMB;
    config.sttReader = sttReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    if ((code = TARRAY2_APPEND(reader->tombIterArr, iter))) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // merger (NOTE(review): last argument presumably selects tomb mode — confirm)
  code = tsdbIterMergerOpen(reader->dataIterArr, &reader->dataIterMerger, false);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbIterMergerOpen(reader->tombIterArr, &reader->tombIterMerger, true);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
192

193
/* Close both mergers, then every data and tomb iterator of the current range. */
static void tsdbSnapReadFileSetCloseIter(STsdbSnapReader* reader) {
  // mergers
  tsdbIterMergerClose(&reader->dataIterMerger);
  tsdbIterMergerClose(&reader->tombIterMerger);

  // iterators
  TARRAY2_CLEAR(reader->dataIterArr, tsdbIterClose);
  TARRAY2_CLEAR(reader->tombIterArr, tsdbIterClose);
}
200

201
/*
 * Advance to the next file set range, if any, and open its readers/iterators.
 *
 * When all ranges have been consumed nothing is changed (ctx->fsr keeps its
 * value) and 0 is returned. Otherwise ctx->fsr is set to the next range, both
 * "done" flags are reset, and the readers and iterators are opened.
 *
 * Returns 0 on success, otherwise the error code of the failing open step.
 */
static int32_t tsdbSnapReadRangeBegin(STsdbSnapReader* reader) {
  int32_t code = 0;
  int32_t lino = 0;

  // nothing left to read
  if (!(reader->ctx->fsrArrIdx < TARRAY2_SIZE(reader->fsrArr))) {
    return code;
  }

  reader->ctx->fsr = TARRAY2_GET(reader->fsrArr, reader->ctx->fsrArrIdx++);
  reader->ctx->isDataDone = false;
  reader->ctx->isTombDone = false;

  code = tsdbSnapReadFileSetOpenReader(reader);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapReadFileSetOpenIter(reader);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
223

224
/*
 * Finish the current range: release its iterators and readers and mark that
 * no range is open any more. Always returns 0.
 */
static int32_t tsdbSnapReadRangeEnd(STsdbSnapReader* reader) {
  // iterators reference the readers, so they are closed first
  tsdbSnapReadFileSetCloseIter(reader);
  tsdbSnapReadFileSetCloseReader(reader);

  reader->ctx->fsr = NULL;
  return 0;
}
230

231
/*
 * Serialise reader->blockData into a freshly allocated snapshot payload.
 *
 * The block is encoded with NO_COMPRESSION into reader->buffers; the payload
 * is an SSnapDataHdr (type = reader->type, size = total bytes) followed by the
 * contents of reader->buffers[0..3] in order.
 *
 * @param reader  snapshot reader holding the block to serialise
 * @param data    out: *data receives the taosMemoryMalloc'ed payload
 *                (presumably released by the caller — confirm)
 * @return 0 on success; the tBlockDataCompress error, or terrno when the
 *         allocation fails.
 */
static int32_t tsdbSnapCmprData(STsdbSnapReader* reader, uint8_t** data) {
  int32_t code = 0;
  int32_t lino = 0;

  SColCompressInfo cmprInfo = {.pColCmpr = NULL, .defaultCmprAlg = NO_COMPRESSION};
  code = tBlockDataCompress(reader->blockData, (void*)&cmprInfo, reader->buffers, reader->buffers + 4);
  TSDB_CHECK_CODE(code, lino, _exit);

  // total payload size = sum of the four encoded buffers
  int32_t size = 0;
  for (int i = 0; i < 4; i++) {
    size += reader->buffers[i].size;
  }
  *data = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
  if (*data == NULL) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  SSnapDataHdr* pHdr = (SSnapDataHdr*)*data;
  uint8_t*      pBuf = pHdr->data;

  pHdr->type = reader->type;
  pHdr->size = size;
  for (int i = 0; i < 4; i++) {
    memcpy(pBuf, reader->buffers[i].data, reader->buffers[i].size);
    pBuf += reader->buffers[i].size;
  }

_exit:
  if (code) {
    // NOTE(review): (lino, code) order here differs from the (code, lino) order
    // used by the neighbouring reader functions — confirm against the
    // TSDB_ERROR_LOG definition.
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), lino, code);
  }
  return code;
}
267

268
/* Sum of nData over all columns of the block. */
static int64_t tBlockDataSize(SBlockData* pBlockData) {
  int64_t total = 0;
  int32_t iCol = 0;

  while (iCol < pBlockData->nColData) {
    SColData* pCol = tBlockDataGetColDataByIdx(pBlockData, iCol);
    total += pCol->nData;
    iCol++;
  }

  return total;
}
276

277
/*
 * Collect the next batch of rows from dataIterMerger into reader->blockData
 * and, if at least one row was collected, serialise it into *data.
 *
 * A batch ends when the merger is exhausted, when the next row does not share
 * the block's schema (TABLE_SAME_SCHEMA), or when the accumulated column data
 * reaches TSDB_SNAP_DATA_PAYLOAD_SIZE. Rows of tables for which metaGetInfo
 * fails are skipped.
 *
 * *data is left untouched when no row was collected.
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** data) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SMetaInfo   info;
  TABLEID     curTb = {0};  // last table whose existence was checked
  SRowInfo*   row = NULL;
  SBlockData* bData = reader->blockData;

  tBlockDataReset(bData);

  while ((row = tsdbIterMergerGetData(reader->dataIterMerger)) != NULL) {
    // skip dropped table
    if (row->uid != curTb.uid) {
      curTb.suid = row->suid;
      curTb.uid = row->uid;
      if (metaGetInfo(reader->tsdb->pVnode->pMeta, curTb.uid, &info, NULL) != 0) {
        code = tsdbIterMergerSkipTableData(reader->dataIterMerger, &curTb);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }
    }

    // first accepted row: bind the block to this row's table schema
    if (bData->suid == 0 && bData->uid == 0) {
      code = tsdbUpdateSkmTb(reader->tsdb, (TABLEID*)row, reader->skmTb);
      TSDB_CHECK_CODE(code, lino, _exit);

      TABLEID blockTb = {
          .suid = row->suid,
          .uid = row->suid ? 0 : row->uid,
      };
      code = tBlockDataInit(bData, &blockTb, reader->skmTb->pTSchema, NULL, 0);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // a row with a different schema starts the next batch
    if (!TABLE_SAME_SCHEMA(bData->suid, bData->uid, row->suid, row->uid)) {
      break;
    }

    code = tBlockDataAppendRow(bData, &row->row, NULL, row->uid);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(reader->dataIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);

    // the payload size is only re-evaluated every 16 rows
    if ((bData->nRow % 16) == 0 && tBlockDataSize(bData) >= TSDB_SNAP_DATA_PAYLOAD_SIZE) {
      break;
    }
  }

  if (bData->nRow > 0) {
    code = tsdbSnapCmprData(reader, data);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
338

339
/*
 * Pack reader->tombBlock into a freshly allocated snapshot payload.
 *
 * The payload is an SSnapDataHdr (type SNAP_DATA_DEL, size = total bytes)
 * followed by the raw contents of every buffer of the tomb block, in order.
 *
 * @param data  out: data[0] receives the taosMemoryMalloc'ed payload
 * @return 0 on success, terrno when the allocation fails.
 */
static int32_t tsdbSnapCmprTombData(STsdbSnapReader* reader, uint8_t** data) {
  int32_t     code = 0;
  int32_t     lino = 0;
  STombBlock* tb = reader->tombBlock;

  // total number of payload bytes
  int64_t total = 0;
  for (int32_t i = 0; i < ARRAY_SIZE(tb->buffers); i++) {
    total += tb->buffers[i].size;
  }

  *data = taosMemoryMalloc(total + sizeof(SSnapDataHdr));
  if (*data == NULL) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  SSnapDataHdr* hdr = (SSnapDataHdr*)(*data);
  hdr->type = SNAP_DATA_DEL;
  hdr->size = total;

  // copy the buffers back to back right after the header
  uint8_t* cursor = hdr->data;
  for (int32_t i = 0; i < ARRAY_SIZE(tb->buffers); i++) {
    memcpy(cursor, tb->buffers[i].data, tb->buffers[i].size);
    cursor += tb->buffers[i].size;
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
370

371
/*
 * Collect the next batch of tomb records from tombIterMerger into
 * reader->tombBlock and, if the block is non-empty, pack it into *data.
 *
 * Records of tables for which metaGetInfo fails are skipped. A batch ends when
 * the merger is exhausted or TOMB_BLOCK_SIZE reaches 81920.
 *
 * *data is left untouched when no record was collected.
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbSnapReadTombData(STsdbSnapReader* reader, uint8_t** data) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SMetaInfo    info;
  TABLEID      curTb = {0};  // last table whose existence was checked
  STombRecord* record = NULL;

  tTombBlockClear(reader->tombBlock);

  while ((record = tsdbIterMergerGetTombRecord(reader->tombIterMerger)) != NULL) {
    if (record->uid != curTb.uid) {
      curTb.suid = record->suid;
      curTb.uid = record->uid;
      if (metaGetInfo(reader->tsdb->pVnode->pMeta, curTb.uid, &info, NULL) != 0) {
        // table not found in meta: drop all of its tomb records
        code = tsdbIterMergerSkipTableData(reader->tombIterMerger, &curTb);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }
    }

    code = tTombBlockPut(reader->tombBlock, record);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(reader->tombIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (TOMB_BLOCK_SIZE(reader->tombBlock) >= 81920) {
      break;
    }
  }

  if (TOMB_BLOCK_SIZE(reader->tombBlock) > 0) {
    code = tsdbSnapCmprTombData(reader, data);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
412

413
int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type, void* pRanges,
×
414
                           STsdbSnapReader** reader) {
415
  int32_t code = 0;
×
416
  int32_t lino = 0;
×
417

418
  reader[0] = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*reader[0]));
×
419
  if (reader[0] == NULL) return terrno;
×
420

421
  reader[0]->tsdb = tsdb;
×
422
  reader[0]->sver = sver;
×
423
  reader[0]->ever = ever;
×
424
  reader[0]->type = type;
×
425

426
  code = tsdbFSCreateRefRangedSnapshot(tsdb->pFS, sver, ever, (TFileSetRangeArray*)pRanges, &reader[0]->fsrArr);
×
427
  TSDB_CHECK_CODE(code, lino, _exit);
×
428

429
_exit:
×
430
  if (code) {
×
431
    tsdbError("vgId:%d %s failed at %s:%d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode),
×
432
              __func__, __FILE__, lino, tstrerror(code), sver, ever, type);
433
    tsdbTFileSetRangeArrayDestroy(&reader[0]->fsrArr);
×
434
    taosMemoryFree(reader[0]);
×
435
    reader[0] = NULL;
×
436
  } else {
437
    tsdbInfo("vgId:%d, tsdb snapshot incremental reader opened. sver:%" PRId64 " ever:%" PRId64 " type:%d",
×
438
             TD_VID(tsdb->pVnode), sver, ever, type);
439
  }
440
  return code;
×
441
}
442

443
/*
 * Destroy a snapshot reader and everything it owns, then set reader[0] to
 * NULL. Calling it with reader[0] == NULL is a no-op.
 */
void tsdbSnapReaderClose(STsdbSnapReader** reader) {
  if (reader[0] == NULL) {
    return;
  }

  // collected-but-unsent data
  tTombBlockDestroy(reader[0]->tombBlock);
  tBlockDataDestroy(reader[0]->blockData);

  // mergers and iterators, then the file readers they read from
  tsdbIterMergerClose(&reader[0]->dataIterMerger);
  tsdbIterMergerClose(&reader[0]->tombIterMerger);
  TARRAY2_DESTROY(reader[0]->dataIterArr, tsdbIterClose);
  TARRAY2_DESTROY(reader[0]->tombIterArr, tsdbIterClose);
  TARRAY2_DESTROY(reader[0]->sttReaderArr, tsdbSttFileReaderClose);
  tsdbDataFileReaderClose(&reader[0]->dataReader);

  // snapshot ranges and cached schema
  tsdbFSDestroyRefRangedSnapshot(&reader[0]->fsrArr);
  tDestroyTSchema(reader[0]->skmTb->pTSchema);

  // scratch buffers
  for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->buffers); ++i) {
    tBufferDestroy(reader[0]->buffers + i);
  }

  taosMemoryFree(reader[0]);
  reader[0] = NULL;
}
474

475
/*
 * Produce the next snapshot payload.
 *
 * For each file set range the reader first emits all time-series payloads,
 * then all tomb payloads, then closes the range and moves on to the next one.
 *
 * @param reader  snapshot reader
 * @param data    out: data[0] is the next payload, or NULL when every range
 *                has been fully read (or when an error is returned before a
 *                payload was produced)
 * @return 0 on success, otherwise the error code of the failing step.
 */
int32_t tsdbSnapRead(STsdbSnapReader* reader, uint8_t** data) {
  int32_t code = 0;
  int32_t lino = 0;

  *data = NULL;

  while (true) {
    // open the next range if none is in progress
    if (reader->ctx->fsr == NULL) {
      code = tsdbSnapReadRangeBegin(reader);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (reader->ctx->fsr == NULL) {
        break;  // no range left
      }
    }

    if (!reader->ctx->isDataDone) {
      code = tsdbSnapReadTimeSeriesData(reader, data);
      TSDB_CHECK_CODE(code, lino, _exit);
      if (*data != NULL) {
        goto _exit;
      }
      reader->ctx->isDataDone = true;
    }

    if (!reader->ctx->isTombDone) {
      code = tsdbSnapReadTombData(reader, data);
      TSDB_CHECK_CODE(code, lino, _exit);
      if (*data != NULL) {
        goto _exit;
      }
      reader->ctx->isTombDone = true;
    }

    // both kinds exhausted for this range
    code = tsdbSnapReadRangeEnd(reader);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(reader->tsdb->pVnode), __func__);
  }
  return code;
}
523

524
// STsdbSnapWriter ========================================
525
struct STsdbSnapWriter {
526
  STsdb*  tsdb;
527
  int64_t sver;
528
  int64_t ever;
529
  int32_t minutes;
530
  int8_t  precision;
531
  int32_t minRow;
532
  int32_t maxRow;
533
  int8_t  cmprAlg;
534
  int64_t commitID;
535
  int32_t szPage;
536
  int64_t compactVersion;
537
  int64_t now;
538
  SBuffer buffers[10];
539

540
  TFileSetArray* fsetArr;
541
  TFileOpArray   fopArr[1];
542

543
  struct {
544
    bool       fsetWriteBegin;
545
    int32_t    fid;
546
    STFileSet* fset;
547
    SDiskID    did;
548
    bool       hasData;  // if have time series data
549
    bool       hasTomb;  // if have tomb data
550

551
    // reader
552
    SDataFileReader*    dataReader;
553
    TSttFileReaderArray sttReaderArr[1];
554

555
    // iter/merger
556
    TTsdbIterArray dataIterArr[1];
557
    SIterMerger*   dataIterMerger;
558
    TTsdbIterArray tombIterArr[1];
559
    SIterMerger*   tombIterMerger;
560

561
    // writer
562
    bool         toSttOnly;
563
    SFSetWriter* fsetWriter;
564
  } ctx[1];
565
};
566

567
// APIs
568
/*
 * Write one incoming row, merged in order with the rows already stored in the
 * file set being written.
 *
 * Every existing row that compares <= `row` (tRowInfoCmprFn) is written first;
 * existing rows of tables for which metaGetInfo fails are skipped. A `row`
 * whose suid is INT64_MAX acts as an end marker: it only flushes the remaining
 * existing rows and is not written itself.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbSnapWriteTimeSeriesRow(STsdbSnapWriter* writer, SRowInfo* row) {
  int32_t   code = 0;
  int32_t   lino = 0;
  TABLEID   curTb = {0};  // last existing table whose presence was checked
  SMetaInfo info;

  while (writer->ctx->hasData) {
    SRowInfo* existing = tsdbIterMergerGetData(writer->ctx->dataIterMerger);
    if (existing == NULL) {
      // existing data exhausted
      writer->ctx->hasData = false;
      break;
    }

    if (existing->uid != curTb.uid) {
      curTb.suid = existing->suid;
      curTb.uid = existing->uid;
      if (metaGetInfo(writer->tsdb->pVnode->pMeta, curTb.uid, &info, NULL) != 0) {
        code = tsdbIterMergerSkipTableData(writer->ctx->dataIterMerger, &curTb);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;  // fetch the next existing row
      }
    }

    // stop as soon as the existing row sorts after the incoming one
    if (tRowInfoCmprFn(existing, row) > 0) {
      break;
    }

    code = tsdbFSetWriteRow(writer->ctx->fsetWriter, existing);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(writer->ctx->dataIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // the end marker itself is never written
  if (row->suid != INT64_MAX) {
    code = tsdbFSetWriteRow(writer->ctx->fsetWriter, row);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
621

622
/*
 * Open readers on the existing file set (writer->ctx->fset), if there is one.
 *
 * Only level-0 stt files are opened for merging; each of them is also queued
 * for removal in writer->fopArr. If any other stt level holds files,
 * ctx->toSttOnly is set. The data-file reader path is compiled out (#if 0),
 * so ctx->dataReader is not opened here.
 *
 * A reader that was opened but could not be appended to sttReaderArr is closed
 * immediately so it is not leaked.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbSnapWriteFileSetOpenReader(STsdbSnapWriter* writer) {
  int32_t code = 0;
  int32_t lino = 0;

  writer->ctx->toSttOnly = false;
  if (writer->ctx->fset) {
#if 0
    // open data reader
    SDataFileReaderConfig dataFileReaderConfig = {
        .tsdb = writer->tsdb,
        .buffers = writer->buffers,
        .szPage = writer->szPage,
    };

    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
      if (writer->ctx->fset->farr[ftype] == NULL) {
        continue;
      }

      dataFileReaderConfig.files[ftype].exist = true;
      dataFileReaderConfig.files[ftype].file = writer->ctx->fset->farr[ftype]->f[0];

      STFileOp fileOp = {
          .optype = TSDB_FOP_REMOVE,
          .fid = writer->ctx->fset->fid,
          .of = writer->ctx->fset->farr[ftype]->f[0],
      };

      code = TARRAY2_APPEND(writer->fopArr, fileOp);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbDataFileReaderOpen(NULL, &dataFileReaderConfig, &writer->ctx->dataReader);
    TSDB_CHECK_CODE(code, lino, _exit);
#endif

    // open stt reader array
    SSttLvl* lvl;
    TARRAY2_FOREACH(writer->ctx->fset->lvlArr, lvl) {
      if (lvl->level != 0) {
        if (TARRAY2_SIZE(lvl->fobjArr) > 0) {
          writer->ctx->toSttOnly = true;
        }

        continue;  // Only merge level 0
      }

      STFileObj* fobj;
      TARRAY2_FOREACH(lvl->fobjArr, fobj) {
        SSttFileReader*      reader;
        SSttFileReaderConfig sttFileReaderConfig = {
            .tsdb = writer->tsdb,
            .szPage = writer->szPage,
            .buffers = writer->buffers,
            .file = fobj->f[0],
        };

        code = tsdbSttFileReaderOpen(fobj->fname, &sttFileReaderConfig, &reader);
        TSDB_CHECK_CODE(code, lino, _exit);

        if ((code = TARRAY2_APPEND(writer->ctx->sttReaderArr, reader))) {
          // the array did not take the reader, so it has to be closed here
          tsdbSttFileReaderClose(&reader);
          TSDB_CHECK_CODE(code, lino, _exit);
        }

        // the merged stt file is replaced, so schedule its removal
        STFileOp fileOp = {
            .optype = TSDB_FOP_REMOVE,
            .fid = fobj->f->fid,
            .of = fobj->f[0],
        };

        code = TARRAY2_APPEND(writer->fopArr, fileOp);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
703

704
/* Close the stt-file readers and the data-file reader of the file set being written. Always returns 0. */
static int32_t tsdbSnapWriteFileSetCloseReader(STsdbSnapWriter* writer) {
  int32_t code = 0;

  TARRAY2_CLEAR(writer->ctx->sttReaderArr, tsdbSttFileReaderClose);
  tsdbDataFileReaderClose(&writer->ctx->dataReader);

  return code;
}
709

710
/*
 * Build the iterators and mergers over the existing file set.
 *
 * For the data-file reader (if one is open) and for every stt-file reader, a
 * data iterator and a tomb iterator are opened; the data iterators feed
 * ctx->dataIterMerger and the tomb iterators feed ctx->tombIterMerger.
 *
 * An iterator that was opened but could not be appended to its array is closed
 * immediately so it is not leaked.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbSnapWriteFileSetOpenIter(STsdbSnapWriter* writer) {
  int32_t code = 0;
  int32_t lino = 0;

  // data iter
  if (writer->ctx->dataReader) {
    STsdbIter*      iter;
    STsdbIterConfig config = {0};

    // data
    config.type = TSDB_ITER_TYPE_DATA;
    config.dataReader = writer->ctx->dataReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    if ((code = TARRAY2_APPEND(writer->ctx->dataIterArr, iter))) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // tomb
    config.type = TSDB_ITER_TYPE_DATA_TOMB;
    config.dataReader = writer->ctx->dataReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    if ((code = TARRAY2_APPEND(writer->ctx->tombIterArr, iter))) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // stt iter
  SSttFileReader* sttFileReader;
  TARRAY2_FOREACH(writer->ctx->sttReaderArr, sttFileReader) {
    STsdbIter*      iter;
    STsdbIterConfig config = {0};

    // data
    config.type = TSDB_ITER_TYPE_STT;
    config.sttReader = sttFileReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    if ((code = TARRAY2_APPEND(writer->ctx->dataIterArr, iter))) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // tomb
    config.type = TSDB_ITER_TYPE_STT_TOMB;
    config.sttReader = sttFileReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    if ((code = TARRAY2_APPEND(writer->ctx->tombIterArr, iter))) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // open merger
  code = tsdbIterMergerOpen(writer->ctx->dataIterArr, &writer->ctx->dataIterMerger, false);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbIterMergerOpen(writer->ctx->tombIterArr, &writer->ctx->tombIterMerger, true);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
780

781
/* Close both mergers, then every data and tomb iterator of the file set being written. Always returns 0. */
static int32_t tsdbSnapWriteFileSetCloseIter(STsdbSnapWriter* writer) {
  int32_t code = 0;

  // mergers
  tsdbIterMergerClose(&writer->ctx->dataIterMerger);
  tsdbIterMergerClose(&writer->ctx->tombIterMerger);

  // iterators
  TARRAY2_CLEAR(writer->ctx->dataIterArr, tsdbIterClose);
  TARRAY2_CLEAR(writer->ctx->tombIterArr, tsdbIterClose);

  return code;
}
788

789
/*
 * Open the file set writer for the file set currently being written.
 *
 * The writer is configured from the snapshot writer's settings; it targets
 * stt level 1 when ctx->toSttOnly is set and level 0 otherwise. Data files of
 * an existing file set are passed along in config.files.
 *
 * Returns 0 on success, otherwise the error code of tsdbFSetWriterOpen.
 */
static int32_t tsdbSnapWriteFileSetOpenWriter(STsdbSnapWriter* writer) {
  int32_t code = 0;
  int32_t lino = 0;
  bool    sttOnly = writer->ctx->toSttOnly;

  SFSetWriterConfig config = {
      .tsdb = writer->tsdb,
      .toSttOnly = sttOnly,
      .compactVersion = writer->compactVersion,
      .minRow = writer->minRow,
      .maxRow = writer->maxRow,
      .szPage = writer->szPage,
      .cmprAlg = writer->cmprAlg,
      .fid = writer->ctx->fid,
      .cid = writer->commitID,
      .did = writer->ctx->did,
      .level = sttOnly ? 1 : 0,
  };

  // merge stt files to either data or a new stt file
  if (writer->ctx->fset != NULL) {
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
      if (writer->ctx->fset->farr[ftype] == NULL) {
        continue;
      }
      config.files[ftype].exist = true;
      config.files[ftype].file = writer->ctx->fset->farr[ftype]->f[0];
    }
  }

  code = tsdbFSetWriterOpen(&config, &writer->ctx->fsetWriter);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
825

826
/* Close the file set writer, letting it record its file operations in writer->fopArr. */
static int32_t tsdbSnapWriteFileSetCloseWriter(STsdbSnapWriter* writer) {
  int32_t code = tsdbFSetWriterClose(&writer->ctx->fsetWriter, 0, writer->fopArr);
  return code;
}
829

830
/*
 * Start writing the file set with id `fid`.
 *
 * Looks up an existing file set with that fid, allocates a disk for it, makes
 * sure the tsdb directory exists on that disk (a failure there is only
 * logged), then opens readers, iterators and the file set writer. On success
 * ctx->fsetWriteBegin is set.
 *
 * Returns 0 on success; TSDB_CODE_NO_AVAIL_DISK when no disk can be allocated,
 * otherwise the error code of the failing open step.
 */
static int32_t tsdbSnapWriteFileSetBegin(STsdbSnapWriter* writer, int32_t fid) {
  int32_t code = 0;
  int32_t lino = 0;

  // search key: a file set carrying only the fid
  STFileSet  key = {.fid = fid};
  STFileSet* pKey = &key;

  writer->ctx->fid = fid;
  STFileSet** found = TARRAY2_SEARCH(writer->fsetArr, &pKey, tsdbTFileSetCmprFn, TD_EQ);
  if (found == NULL) {
    writer->ctx->fset = NULL;
  } else {
    writer->ctx->fset = *found;
  }

  int32_t level = tsdbFidLevel(fid, &writer->tsdb->keepCfg, taosGetTimestampSec());
  if (tfsAllocDisk(writer->tsdb->pVnode->pTfs, level, &writer->ctx->did) != 0) {
    code = TSDB_CODE_NO_AVAIL_DISK;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  if (tfsMkdirRecurAt(writer->tsdb->pVnode->pTfs, writer->tsdb->path, writer->ctx->did) != 0) {
    tsdbError("vgId:%d failed to create directory %s", TD_VID(writer->tsdb->pVnode), writer->tsdb->path);
  }

  // assume existing data and tombs until the mergers prove otherwise
  writer->ctx->hasData = true;
  writer->ctx->hasTomb = true;

  code = tsdbSnapWriteFileSetOpenReader(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetOpenIter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetOpenWriter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  writer->ctx->fsetWriteBegin = true;

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s succeeded, fid:%d", TD_VID(writer->tsdb->pVnode), __func__, fid);
  } else {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
871

872
/*
 * Write one incoming tomb record, merged in order with the tomb records
 * already stored in the file set being written.
 *
 * Every existing record that compares <= `record` (tTombRecordCompare) is
 * written first. A `record` whose suid is INT64_MAX acts as an end marker: it
 * only flushes the remaining existing records and is not written itself.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbSnapWriteTombRecord(STsdbSnapWriter* writer, const STombRecord* record) {
  int32_t code = 0;
  int32_t lino = 0;

  while (writer->ctx->hasTomb) {
    STombRecord* existing = tsdbIterMergerGetTombRecord(writer->ctx->tombIterMerger);
    if (existing == NULL) {
      // existing tomb records exhausted
      writer->ctx->hasTomb = false;
      break;
    }

    // stop as soon as the existing record sorts after the incoming one
    if (tTombRecordCompare(existing, record) > 0) {
      break;
    }

    code = tsdbFSetWriteTombRecord(writer->ctx->fsetWriter, existing);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(writer->ctx->tombIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // the end marker itself is never written
  if (record->suid != INT64_MAX) {
    code = tsdbFSetWriteTombRecord(writer->ctx->fsetWriter, record);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
908

909
/*
 * Finish the file set write in progress, if any.
 *
 * Flushes the remaining existing rows and tomb records by writing an
 * INT64_MAX end marker of each kind, then closes the writer, the iterators
 * and the readers and clears ctx->fsetWriteBegin.
 *
 * Returns 0 when no write is in progress or on success, otherwise the error
 * code of the failing step.
 */
static int32_t tsdbSnapWriteFileSetEnd(STsdbSnapWriter* writer) {
  if (!writer->ctx->fsetWriteBegin) return 0;

  int32_t code = 0;
  int32_t lino = 0;

  // end markers for time-series rows and tombstones
  SRowInfo    lastRow = {.suid = INT64_MAX, .uid = INT64_MAX};
  STombRecord lastRecord = {.suid = INT64_MAX, .uid = INT64_MAX};

  // end timeseries data write
  code = tsdbSnapWriteTimeSeriesRow(writer, &lastRow);
  TSDB_CHECK_CODE(code, lino, _exit);

  // end tombstone data write
  code = tsdbSnapWriteTombRecord(writer, &lastRecord);
  TSDB_CHECK_CODE(code, lino, _exit);

  // close write
  code = tsdbSnapWriteFileSetCloseWriter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetCloseIter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetCloseReader(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  writer->ctx->fsetWriteBegin = false;

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s succeeded, fid:%d", TD_VID(writer->tsdb->pVnode), __func__, writer->ctx->fid);
  } else {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
953

954
/*
 * Abandon the file set write in progress, if any: close the writer, the
 * iterators and the readers without flushing the remaining existing data, and
 * clear ctx->fsetWriteBegin.
 *
 * Returns 0 when no write is in progress or on success, otherwise the error
 * code of the failing close step.
 */
static int32_t tsdbSnapWriteFileSetAbort(STsdbSnapWriter* writer) {
  int32_t code = 0;
  int32_t lino = 0;

  if (!writer->ctx->fsetWriteBegin) {
    return 0;
  }

  // close write
  code = tsdbSnapWriteFileSetCloseWriter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetCloseIter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetCloseReader(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  writer->ctx->fsetWriteBegin = false;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
978

979
/*
 * Apply one snapshot message carrying time-series data.
 *
 * The payload (hdr->data, hdr->size bytes) is decoded into a block with
 * tBlockDataDecompress. The file id is derived from the first timestamp of the
 * block; if no file set is open or the id differs from the open one, the
 * current file set is ended and a new one is begun. Every row of the block is
 * then handed to tsdbSnapWriteTimeSeriesRow.
 *
 * A block that decodes to zero rows is treated as a no-op: it has no first
 * timestamp, so indexing aTSKEY[0] would read outside the block.
 *
 * Returns 0 on success or the first error code encountered. The decoded block
 * is destroyed on every path.
 */
static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
  int32_t code = 0;
  int32_t lino = 0;

  SBlockData blockData[1] = {0};

  // view the message payload through an SBuffer; the initializer only stores the pointer and size
  SBuffer buffer = {
      .capacity = hdr->size,
      .data = hdr->data,
      .size = hdr->size,
  };
  SBufferReader br = BUFFER_READER_INITIALIZER(0, &buffer);

  code = tBlockDataDecompress(&br, blockData, &writer->buffers[0]);
  TSDB_CHECK_CODE(code, lino, _exit);

  // nothing to write, and no aTSKEY[0] to look at
  if (blockData->nRow <= 0) {
    goto _exit;
  }

  // the first timestamp of the block selects the target file set
  int32_t fid = tsdbKeyFid(blockData->aTSKEY[0], writer->minutes, writer->precision);
  if (!writer->ctx->fsetWriteBegin || fid != writer->ctx->fid) {
    // tsdbSnapWriteFileSetEnd is a no-op when no file set is open
    code = tsdbSnapWriteFileSetEnd(writer);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbSnapWriteFileSetBegin(writer, fid);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  for (int32_t i = 0; i < blockData->nRow; ++i) {
    SRowInfo rowInfo = {
        .suid = blockData->suid,
        // a non-zero block uid applies to every row; otherwise the per-row uid array is used
        .uid = blockData->uid ? blockData->uid : blockData->aUid[i],
        .row = tsdbRowFromBlockData(blockData, i),
    };

    code = tsdbSnapWriteTimeSeriesRow(writer, &rowInfo);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  } else {
    tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(writer->tsdb->pVnode), __func__,
              blockData->suid, blockData->uid, blockData->nRow);
  }
  tBlockDataDestroy(blockData);
  return code;
}
1025

1026
/*
 * Load a tomb block from a snapshot message payload.
 *
 * hdr->data is treated as TOMB_RECORD_ELEM_NUM consecutive, equally sized
 * columns. tombBlock is cleared first, numOfRecords is set to the column size
 * divided by sizeof(int64_t), and each column is passed to tBufferPut for the
 * matching entry of tombBlock->buffers. Integer division is used throughout,
 * so any bytes beyond a whole number of columns are not consumed.
 *
 * No decompression is visible in this function; the bytes are forwarded as-is.
 *
 * Returns 0 on success or the first tBufferPut error.
 */
static int32_t tsdbSnapWriteDecmprTombBlock(SSnapDataHdr* hdr, STombBlock* tombBlock) {
  int32_t code = 0;
  int32_t lino = 0;

  tTombBlockClear(tombBlock);

  // bytes occupied by one column of the payload
  int64_t colBytes = hdr->size;
  colBytes /= TOMB_RECORD_ELEM_NUM;
  tombBlock->numOfRecords = colBytes / sizeof(int64_t);

  uint8_t* src = hdr->data;
  for (int32_t col = 0; col < TOMB_RECORD_ELEM_NUM; ++col, src += colBytes) {
    code = tBufferPut(tombBlock->buffers + col, src, colBytes);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  return code;
}
1047

1048
/*
 * Apply one snapshot message carrying tombstone (delete) records.
 *
 * The payload is loaded into a local tomb block. The skey of its first record
 * selects the file id; if no file set is open or the id differs from the open
 * one, the current file set is ended and a new one is begun. When
 * ctx->hasData is set, a row keyed by (INT64_MAX, INT64_MAX) is first written
 * through tsdbSnapWriteTimeSeriesRow, and then every record of the block is
 * handed to tsdbSnapWriteTombRecord.
 *
 * The local tomb block is destroyed on every path, including error paths
 * (it used to be released only on success, leaking its buffers on failure).
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbSnapWriteTombData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
  int32_t code = 0;
  int32_t lino = 0;

  STombRecord record;
  STombBlock  tombBlock[1] = {0};

  code = tsdbSnapWriteDecmprTombBlock(hdr, tombBlock);
  TSDB_CHECK_CODE(code, lino, _exit);

  // the first record decides which file set the whole block belongs to
  code = tTombBlockGet(tombBlock, 0, &record);
  TSDB_CHECK_CODE(code, lino, _exit);
  int32_t fid = tsdbKeyFid(record.skey, writer->minutes, writer->precision);
  if (!writer->ctx->fsetWriteBegin || fid != writer->ctx->fid) {
    // tsdbSnapWriteFileSetEnd is a no-op when no file set is open
    code = tsdbSnapWriteFileSetEnd(writer);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbSnapWriteFileSetBegin(writer, fid);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (writer->ctx->hasData) {
    // same all-INT64_MAX row that tsdbSnapWriteFileSetEnd uses to end time-series writing
    SRowInfo row = {
        .suid = INT64_MAX,
        .uid = INT64_MAX,
    };

    code = tsdbSnapWriteTimeSeriesRow(writer, &row);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  for (int32_t i = 0; i < TOMB_BLOCK_SIZE(tombBlock); ++i) {
    code = tTombBlockGet(tombBlock, i, &record);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbSnapWriteTombRecord(writer, &record);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  // release the block's buffers whether or not the write succeeded
  tTombBlockDestroy(tombBlock);
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
1095

1096
/**
 * Allocate and initialise a TSDB snapshot writer.
 *
 * @param pTsdb    TSDB instance the snapshot is written into.
 * @param sver     start version, stored on the writer.
 * @param ever     end version, stored on the writer.
 * @param pRanges  passed to tsdbFSCreateCopyRangedSnapshot as a TFileSetRangeArray*.
 * @param writer   out: receives the new writer (NULL when the allocation fails).
 * @return 0 on success; terrno when the allocation fails; otherwise the error
 *         returned by tsdbFSCreateCopyRangedSnapshot.
 *
 * NOTE(review): when tsdbFSCreateCopyRangedSnapshot fails, the allocated writer
 * is left in *writer and is not freed here -- presumably the caller releases it
 * through tsdbSnapWriterClose; confirm against the call sites.
 */
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRanges, STsdbSnapWriter** writer) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdbSnapWriter* w = taosMemoryCalloc(1, sizeof(*w));
  writer[0] = w;
  if (w == NULL) {
    return terrno;
  }

  // identity and version range
  w->tsdb = pTsdb;
  w->sver = sver;
  w->ever = ever;

  // settings copied from the TSDB / vnode configuration
  w->minutes = pTsdb->keepCfg.days;
  w->precision = pTsdb->keepCfg.precision;
  w->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  w->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
  w->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
  w->commitID = tsdbFSAllocEid(pTsdb->pFS);
  w->szPage = pTsdb->pVnode->config.tsdbPageSize;
  w->compactVersion = INT64_MAX;
  w->now = taosGetTimestampMs();

  // snapshot of the file sets to work on, plus the array that collects file operations
  code = tsdbFSCreateCopyRangedSnapshot(pTsdb->pFS, (TFileSetRangeArray*)pRanges, &w->fsetArr, w->fopArr);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64, TD_VID(pTsdb->pVnode), __func__, sver, ever);
  } else {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}
1129

1130
/**
 * First phase of closing a snapshot writer.
 *
 * The open file set (if any) is ended normally, or aborted when @p rollback is
 * true; afterwards a file-system edit is begun with the writer's accumulated
 * file operations. Both paths begin the edit with TSDB_FEDIT_COMMIT, exactly
 * as the two original branches did.
 *
 * @param writer    snapshot writer being closed.
 * @param rollback  true to discard the in-progress file set instead of finishing it.
 * @return 0 on success, otherwise the first error code (also logged).
 */
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* writer, bool rollback) {
  int32_t code = 0;
  int32_t lino = 0;

  // settle the file set that is still open
  code = rollback ? tsdbSnapWriteFileSetAbort(writer) : tsdbSnapWriteFileSetEnd(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  // stage the collected file operations
  code = tsdbFSEditBegin(writer->tsdb->pFS, writer->fopArr, TSDB_FEDIT_COMMIT);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    tsdbDebug("vgId:%d %s done", TD_VID(writer->tsdb->pVnode), __func__);
  } else {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
1156

1157
/**
 * Second phase of closing a snapshot writer: commit or abort the pending
 * file-system edit, then destroy the writer.
 *
 * With @p rollback set the edit is aborted via tsdbFSEditAbort. Otherwise it
 * is committed via tsdbFSEditCommit while holding tsdb->mutex, and on success
 * the file-system state is set to TSDB_FS_STATE_NORMAL.
 *
 * The writer's iterators, readers, arrays and buffers are released and
 * *writer is set to NULL on every path. Previously a failing abort/commit
 * jumped past the cleanup and leaked all of them together with the writer.
 *
 * @param writer    in/out: writer to close; a NULL *writer is a no-op returning 0.
 * @param rollback  non-zero to abort the edit instead of committing it.
 * @return 0 on success, otherwise the error from the abort/commit (also logged).
 */
int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) {
  if (writer[0] == NULL) return 0;

  int32_t code = 0;
  int32_t lino = 0;

  // kept in a local because the writer is freed before the final log line
  STsdb* tsdb = writer[0]->tsdb;

  if (rollback) {
    code = tsdbFSEditAbort(tsdb->pFS);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    (void)taosThreadMutexLock(&tsdb->mutex);

    code = tsdbFSEditCommit(tsdb->pFS);
    if (code == 0) {
      tsdb->pFS->fsstate = TSDB_FS_STATE_NORMAL;
    }

    (void)taosThreadMutexUnlock(&tsdb->mutex);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  // Release everything the writer owns, whether or not the edit succeeded.
  tsdbIterMergerClose(&writer[0]->ctx->tombIterMerger);
  tsdbIterMergerClose(&writer[0]->ctx->dataIterMerger);
  TARRAY2_DESTROY(writer[0]->ctx->tombIterArr, tsdbIterClose);
  TARRAY2_DESTROY(writer[0]->ctx->dataIterArr, tsdbIterClose);
  TARRAY2_DESTROY(writer[0]->ctx->sttReaderArr, tsdbSttFileReaderClose);
  tsdbDataFileReaderClose(&writer[0]->ctx->dataReader);

  TARRAY2_DESTROY(writer[0]->fopArr, NULL);
  tsdbFSDestroyCopyRangedSnapshot(&writer[0]->fsetArr);

  for (int32_t i = 0; i < ARRAY_SIZE(writer[0]->buffers); ++i) {
    tBufferDestroy(writer[0]->buffers + i);
  }

  taosMemoryFree(writer[0]);
  writer[0] = NULL;

  if (code) {
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
  } else {
    tsdbInfo("vgId:%d %s done, rollback:%d", TD_VID(tsdb->pVnode), __func__, rollback);
  }
  return code;
}
1207

1208
/**
 * Apply one snapshot message to the TSDB snapshot writer.
 *
 * Dispatches on hdr->type: SNAP_DATA_TSDB messages go to
 * tsdbSnapWriteTimeSeriesData, SNAP_DATA_DEL messages go to
 * tsdbSnapWriteTombData, and any other type is rejected with
 * TSDB_CODE_INVALID_PARA.
 *
 * @param writer  snapshot writer receiving the data.
 * @param hdr     message header followed by its payload.
 * @return 0 on success, otherwise the error code (logged together with the
 *         message type, index and size).
 */
int32_t tsdbSnapWrite(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
  int32_t code = 0;
  int32_t lino = 0;

  switch (hdr->type) {
    case SNAP_DATA_TSDB:
      code = tsdbSnapWriteTimeSeriesData(writer, hdr);
      break;
    case SNAP_DATA_DEL:
      code = tsdbSnapWriteTombData(writer, hdr);
      break;
    default:
      // unknown message type
      code = TSDB_CODE_INVALID_PARA;
      break;
  }
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    tsdbDebug("vgId:%d %s done, type:%d index:%" PRId64 " size:%" PRId64, TD_VID(writer->tsdb->pVnode), __func__,
              hdr->type, hdr->index, hdr->size);
  } else {
    tsdbError("vgId:%d %s failed at line %d since %s, type:%d index:%" PRId64 " size:%" PRId64,
              TD_VID(writer->tsdb->pVnode), __func__, lino, tstrerror(code), hdr->type, hdr->index, hdr->size);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc