• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4851

14 Nov 2025 08:06AM UTC coverage: 63.754% (+0.03%) from 63.728%
#4851

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

354 of 675 new or added lines in 18 files covered. (52.44%)

3145 existing lines in 113 files now uncovered.

149128 of 233910 relevant lines covered (63.75%)

117183401.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.75
/source/dnode/vnode/src/tsdb/tsdbSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdb.h"
17
#include "tsdbDataFileRW.h"
18
#include "tsdbFS2.h"
19
#include "tsdbFSetRW.h"
20
#include "tsdbIter.h"
21
#include "tsdbSttFileRW.h"
22

23
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);
24

25
// STsdbSnapReader ========================================
26
struct STsdbSnapReader {
27
  STsdb*  tsdb;
28
  int64_t sver;
29
  int64_t ever;
30
  int8_t  type;
31

32
  SBuffer  buffers[10];
33
  SSkmInfo skmTb[1];
34

35
  TFileSetRangeArray* fsrArr;
36

37
  // context
38
  struct {
39
    int32_t         fsrArrIdx;
40
    STFileSetRange* fsr;
41
    bool            isDataDone;
42
    bool            isTombDone;
43
  } ctx[1];
44

45
  // reader
46
  SDataFileReader*    dataReader;
47
  TSttFileReaderArray sttReaderArr[1];
48

49
  // iter
50
  TTsdbIterArray dataIterArr[1];
51
  SIterMerger*   dataIterMerger;
52
  TTsdbIterArray tombIterArr[1];
53
  SIterMerger*   tombIterMerger;
54

55
  // data
56
  SBlockData blockData[1];
57
  STombBlock tombBlock[1];
58
};
59

60
// Close every file reader opened for the current range: all stt readers
// first, then the data-file reader.
static void tsdbSnapReadFileSetCloseReader(STsdbSnapReader* reader) {
  TARRAY2_CLEAR(reader->sttReaderArr, tsdbSttFileReaderClose);
  tsdbDataFileReaderClose(&reader->dataReader);
}
65

66
// Open the file readers for the range in reader->ctx->fsr: a single data-file
// reader when the file set has at least one data-type file, and one stt
// reader per file object on every stt level. On any failure all readers
// opened so far are closed again and the error code is returned.
static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) {
  int32_t code = 0;
  int32_t lino = 0;

  // data files
  SDataFileReaderConfig dataConfig = {
      .tsdb = reader->tsdb,
      .szPage = reader->tsdb->pVnode->config.tsdbPageSize,
      .buffers = reader->buffers,
  };

  bool anyDataFile = false;
  for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
    if (reader->ctx->fsr->fset->farr[ftype] == NULL) {
      continue;
    }
    dataConfig.files[ftype].exist = true;
    dataConfig.files[ftype].file = reader->ctx->fsr->fset->farr[ftype]->f[0];
    anyDataFile = true;
  }

  if (anyDataFile) {
    code = tsdbDataFileReaderOpen(NULL, &dataConfig, &reader->dataReader);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // stt files
  SSttLvl* lvl;
  TARRAY2_FOREACH(reader->ctx->fsr->fset->lvlArr, lvl) {
    STFileObj* fobj;
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      SSttFileReader*      sttReader;
      SSttFileReaderConfig sttConfig = {
          .tsdb = reader->tsdb,
          .szPage = reader->tsdb->pVnode->config.tsdbPageSize,
          .file = fobj->f[0],
          .buffers = reader->buffers,
      };

      code = tsdbSttFileReaderOpen(fobj->fname, &sttConfig, &sttReader);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = TARRAY2_APPEND(reader->sttReaderArr, sttReader);
      if (code) {
        // the array did not take ownership, so close the reader here
        tsdbSttFileReaderClose(&sttReader);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
  }

_exit:
  if (code) {
    tsdbSnapReadFileSetCloseReader(reader);
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
120

121
static int32_t tsdbSnapReadFileSetOpenIter(STsdbSnapReader* reader) {
962✔
122
  int32_t code = 0;
962✔
123
  int32_t lino = 0;
962✔
124

125
  STsdbIter*      iter;
962✔
126
  STsdbIterConfig config = {
962✔
127
      .filterByVersion = true,
128
      .verRange[0] = reader->ctx->fsr->sver,
962✔
129
      .verRange[1] = reader->ctx->fsr->ever,
962✔
130
  };
131

132
  // data file
133
  if (reader->dataReader) {
962✔
134
    // data
135
    config.type = TSDB_ITER_TYPE_DATA;
962✔
136
    config.dataReader = reader->dataReader;
962✔
137

138
    code = tsdbIterOpen(&config, &iter);
962✔
139
    TSDB_CHECK_CODE(code, lino, _exit);
962✔
140

141
    code = TARRAY2_APPEND(reader->dataIterArr, iter);
962✔
142
    TSDB_CHECK_CODE(code, lino, _exit);
962✔
143

144
    // tomb
145
    config.type = TSDB_ITER_TYPE_DATA_TOMB;
962✔
146
    config.dataReader = reader->dataReader;
962✔
147

148
    code = tsdbIterOpen(&config, &iter);
962✔
149
    TSDB_CHECK_CODE(code, lino, _exit);
962✔
150

151
    code = TARRAY2_APPEND(reader->tombIterArr, iter);
962✔
152
    TSDB_CHECK_CODE(code, lino, _exit);
962✔
153
  }
154

155
  // stt file
156
  SSttFileReader* sttReader;
157
  TARRAY2_FOREACH(reader->sttReaderArr, sttReader) {
962✔
158
    // data
UNCOV
159
    config.type = TSDB_ITER_TYPE_STT;
×
UNCOV
160
    config.sttReader = sttReader;
×
161

UNCOV
162
    code = tsdbIterOpen(&config, &iter);
×
UNCOV
163
    TSDB_CHECK_CODE(code, lino, _exit);
×
164

UNCOV
165
    code = TARRAY2_APPEND(reader->dataIterArr, iter);
×
UNCOV
166
    TSDB_CHECK_CODE(code, lino, _exit);
×
167

168
    // tomb
UNCOV
169
    config.type = TSDB_ITER_TYPE_STT_TOMB;
×
UNCOV
170
    config.sttReader = sttReader;
×
171

UNCOV
172
    code = tsdbIterOpen(&config, &iter);
×
UNCOV
173
    TSDB_CHECK_CODE(code, lino, _exit);
×
174

UNCOV
175
    code = TARRAY2_APPEND(reader->tombIterArr, iter);
×
UNCOV
176
    TSDB_CHECK_CODE(code, lino, _exit);
×
177
  }
178

179
  // merger
180
  code = tsdbIterMergerOpen(reader->dataIterArr, &reader->dataIterMerger, false);
962✔
181
  TSDB_CHECK_CODE(code, lino, _exit);
962✔
182

183
  code = tsdbIterMergerOpen(reader->tombIterArr, &reader->tombIterMerger, true);
962✔
184
  TSDB_CHECK_CODE(code, lino, _exit);
962✔
185

186
_exit:
962✔
187
  if (code) {
962✔
188
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
×
189
  }
190
  return code;
962✔
191
}
192

193
// Tear down the iteration state of the current range: both mergers first,
// then every iterator held in the data and tombstone arrays.
static void tsdbSnapReadFileSetCloseIter(STsdbSnapReader* reader) {
  tsdbIterMergerClose(&reader->dataIterMerger);
  tsdbIterMergerClose(&reader->tombIterMerger);

  TARRAY2_CLEAR(reader->dataIterArr, tsdbIterClose);
  TARRAY2_CLEAR(reader->tombIterArr, tsdbIterClose);
}
200

201
// Advance to the next file-set range, if one is left, and open its readers
// and iterators. When fsrArr is exhausted nothing is opened, ctx->fsr is left
// untouched and 0 is returned.
static int32_t tsdbSnapReadRangeBegin(STsdbSnapReader* reader) {
  int32_t code = 0;
  int32_t lino = 0;

  if (reader->ctx->fsrArrIdx >= TARRAY2_SIZE(reader->fsrArr)) {
    return code;  // no more ranges
  }

  reader->ctx->fsr = TARRAY2_GET(reader->fsrArr, reader->ctx->fsrArrIdx++);
  reader->ctx->isTombDone = false;
  reader->ctx->isDataDone = false;

  code = tsdbSnapReadFileSetOpenReader(reader);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapReadFileSetOpenIter(reader);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
223

224
// Finish the current range: close its iterators and readers, then clear
// ctx->fsr so the next tsdbSnapRead() pass moves on. Always returns 0.
static int32_t tsdbSnapReadRangeEnd(STsdbSnapReader* reader) {
  tsdbSnapReadFileSetCloseIter(reader);
  tsdbSnapReadFileSetCloseReader(reader);

  reader->ctx->fsr = NULL;
  return 0;
}
230

231
// Serialise reader->blockData into a freshly allocated snapshot payload.
//
// The block is run through tBlockDataCompress() with NO_COMPRESSION as the
// default algorithm, then the first four scratch buffers are concatenated
// behind an SSnapDataHdr whose type is reader->type and whose size is the
// total byte count of those buffers.
//
// On success *data points to the allocation made with taosMemoryMalloc().
// Returns 0 on success, otherwise the compress error or terrno if the
// allocation failed.
static int32_t tsdbSnapCmprData(STsdbSnapReader* reader, uint8_t** data) {
  // Number of leading scratch buffers copied into the payload.
  // NOTE(review): presumably tBlockDataCompress() writes its output into
  // buffers[0..3] and uses the buffers from index 4 on as scratch - confirm.
  enum { kPayloadBuffers = 4 };

  int32_t code = 0;
  int32_t lino = 0;

  SColCompressInfo cmprInfo = {.pColCmpr = NULL, .defaultCmprAlg = NO_COMPRESSION};
  code = tBlockDataCompress(reader->blockData, (void*)&cmprInfo, reader->buffers, reader->buffers + kPayloadBuffers);
  TSDB_CHECK_CODE(code, lino, _exit);

  int32_t size = 0;
  for (int i = 0; i < kPayloadBuffers; i++) {
    size += reader->buffers[i].size;
  }

  *data = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
  if (*data == NULL) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  SSnapDataHdr* pHdr = (SSnapDataHdr*)*data;
  uint8_t*      pBuf = pHdr->data;

  pHdr->type = reader->type;
  pHdr->size = size;
  for (int i = 0; i < kPayloadBuffers; i++) {
    memcpy(pBuf, reader->buffers[i].data, reader->buffers[i].size);
    pBuf += reader->buffers[i].size;
  }

_exit:
  if (code) {
    // NOTE(review): the argument order here is (lino, code), while the other
    // reader helpers in this file pass (code, lino); confirm which one matches
    // the TSDB_ERROR_LOG definition.
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), lino, code);
  }
  return code;
}
267

268
// Sum, over all columns of the block, the bytes attributed to each column:
//   - columns whose flag is exactly HAS_NONE or exactly HAS_NULL add nothing;
//   - any flag other than plain HAS_VALUE adds a bitmap: BIT2_SIZE(nVal) when
//     all three of NONE/NULL/VALUE are present, BIT1_SIZE(nVal) otherwise;
//   - unless the flag is exactly (HAS_NONE | HAS_NULL), the column also adds
//     nData, plus one int32_t offset per value for variable-length types.
static int64_t tBlockDataSize(SBlockData* pBlockData) {
  int64_t total = 0;

  for (int32_t idx = 0; idx < pBlockData->nColData; ++idx) {
    SColData* col = tBlockDataGetColDataByIdx(pBlockData, idx);

    if (col->flag == HAS_NONE || col->flag == HAS_NULL) {
      continue;
    }

    // bitmap part
    if (col->flag == (HAS_NONE | HAS_NULL | HAS_VALUE)) {
      total += BIT2_SIZE(col->nVal);
    } else if (col->flag != HAS_VALUE) {
      total += BIT1_SIZE(col->nVal);
    }

    // value part
    if (col->flag != (HAS_NONE | HAS_NULL)) {
      if (IS_VAR_DATA_TYPE(col->type)) {
        total += col->nVal * sizeof(int32_t);  // var data offset
      }
      total += col->nData;
    }
  }

  return total;
}
296

297
// Pull rows from the data merger into reader->blockData and, if any row was
// collected, turn the block into a payload via tsdbSnapCmprData().
//
// Collection stops when the merger is exhausted, when the next row does not
// share the block's schema (TABLE_SAME_SCHEMA), or when - checked every 16
// rows - the estimated block size reaches TSDB_SNAP_DATA_PAYLOAD_SIZE.
// Rows of tables for which metaGetInfo() fails are skipped as a whole.
//
// *data is only written when at least one row was collected.
static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** data) {
  int32_t   code = 0;
  int32_t   lino = 0;
  SMetaInfo info;
  TABLEID   lastTb = {0};  // table the previous row belonged to

  tBlockDataReset(reader->blockData);

  SRowInfo* row;
  while ((row = tsdbIterMergerGetData(reader->dataIterMerger)) != NULL) {
    // skip dropped table
    if (row->uid != lastTb.uid) {
      lastTb.suid = row->suid;
      lastTb.uid = row->uid;
      if (metaGetInfo(reader->tsdb->pVnode->pMeta, lastTb.uid, &info, NULL) != 0) {
        code = tsdbIterMergerSkipTableData(reader->dataIterMerger, &lastTb);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }
    }

    // first row of this block: load the schema and initialise the block
    if (reader->blockData->suid == 0 && reader->blockData->uid == 0) {
      code = tsdbUpdateSkmTb(reader->tsdb, (TABLEID*)row, reader->skmTb);
      TSDB_CHECK_CODE(code, lino, _exit);

      TABLEID blockTb = {
          .suid = row->suid,
          .uid = row->suid ? 0 : row->uid,
      };
      code = tBlockDataInit(reader->blockData, &blockTb, reader->skmTb->pTSchema, NULL, 0);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    if (!TABLE_SAME_SCHEMA(reader->blockData->suid, reader->blockData->uid, row->suid, row->uid)) {
      break;
    }

    code = tBlockDataAppendRow(reader->blockData, &row->row, NULL, row->uid);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(reader->dataIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);

    // size check only every 16 rows
    if ((reader->blockData->nRow % 16) == 0 && tBlockDataSize(reader->blockData) >= TSDB_SNAP_DATA_PAYLOAD_SIZE) {
      break;
    }
  }

  if (reader->blockData->nRow > 0) {
    code = tsdbSnapCmprData(reader, data);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  } else {
    tsdbDebug("vgId:%d, tsdb snapshot read time-series data done, data size:%" PRId64 "", TD_VID(reader->tsdb->pVnode),
              data[0] ? ((SSnapDataHdr*)(data[0]))->size : 0);
  }
  return code;
}
361

362
// Pack reader->tombBlock into a newly allocated payload: an SSnapDataHdr of
// type SNAP_DATA_DEL followed by the contents of every tomb-block buffer in
// order. Returns 0 on success or terrno when the allocation fails.
static int32_t tsdbSnapCmprTombData(STsdbSnapReader* reader, uint8_t** data) {
  int32_t code = 0;
  int32_t lino = 0;

  int64_t total = 0;
  for (int32_t i = 0; i < ARRAY_SIZE(reader->tombBlock->buffers); i++) {
    total += reader->tombBlock->buffers[i].size;
  }

  data[0] = taosMemoryMalloc(total + sizeof(SSnapDataHdr));
  if (data[0] == NULL) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  SSnapDataHdr* hdr = (SSnapDataHdr*)(data[0]);
  hdr->type = SNAP_DATA_DEL;
  hdr->size = total;

  uint8_t* cursor = hdr->data;
  for (int32_t i = 0; i < ARRAY_SIZE(reader->tombBlock->buffers); i++) {
    memcpy(cursor, reader->tombBlock->buffers[i].data, reader->tombBlock->buffers[i].size);
    cursor += reader->tombBlock->buffers[i].size;
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
393

394
// Collect tombstone records from the tomb merger into reader->tombBlock and,
// if the block ends up non-empty, emit it through tsdbSnapCmprTombData().
// Records of tables for which metaGetInfo() fails are skipped as a whole.
// Collection stops once TOMB_BLOCK_SIZE() reaches 81920 or the merger is
// exhausted. *data is only written when the block is non-empty.
static int32_t tsdbSnapReadTombData(STsdbSnapReader* reader, uint8_t** data) {
  int32_t   code = 0;
  int32_t   lino = 0;
  SMetaInfo info;
  TABLEID   lastTb = {0};  // table the previous record belonged to

  tTombBlockClear(reader->tombBlock);

  STombRecord* record;
  while ((record = tsdbIterMergerGetTombRecord(reader->tombIterMerger)) != NULL) {
    if (record->uid != lastTb.uid) {
      lastTb.suid = record->suid;
      lastTb.uid = record->uid;
      if (metaGetInfo(reader->tsdb->pVnode->pMeta, lastTb.uid, &info, NULL) != 0) {
        code = tsdbIterMergerSkipTableData(reader->tombIterMerger, &lastTb);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }
    }

    code = tTombBlockPut(reader->tombBlock, record);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(reader->tombIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (TOMB_BLOCK_SIZE(reader->tombBlock) >= 81920) {
      break;
    }
  }

  if (TOMB_BLOCK_SIZE(reader->tombBlock) > 0) {
    code = tsdbSnapCmprTombData(reader, data);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
435

436
int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type, void* pRanges,
962✔
437
                           STsdbSnapReader** reader) {
438
  int32_t code = 0;
962✔
439
  int32_t lino = 0;
962✔
440

441
  reader[0] = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*reader[0]));
962✔
442
  if (reader[0] == NULL) return terrno;
962✔
443

444
  reader[0]->tsdb = tsdb;
962✔
445
  reader[0]->sver = sver;
962✔
446
  reader[0]->ever = ever;
962✔
447
  reader[0]->type = type;
962✔
448

449
  code = tsdbFSCreateRefRangedSnapshot(tsdb->pFS, sver, ever, (TFileSetRangeArray*)pRanges, &reader[0]->fsrArr);
962✔
450
  TSDB_CHECK_CODE(code, lino, _exit);
962✔
451

452
_exit:
962✔
453
  if (code) {
962✔
454
    tsdbError("vgId:%d %s failed at %s:%d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode),
×
455
              __func__, __FILE__, lino, tstrerror(code), sver, ever, type);
456
    tsdbTFileSetRangeArrayDestroy(&reader[0]->fsrArr);
×
457
    taosMemoryFree(reader[0]);
×
458
    reader[0] = NULL;
×
459
  } else {
460
    tsdbInfo("vgId:%d, tsdb snapshot incremental reader opened. sver:%" PRId64 " ever:%" PRId64 " type:%d",
962✔
461
             TD_VID(tsdb->pVnode), sver, ever, type);
462
  }
463
  return code;
962✔
464
}
465

466
// Release a snapshot reader and everything it still holds, then set *reader
// to NULL. A NULL *reader is accepted and ignored.
//
// Released in order: the staging blocks, the mergers, the iterator and stt
// reader arrays, the data-file reader, the ranged file-system snapshot, the
// cached table schema, the scratch buffers, and finally the object itself.
void tsdbSnapReaderClose(STsdbSnapReader** reader) {
  STsdbSnapReader* pReader = reader[0];
  if (pReader == NULL) {
    return;
  }

  tTombBlockDestroy(pReader->tombBlock);
  tBlockDataDestroy(pReader->blockData);

  tsdbIterMergerClose(&pReader->dataIterMerger);
  tsdbIterMergerClose(&pReader->tombIterMerger);
  TARRAY2_DESTROY(pReader->dataIterArr, tsdbIterClose);
  TARRAY2_DESTROY(pReader->tombIterArr, tsdbIterClose);
  TARRAY2_DESTROY(pReader->sttReaderArr, tsdbSttFileReaderClose);
  tsdbDataFileReaderClose(&pReader->dataReader);

  tsdbFSDestroyRefRangedSnapshot(&pReader->fsrArr);
  tDestroyTSchema(pReader->skmTb->pTSchema);

  for (int32_t i = 0; i < ARRAY_SIZE(pReader->buffers); ++i) {
    tBufferDestroy(pReader->buffers + i);
  }

  taosMemoryFree(pReader);
  reader[0] = NULL;
}
497

498
// Produce the next snapshot payload, or leave *data NULL when every range has
// been drained.
//
// For each file-set range the time-series data is emitted first (one payload
// per call), then the tombstone data; once both report nothing more, the
// range is closed and the next one is opened. *data is reset to NULL on
// entry. Returns 0 on success or the first error code encountered.
int32_t tsdbSnapRead(STsdbSnapReader* reader, uint8_t** data) {
  int32_t code = 0;
  int32_t lino = 0;

  data[0] = NULL;

  for (;;) {
    // make sure a range is open; stop when none is left
    if (reader->ctx->fsr == NULL) {
      code = tsdbSnapReadRangeBegin(reader);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (reader->ctx->fsr == NULL) {
        break;
      }
    }

    if (!reader->ctx->isDataDone) {
      code = tsdbSnapReadTimeSeriesData(reader, data);
      TSDB_CHECK_CODE(code, lino, _exit);
      if (data[0] != NULL) {
        break;  // hand this payload to the caller
      }
      reader->ctx->isDataDone = true;
    }

    if (!reader->ctx->isTombDone) {
      code = tsdbSnapReadTombData(reader, data);
      TSDB_CHECK_CODE(code, lino, _exit);
      if (data[0] != NULL) {
        break;  // hand this payload to the caller
      }
      reader->ctx->isTombDone = true;
    }

    code = tsdbSnapReadRangeEnd(reader);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(reader->tsdb->pVnode), __func__);
  }
  return code;
}
546

547
// STsdbSnapWriter ========================================
548
struct STsdbSnapWriter {
549
  STsdb*  tsdb;
550
  int64_t sver;
551
  int64_t ever;
552
  int32_t minutes;
553
  int8_t  precision;
554
  int32_t minRow;
555
  int32_t maxRow;
556
  int8_t  cmprAlg;
557
  int64_t commitID;
558
  int32_t szPage;
559
  int64_t compactVersion;
560
  int64_t now;
561
  SBuffer buffers[10];
562

563
  TFileSetArray* fsetArr;
564
  TFileOpArray   fopArr[1];
565

566
  struct {
567
    bool       fsetWriteBegin;
568
    int32_t    fid;
569
    STFileSet* fset;
570
    int32_t    expLevel;
571
    bool       hasData;  // if have time series data
572
    bool       hasTomb;  // if have tomb data
573

574
    // reader
575
    SDataFileReader*    dataReader;
576
    TSttFileReaderArray sttReaderArr[1];
577

578
    // iter/merger
579
    TTsdbIterArray dataIterArr[1];
580
    SIterMerger*   dataIterMerger;
581
    TTsdbIterArray tombIterArr[1];
582
    SIterMerger*   tombIterMerger;
583

584
    // writer
585
    bool         toSttOnly;
586
    SFSetWriter* fsetWriter;
587
  } ctx[1];
588
};
589

590
// APIs
591
// Write `row` into the current file set, first flushing every existing row
// (from the data merger) that sorts at or before it according to
// tRowInfoCmprFn(). Existing rows of tables for which metaGetInfo() fails are
// skipped. A row with suid == INT64_MAX acts as an end marker: existing rows
// are flushed but the marker itself is not written.
static int32_t tsdbSnapWriteTimeSeriesRow(STsdbSnapWriter* writer, SRowInfo* row) {
  int32_t   code = 0;
  int32_t   lino = 0;
  TABLEID   tbid = {0};
  SMetaInfo info;

  while (writer->ctx->hasData) {
    SRowInfo* existing;

    // fetch the next existing row that belongs to a live table
    for (;;) {
      existing = tsdbIterMergerGetData(writer->ctx->dataIterMerger);
      if (existing == NULL) {
        writer->ctx->hasData = false;
        break;
      }
      if (existing->uid == tbid.uid) {
        break;
      }

      tbid.suid = existing->suid;
      tbid.uid = existing->uid;
      if (metaGetInfo(writer->tsdb->pVnode->pMeta, tbid.uid, &info, NULL) == 0) {
        break;
      }

      code = tsdbIterMergerSkipTableData(writer->ctx->dataIterMerger, &tbid);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    if (!writer->ctx->hasData) {
      break;
    }

    // stop as soon as the existing row sorts after the incoming one
    if (tRowInfoCmprFn(existing, row) > 0) {
      break;
    }

    code = tsdbFSetWriteRow(writer->ctx->fsetWriter, existing);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(writer->ctx->dataIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (row->suid != INT64_MAX) {
    code = tsdbFSetWriteRow(writer->ctx->fsetWriter, row);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
644

645
// Open stt readers on the existing file set (writer->ctx->fset), if there is
// one, so its level-0 stt files can be merged with the incoming snapshot.
//
// Only level 0 is opened. If any other level already holds files,
// ctx->toSttOnly is set instead. For every level-0 file a reader is appended
// to ctx->sttReaderArr and a TSDB_FOP_REMOVE operation for that file is
// appended to writer->fopArr.
//
// Returns 0 on success or the first error code. A reader that could not be
// stored in the array is closed before returning.
static int32_t tsdbSnapWriteFileSetOpenReader(STsdbSnapWriter* writer) {
  int32_t code = 0;
  int32_t lino = 0;

  writer->ctx->toSttOnly = false;
  if (writer->ctx->fset) {
#if 0
    // open data reader
    SDataFileReaderConfig dataFileReaderConfig = {
        .tsdb = writer->tsdb,
        .buffers = writer->buffers,
        .szPage = writer->szPage,
    };

    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
      if (writer->ctx->fset->farr[ftype] == NULL) {
        continue;
      }

      dataFileReaderConfig.files[ftype].exist = true;
      dataFileReaderConfig.files[ftype].file = writer->ctx->fset->farr[ftype]->f[0];

      STFileOp fileOp = {
          .optype = TSDB_FOP_REMOVE,
          .fid = writer->ctx->fset->fid,
          .of = writer->ctx->fset->farr[ftype]->f[0],
      };

      code = TARRAY2_APPEND(writer->fopArr, fileOp);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbDataFileReaderOpen(NULL, &dataFileReaderConfig, &writer->ctx->dataReader);
    TSDB_CHECK_CODE(code, lino, _exit);
#endif

    // open stt reader array
    SSttLvl* lvl;
    TARRAY2_FOREACH(writer->ctx->fset->lvlArr, lvl) {
      if (lvl->level != 0) {
        if (TARRAY2_SIZE(lvl->fobjArr) > 0) {
          writer->ctx->toSttOnly = true;
        }

        continue;  // Only merge level 0
      }

      STFileObj* fobj;
      TARRAY2_FOREACH(lvl->fobjArr, fobj) {
        SSttFileReader*      reader;
        SSttFileReaderConfig sttFileReaderConfig = {
            .tsdb = writer->tsdb,
            .szPage = writer->szPage,
            .buffers = writer->buffers,
            .file = fobj->f[0],
        };

        code = tsdbSttFileReaderOpen(fobj->fname, &sttFileReaderConfig, &reader);
        TSDB_CHECK_CODE(code, lino, _exit);

        code = TARRAY2_APPEND(writer->ctx->sttReaderArr, reader);
        if (code) {
          // the array did not take ownership; close the reader so it is not leaked
          tsdbSttFileReaderClose(&reader);
          TSDB_CHECK_CODE(code, lino, _exit);
        }

        STFileOp fileOp = {
            .optype = TSDB_FOP_REMOVE,
            .fid = fobj->f->fid,
            .of = fobj->f[0],
        };

        code = TARRAY2_APPEND(writer->fopArr, fileOp);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
726

727
// Close the readers opened on the existing file set: every stt reader, then
// the data-file reader. Always returns 0.
static int32_t tsdbSnapWriteFileSetCloseReader(STsdbSnapWriter* writer) {
  int32_t code = 0;

  TARRAY2_CLEAR(writer->ctx->sttReaderArr, tsdbSttFileReaderClose);
  tsdbDataFileReaderClose(&writer->ctx->dataReader);

  return code;
}
732

733
static int32_t tsdbSnapWriteFileSetOpenIter(STsdbSnapWriter* writer) {
959✔
734
  int32_t code = 0;
959✔
735
  int32_t lino = 0;
959✔
736

737
  // data ieter
738
  if (writer->ctx->dataReader) {
959✔
739
    STsdbIter*      iter;
×
740
    STsdbIterConfig config = {0};
×
741

742
    // data
743
    config.type = TSDB_ITER_TYPE_DATA;
×
744
    config.dataReader = writer->ctx->dataReader;
×
745

746
    code = tsdbIterOpen(&config, &iter);
×
747
    TSDB_CHECK_CODE(code, lino, _exit);
×
748

749
    code = TARRAY2_APPEND(writer->ctx->dataIterArr, iter);
×
750
    TSDB_CHECK_CODE(code, lino, _exit);
×
751

752
    // tome
753
    config.type = TSDB_ITER_TYPE_DATA_TOMB;
×
754
    config.dataReader = writer->ctx->dataReader;
×
755

756
    code = tsdbIterOpen(&config, &iter);
×
757
    TSDB_CHECK_CODE(code, lino, _exit);
×
758

759
    code = TARRAY2_APPEND(writer->ctx->tombIterArr, iter);
×
760
    TSDB_CHECK_CODE(code, lino, _exit);
×
761
  }
762

763
  // stt iter
764
  SSttFileReader* sttFileReader;
765
  TARRAY2_FOREACH(writer->ctx->sttReaderArr, sttFileReader) {
1,918✔
766
    STsdbIter*      iter;
959✔
767
    STsdbIterConfig config = {0};
959✔
768

769
    // data
770
    config.type = TSDB_ITER_TYPE_STT;
959✔
771
    config.sttReader = sttFileReader;
959✔
772

773
    code = tsdbIterOpen(&config, &iter);
959✔
774
    TSDB_CHECK_CODE(code, lino, _exit);
959✔
775

776
    code = TARRAY2_APPEND(writer->ctx->dataIterArr, iter);
959✔
777
    TSDB_CHECK_CODE(code, lino, _exit);
959✔
778

779
    // tomb
780
    config.type = TSDB_ITER_TYPE_STT_TOMB;
959✔
781
    config.sttReader = sttFileReader;
959✔
782

783
    code = tsdbIterOpen(&config, &iter);
959✔
784
    TSDB_CHECK_CODE(code, lino, _exit);
959✔
785

786
    code = TARRAY2_APPEND(writer->ctx->tombIterArr, iter);
959✔
787
    TSDB_CHECK_CODE(code, lino, _exit);
959✔
788
  }
789

790
  // open merger
791
  code = tsdbIterMergerOpen(writer->ctx->dataIterArr, &writer->ctx->dataIterMerger, false);
959✔
792
  TSDB_CHECK_CODE(code, lino, _exit);
959✔
793

794
  code = tsdbIterMergerOpen(writer->ctx->tombIterArr, &writer->ctx->tombIterMerger, true);
959✔
795
  TSDB_CHECK_CODE(code, lino, _exit);
959✔
796

797
_exit:
959✔
798
  if (code) {
959✔
799
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
800
  }
801
  return code;
959✔
802
}
803

804
// Close both mergers and every iterator held in the writer's data and
// tombstone iterator arrays. Always returns 0.
static int32_t tsdbSnapWriteFileSetCloseIter(STsdbSnapWriter* writer) {
  int32_t code = 0;

  tsdbIterMergerClose(&writer->ctx->dataIterMerger);
  tsdbIterMergerClose(&writer->ctx->tombIterMerger);

  TARRAY2_CLEAR(writer->ctx->dataIterArr, tsdbIterClose);
  TARRAY2_CLEAR(writer->ctx->tombIterArr, tsdbIterClose);

  return code;
}
811

812
// Open the file-set writer for the current fid. The configuration is taken
// from the snapshot writer's settings; the target level is 1 when
// ctx->toSttOnly is set and 0 otherwise. Data-type files already present in
// the existing file set are passed along in config.files.
static int32_t tsdbSnapWriteFileSetOpenWriter(STsdbSnapWriter* writer) {
  int32_t code = 0;
  int32_t lino = 0;

  SFSetWriterConfig config = {0};
  config.tsdb = writer->tsdb;
  config.toSttOnly = writer->ctx->toSttOnly;
  config.compactVersion = writer->compactVersion;
  config.minRow = writer->minRow;
  config.maxRow = writer->maxRow;
  config.szPage = writer->szPage;
  config.cmprAlg = writer->cmprAlg;
  config.fid = writer->ctx->fid;
  config.cid = writer->commitID;
  config.expLevel = writer->ctx->expLevel;
  config.level = writer->ctx->toSttOnly ? 1 : 0;

  // merge stt files to either data or a new stt file
  if (writer->ctx->fset != NULL) {
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
      if (writer->ctx->fset->farr[ftype] == NULL) {
        continue;
      }
      config.files[ftype].exist = true;
      config.files[ftype].file = writer->ctx->fset->farr[ftype]->f[0];
    }
  }

  code = tsdbFSetWriterOpen(&config, &writer->ctx->fsetWriter);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
848

849
// Close the file-set writer, passing writer->fopArr so the close can record
// its file operations there. Returns whatever tsdbFSetWriterClose() returns.
static int32_t tsdbSnapWriteFileSetCloseWriter(STsdbSnapWriter* writer) {
  int32_t code = tsdbFSetWriterClose(&writer->ctx->fsetWriter, 0, writer->fopArr);
  return code;
}
852

853
// Start writing file set `fid`: look up an existing file set with that fid,
// compute the expected level, then open readers, iterators and the file-set
// writer in that order. ctx->fsetWriteBegin is only set once all three
// succeeded.
static int32_t tsdbSnapWriteFileSetBegin(STsdbSnapWriter* writer, int32_t fid) {
  int32_t code = 0;
  int32_t lino = 0;

  // search key: a file set carrying only the fid
  STFileSet  key = {.fid = fid};
  STFileSet* pKey = &key;

  writer->ctx->fid = fid;
  STFileSet** found = TARRAY2_SEARCH(writer->fsetArr, &pKey, tsdbTFileSetCmprFn, TD_EQ);
  if (found == NULL) {
    writer->ctx->fset = NULL;
  } else {
    writer->ctx->fset = *found;
  }

  writer->ctx->expLevel = tsdbFidLevel(fid, &writer->tsdb->keepCfg, taosGetTimestampSec());

  writer->ctx->hasData = true;
  writer->ctx->hasTomb = true;

  code = tsdbSnapWriteFileSetOpenReader(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetOpenIter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetOpenWriter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  writer->ctx->fsetWriteBegin = true;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  } else {
    tsdbInfo("vgId:%d %s succeeded, fid:%d", TD_VID(writer->tsdb->pVnode), __func__, fid);
  }
  return code;
}
887

888
// Write `record` into the current file set, first flushing every existing
// tombstone record (from the tomb merger) that compares at or before it via
// tTombRecordCompare(). A record with suid == INT64_MAX acts as an end
// marker: existing records are flushed but the marker itself is not written.
static int32_t tsdbSnapWriteTombRecord(STsdbSnapWriter* writer, const STombRecord* record) {
  int32_t code = 0;
  int32_t lino = 0;

  while (writer->ctx->hasTomb) {
    STombRecord* existing = tsdbIterMergerGetTombRecord(writer->ctx->tombIterMerger);
    if (existing == NULL) {
      writer->ctx->hasTomb = false;
      break;
    }

    // stop as soon as the existing record sorts after the incoming one
    if (tTombRecordCompare(existing, record) > 0) {
      break;
    }

    code = tsdbFSetWriteTombRecord(writer->ctx->fsetWriter, existing);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(writer->ctx->tombIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (record->suid != INT64_MAX) {
    code = tsdbFSetWriteTombRecord(writer->ctx->fsetWriter, record);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
924

925
// Finish writing the file set that is currently open on `writer`.
//
// Does nothing (returns 0) when ctx->fsetWriteBegin is not set. Otherwise it
// pushes a row and then a tombstone record whose suid/uid are INT64_MAX
// through the regular write paths, closes the file set writer, the iterators
// and the readers in that order, and clears ctx->fsetWriteBegin.
//
// Returns 0 on success, otherwise the first error code encountered; on error
// ctx->fsetWriteBegin is left unchanged.
static int32_t tsdbSnapWriteFileSetEnd(STsdbSnapWriter* writer) {
  int32_t code = 0;
  int32_t lino = 0;

  if (!writer->ctx->fsetWriteBegin) {
    return code;
  }

  // INT64_MAX sentinels used to end the time-series and tombstone streams
  SRowInfo    lastRow = {.suid = INT64_MAX, .uid = INT64_MAX};
  STombRecord lastTomb = {.suid = INT64_MAX, .uid = INT64_MAX};

  // end timeseries data write
  code = tsdbSnapWriteTimeSeriesRow(writer, &lastRow);
  TSDB_CHECK_CODE(code, lino, _exit);

  // end tombstone data write
  code = tsdbSnapWriteTombRecord(writer, &lastTomb);
  TSDB_CHECK_CODE(code, lino, _exit);

  // tear down in order: writer, iterators, readers
  code = tsdbSnapWriteFileSetCloseWriter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetCloseIter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetCloseReader(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  writer->ctx->fsetWriteBegin = false;

_exit:
  if (code != 0) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  } else {
    tsdbInfo("vgId:%d %s succeeded, fid:%d", TD_VID(writer->tsdb->pVnode), __func__, writer->ctx->fid);
  }
  return code;
}
969

970
// Abandon the file set that is currently open on `writer`.
//
// Does nothing (returns 0) when ctx->fsetWriteBegin is not set. Otherwise the
// file set writer, the iterators and the readers are closed in that order,
// without emitting the INT64_MAX end-of-stream row/record first, and
// ctx->fsetWriteBegin is cleared.
//
// Returns 0 on success, otherwise the first error code encountered.
static int32_t tsdbSnapWriteFileSetAbort(STsdbSnapWriter* writer) {
  int32_t code = 0;
  int32_t lino = 0;

  if (!writer->ctx->fsetWriteBegin) {
    return code;
  }

  // tear down in order: writer, iterators, readers
  code = tsdbSnapWriteFileSetCloseWriter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetCloseIter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetCloseReader(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  writer->ctx->fsetWriteBegin = false;

_exit:
  if (code != 0) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
994

995
// Decode one time-series snapshot payload and write its rows.
//
// The payload (hdr->data, hdr->size bytes) is decompressed into a local
// SBlockData using writer->buffers[0] as scratch space. The file set id is
// derived from the first timestamp of the block; if no file set is open, or
// the open one has a different fid, the current file set is ended and a new
// one is begun for that fid. Each row is then handed to
// tsdbSnapWriteTimeSeriesRow; the row uid is blockData->uid when non-zero,
// otherwise the per-row blockData->aUid[i].
//
// A block that decompresses to zero rows is skipped: it has no first
// timestamp to derive a fid from and nothing to write.
//
// Returns 0 on success, otherwise the first error code encountered. The
// decoded block is always destroyed before returning.
static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
  int32_t code = 0;
  int32_t lino = 0;

  SBlockData blockData[1] = {0};

  // wrap the payload in place; nothing is copied here
  SBuffer buffer = {
      .capacity = hdr->size,
      .data = hdr->data,
      .size = hdr->size,
  };
  SBufferReader br = BUFFER_READER_INITIALIZER(0, &buffer);

  code = tBlockDataDecompress(&br, blockData, &writer->buffers[0]);
  TSDB_CHECK_CODE(code, lino, _exit);

  // guard the aTSKEY[0] access below: an empty block has no first key
  if (blockData->nRow <= 0) {
    goto _exit;
  }

  int32_t fid = tsdbKeyFid(blockData->aTSKEY[0], writer->minutes, writer->precision);
  if (!writer->ctx->fsetWriteBegin || fid != writer->ctx->fid) {
    // switch file sets: finish the open one (no-op if none) and start `fid`
    code = tsdbSnapWriteFileSetEnd(writer);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbSnapWriteFileSetBegin(writer, fid);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  for (int32_t i = 0; i < blockData->nRow; ++i) {
    SRowInfo rowInfo = {
        .suid = blockData->suid,
        .uid = blockData->uid ? blockData->uid : blockData->aUid[i],
        .row = tsdbRowFromBlockData(blockData, i),
    };

    code = tsdbSnapWriteTimeSeriesRow(writer, &rowInfo);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  } else {
    tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(writer->tsdb->pVnode), __func__,
              blockData->suid, blockData->uid, blockData->nRow);
  }
  tBlockDataDestroy(blockData);
  return code;
}
1041

1042
// Rebuild a STombBlock from a snapshot payload.
//
// The block is cleared first. The payload (hdr->data, hdr->size bytes) is
// then split into TOMB_RECORD_ELEM_NUM consecutive slices of equal length,
// and slice i is appended to tombBlock->buffers[i]. numOfRecords is set to
// the slice length divided by sizeof(int64_t).
//
// Returns 0 on success, otherwise the error code from tBufferPut.
static int32_t tsdbSnapWriteDecmprTombBlock(SSnapDataHdr* hdr, STombBlock* tombBlock) {
  int32_t code = 0;
  int32_t lino = 0;

  tTombBlockClear(tombBlock);

  // bytes occupied by one element column inside the payload
  int64_t colBytes = (int64_t)hdr->size / TOMB_RECORD_ELEM_NUM;
  tombBlock->numOfRecords = colBytes / sizeof(int64_t);

  uint8_t* cursor = hdr->data;
  for (int32_t col = 0; col < TOMB_RECORD_ELEM_NUM; ++col, cursor += colBytes) {
    code = tBufferPut(&tombBlock->buffers[col], cursor, colBytes);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  return code;
}
1063

1064
// Decode one tombstone snapshot payload and write its records.
//
// The payload is unpacked into a local STombBlock. The file set id is derived
// from the skey of the first record; if no file set is open, or the open one
// has a different fid, the current file set is ended and a new one is begun
// for that fid. If ctx->hasData is set, a row with suid/uid == INT64_MAX is
// first pushed through tsdbSnapWriteTimeSeriesRow (presumably to finish the
// time-series part of the file set before tombstones are written; confirm
// against tsdbSnapWriteTimeSeriesRow). Every record of the block is then
// handed to tsdbSnapWriteTombRecord.
//
// Returns 0 on success, otherwise the first error code encountered. The
// local tomb block is released on every path, including errors.
static int32_t tsdbSnapWriteTombData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
  int32_t code = 0;
  int32_t lino = 0;

  STombRecord record;
  STombBlock  tombBlock[1] = {0};

  code = tsdbSnapWriteDecmprTombBlock(hdr, tombBlock);
  TSDB_CHECK_CODE(code, lino, _exit);

  // the first record decides which file set this payload belongs to
  code = tTombBlockGet(tombBlock, 0, &record);
  TSDB_CHECK_CODE(code, lino, _exit);
  int32_t fid = tsdbKeyFid(record.skey, writer->minutes, writer->precision);
  if (!writer->ctx->fsetWriteBegin || fid != writer->ctx->fid) {
    code = tsdbSnapWriteFileSetEnd(writer);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbSnapWriteFileSetBegin(writer, fid);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (writer->ctx->hasData) {
    SRowInfo row = {
        .suid = INT64_MAX,
        .uid = INT64_MAX,
    };

    code = tsdbSnapWriteTimeSeriesRow(writer, &row);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  for (int32_t i = 0; i < TOMB_BLOCK_SIZE(tombBlock); ++i) {
    code = tTombBlockGet(tombBlock, i, &record);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbSnapWriteTombRecord(writer, &record);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  // release the decoded block on success and on every error path alike;
  // tombBlock is zero-initialized before the first possible jump here
  tTombBlockDestroy(tombBlock);
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
1111

1112
// Allocate and initialize a snapshot writer for `pTsdb`.
//
// On return writer[0] holds the new zero-initialized writer (NULL if the
// allocation failed, in which case terrno is returned). The writer records
// the version range [sver, ever], copies the write configuration from the
// tsdb/vnode config, allocates a commit id via tsdbFSAllocEid, and builds
// its file set array / file operation array with
// tsdbFSCreateCopyRangedSnapshot using `pRanges` (cast to
// TFileSetRangeArray*).
//
// Returns 0 on success, otherwise an error code. If the snapshot copy fails
// the writer stays allocated in writer[0].
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRanges, STsdbSnapWriter** writer) {
  int32_t code = 0;
  int32_t lino = 0;

  // start to write
  STsdbSnapWriter* w = taosMemoryCalloc(1, sizeof(*w));
  writer[0] = w;
  if (w == NULL) {
    return terrno;
  }

  w->tsdb = pTsdb;
  w->sver = sver;
  w->ever = ever;
  w->minutes = pTsdb->keepCfg.days;
  w->precision = pTsdb->keepCfg.precision;
  w->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  w->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
  w->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
  w->commitID = tsdbFSAllocEid(pTsdb->pFS);
  w->szPage = pTsdb->pVnode->config.tsdbPageSize;
  w->compactVersion = INT64_MAX;
  w->now = taosGetTimestampMs();

  code = tsdbFSCreateCopyRangedSnapshot(pTsdb->pFS, (TFileSetRangeArray*)pRanges, &w->fsetArr, w->fopArr);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64, TD_VID(pTsdb->pVnode), __func__, sver, ever);
  }
  return code;
}
1145

1146
// First phase of closing a snapshot writer.
//
// Ends the open file set (rollback == false) or aborts it (rollback == true),
// then begins a file system edit with the writer's accumulated file
// operations. TSDB_FEDIT_COMMIT is passed to tsdbFSEditBegin in both cases.
//
// Returns 0 on success, otherwise the first error code encountered.
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* writer, bool rollback) {
  int32_t code = 0;
  int32_t lino = 0;

  // step 1: finish or discard the file set in progress
  if (rollback) {
    code = tsdbSnapWriteFileSetAbort(writer);
  } else {
    code = tsdbSnapWriteFileSetEnd(writer);
  }
  TSDB_CHECK_CODE(code, lino, _exit);

  // step 2: stage the file operations (same call for both modes)
  code = tsdbFSEditBegin(writer->tsdb->pFS, writer->fopArr, TSDB_FEDIT_COMMIT);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(writer->tsdb->pVnode), __func__);
  }
  return code;
}
1172

1173
// Second phase of closing a snapshot writer: apply or discard the staged file
// system edit, then release the writer.
//
// Returns 0 immediately when writer[0] is NULL. With `rollback` set the edit
// is aborted via tsdbFSEditAbort; otherwise it is committed via
// tsdbFSEditCommit while holding tsdb->mutex, and on success the file system
// state is set to TSDB_FS_STATE_NORMAL. After a successful abort/commit all
// iterators, readers, arrays and buffers owned by the writer are destroyed,
// the writer is freed and writer[0] is set to NULL.
//
// Returns 0 on success. If the abort/commit fails its error code is returned
// and the writer is NOT released (writer[0] stays valid).
int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) {
  STsdbSnapWriter* w = writer[0];
  if (w == NULL) {
    return 0;
  }

  int32_t code = 0;
  int32_t lino = 0;
  STsdb*  tsdb = w->tsdb;

  if (rollback) {
    code = tsdbFSEditAbort(tsdb->pFS);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    (void)taosThreadMutexLock(&tsdb->mutex);

    code = tsdbFSEditCommit(tsdb->pFS);
    if (code == 0) {
      tsdb->pFS->fsstate = TSDB_FS_STATE_NORMAL;
    }

    // single unlock point for both the success and the failure case
    (void)taosThreadMutexUnlock(&tsdb->mutex);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // per-file-set context
  tsdbIterMergerClose(&w->ctx->tombIterMerger);
  tsdbIterMergerClose(&w->ctx->dataIterMerger);
  TARRAY2_DESTROY(w->ctx->tombIterArr, tsdbIterClose);
  TARRAY2_DESTROY(w->ctx->dataIterArr, tsdbIterClose);
  TARRAY2_DESTROY(w->ctx->sttReaderArr, tsdbSttFileReaderClose);
  tsdbDataFileReaderClose(&w->ctx->dataReader);

  // writer-level state
  TARRAY2_DESTROY(w->fopArr, NULL);
  tsdbFSDestroyCopyRangedSnapshot(&w->fsetArr);

  for (int32_t i = 0; i < ARRAY_SIZE(w->buffers); ++i) {
    tBufferDestroy(&w->buffers[i]);
  }

  taosMemoryFree(w);
  writer[0] = NULL;

_exit:
  if (code != 0) {
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
  } else {
    tsdbInfo("vgId:%d %s done, rollback:%d", TD_VID(tsdb->pVnode), __func__, rollback);
  }
  return code;
}
1223

1224
// Entry point for feeding one snapshot payload to the writer.
//
// Dispatches on hdr->type: SNAP_DATA_TSDB payloads go to
// tsdbSnapWriteTimeSeriesData, SNAP_DATA_DEL payloads go to
// tsdbSnapWriteTombData, and any other type fails with
// TSDB_CODE_INVALID_PARA.
//
// Returns 0 on success, otherwise the error code; the outcome is logged
// together with the payload type, index and size.
int32_t tsdbSnapWrite(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
  int32_t code = 0;
  int32_t lino = 0;

  switch (hdr->type) {
    case SNAP_DATA_TSDB:
      code = tsdbSnapWriteTimeSeriesData(writer, hdr);
      TSDB_CHECK_CODE(code, lino, _exit);
      break;

    case SNAP_DATA_DEL:
      code = tsdbSnapWriteTombData(writer, hdr);
      TSDB_CHECK_CODE(code, lino, _exit);
      break;

    default:
      // unknown payload type
      TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _exit);
      break;
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s, type:%d index:%" PRId64 " size:%" PRId64,
              TD_VID(writer->tsdb->pVnode), __func__, lino, tstrerror(code), hdr->type, hdr->index, hdr->size);
  } else {
    tsdbDebug("vgId:%d %s done, type:%d index:%" PRId64 " size:%" PRId64, TD_VID(writer->tsdb->pVnode), __func__,
              hdr->type, hdr->index, hdr->size);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc