• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4848

12 Nov 2025 01:06AM UTC coverage: 63.27% (+0.6%) from 62.651%
#4848

push

travis-ci

web-flow
Merge f12882a7a into e27395247

33 of 36 new or added lines in 4 files covered. (91.67%)

2652 existing lines in 104 files now uncovered.

138980 of 219661 relevant lines covered (63.27%)

110230098.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.01
/source/dnode/vnode/src/tsdb/tsdbSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdb.h"
17
#include "tsdbDataFileRW.h"
18
#include "tsdbFS2.h"
19
#include "tsdbFSetRW.h"
20
#include "tsdbIter.h"
21
#include "tsdbSttFileRW.h"
22

23
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);
24

25
// STsdbSnapReader ========================================
26
/*
 * State of one snapshot read session. Created by tsdbSnapReaderOpen(),
 * advanced by tsdbSnapRead() and released by tsdbSnapReaderClose().
 */
struct STsdbSnapReader {
27
  STsdb*  tsdb;  // tsdb instance being read; set from the tsdbSnapReaderOpen() argument
28
  int64_t sver;  // start version passed to tsdbSnapReaderOpen()
29
  int64_t ever;  // end version passed to tsdbSnapReaderOpen()
30
  int8_t  type;  // copied into SSnapDataHdr.type of time-series messages
31

32
  SBuffer  buffers[10];  // shared with the file readers and tBlockDataCompress()
33
  SSkmInfo skmTb[1];     // table schema cache refreshed through tsdbUpdateSkmTb()
34

35
  TFileSetRangeArray* fsrArr;  // filled by tsdbFSCreateRefRangedSnapshot()
36

37
  // context: which file-set range is being read and how far it has progressed
38
  struct {
39
    int32_t         fsrArrIdx;   // index of the next entry of fsrArr to open
40
    STFileSetRange* fsr;         // range currently open, NULL when none is open
41
    bool            isDataDone;  // time-series data of the current range exhausted
42
    bool            isTombDone;  // tomb records of the current range exhausted
43
  } ctx[1];
44

45
  // reader: file readers opened for the current range
46
  SDataFileReader*    dataReader;
47
  TSttFileReaderArray sttReaderArr[1];
48

49
  // iter: per-file iterators and the mergers built on top of them
50
  TTsdbIterArray dataIterArr[1];
51
  SIterMerger*   dataIterMerger;
52
  TTsdbIterArray tombIterArr[1];
53
  SIterMerger*   tombIterMerger;
54

55
  // data: staging blocks that are packed into outgoing snapshot messages
56
  SBlockData blockData[1];
57
  STombBlock tombBlock[1];
58
};
59

60
/*
 * Close every stt file reader held in reader->sttReaderArr, then close the
 * data file reader of the current file-set range.
 */
static void tsdbSnapReadFileSetCloseReader(STsdbSnapReader* reader) {
  TARRAY2_CLEAR(reader->sttReaderArr, tsdbSttFileReaderClose);
  tsdbDataFileReaderClose(&reader->dataReader);
}
65

66
/*
 * Open the file readers for the file set of the current range
 * (reader->ctx->fsr): one data file reader when at least one data-side file
 * exists, plus one stt reader for every stt file on every level.
 *
 * Returns 0 on success. On failure every reader opened so far is closed and
 * the error code is returned.
 */
static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) {
  int32_t code = 0;
  int32_t lino = 0;

  // data file reader: collect the files that exist in this file set
  bool                  anyDataFile = false;
  SDataFileReaderConfig dataConfig = {
      .tsdb = reader->tsdb,
      .szPage = reader->tsdb->pVnode->config.tsdbPageSize,
      .buffers = reader->buffers,
  };

  for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
    if (reader->ctx->fsr->fset->farr[ftype] == NULL) {
      continue;
    }

    anyDataFile = true;
    dataConfig.files[ftype].exist = true;
    dataConfig.files[ftype].file = reader->ctx->fsr->fset->farr[ftype]->f[0];
  }

  if (anyDataFile) {
    code = tsdbDataFileReaderOpen(NULL, &dataConfig, &reader->dataReader);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // stt file readers: one per file object on every level
  SSttLvl* lvl;
  TARRAY2_FOREACH(reader->ctx->fsr->fset->lvlArr, lvl) {
    STFileObj* fobj;
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      SSttFileReader*      sttReader;
      SSttFileReaderConfig sttConfig = {
          .tsdb = reader->tsdb,
          .szPage = reader->tsdb->pVnode->config.tsdbPageSize,
          .file = fobj->f[0],
          .buffers = reader->buffers,
      };

      code = tsdbSttFileReaderOpen(fobj->fname, &sttConfig, &sttReader);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = TARRAY2_APPEND(reader->sttReaderArr, sttReader);
      if (code) {
        // the array did not take the reader, so close it here
        tsdbSttFileReaderClose(&sttReader);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
  }

_exit:
  if (code) {
    tsdbSnapReadFileSetCloseReader(reader);
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
120

121
/*
 * Build the iterators and mergers for the current file-set range.
 *
 * For the data file reader (if open) and for every stt reader, one data
 * iterator is appended to reader->dataIterArr and one tomb iterator to
 * reader->tombIterArr. Every iterator filters by the version range of the
 * current range (fsr->sver .. fsr->ever). Finally one merger is opened over
 * each iterator array.
 *
 * An iterator that was opened but could not be appended to its array is
 * closed here, because nothing else would own it.
 *
 * Returns 0 on success or the first error code. Iterators that were already
 * appended stay in the arrays and are released by
 * tsdbSnapReadFileSetCloseIter() / tsdbSnapReaderClose().
 */
static int32_t tsdbSnapReadFileSetOpenIter(STsdbSnapReader* reader) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdbIter*      iter = NULL;
  STsdbIterConfig config = {
      .filterByVersion = true,
      .verRange[0] = reader->ctx->fsr->sver,
      .verRange[1] = reader->ctx->fsr->ever,
  };

  // data file
  if (reader->dataReader) {
    // data
    config.type = TSDB_ITER_TYPE_DATA;
    config.dataReader = reader->dataReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(reader->dataIterArr, iter);
    if (code) {
      tsdbIterClose(&iter);  // not owned by the array, avoid leaking it
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // tomb
    config.type = TSDB_ITER_TYPE_DATA_TOMB;
    config.dataReader = reader->dataReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(reader->tombIterArr, iter);
    if (code) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // stt file
  SSttFileReader* sttReader;
  TARRAY2_FOREACH(reader->sttReaderArr, sttReader) {
    // data
    config.type = TSDB_ITER_TYPE_STT;
    config.sttReader = sttReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(reader->dataIterArr, iter);
    if (code) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // tomb
    config.type = TSDB_ITER_TYPE_STT_TOMB;
    config.sttReader = sttReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(reader->tombIterArr, iter);
    if (code) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // merger
  code = tsdbIterMergerOpen(reader->dataIterArr, &reader->dataIterMerger, false);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbIterMergerOpen(reader->tombIterArr, &reader->tombIterMerger, true);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
192

193
/*
 * Close both mergers first, then close and drop every iterator held in the
 * data and tomb iterator arrays.
 */
static void tsdbSnapReadFileSetCloseIter(STsdbSnapReader* reader) {
  tsdbIterMergerClose(&reader->dataIterMerger);
  tsdbIterMergerClose(&reader->tombIterMerger);

  TARRAY2_CLEAR(reader->dataIterArr, tsdbIterClose);
  TARRAY2_CLEAR(reader->tombIterArr, tsdbIterClose);
}
200

201
/*
 * Move on to the next file-set range, if any.
 *
 * When reader->fsrArr still has an unread entry, it becomes the current
 * range (ctx->fsr), both "done" flags are reset, and its readers and
 * iterators are opened. When no entry is left, nothing is changed and 0 is
 * returned with ctx->fsr untouched.
 */
static int32_t tsdbSnapReadRangeBegin(STsdbSnapReader* reader) {
  int32_t code = 0;
  int32_t lino = 0;

  if (reader->ctx->fsrArrIdx >= TARRAY2_SIZE(reader->fsrArr)) {
    return 0;  // every range has been consumed
  }

  reader->ctx->fsr = TARRAY2_GET(reader->fsrArr, reader->ctx->fsrArrIdx++);
  reader->ctx->isDataDone = false;
  reader->ctx->isTombDone = false;

  code = tsdbSnapReadFileSetOpenReader(reader);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapReadFileSetOpenIter(reader);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
223

224
/*
 * Finish the current file-set range: release its iterators and readers and
 * clear ctx->fsr so the next tsdbSnapRead() step opens the following range.
 * Always returns 0.
 */
static int32_t tsdbSnapReadRangeEnd(STsdbSnapReader* reader) {
  tsdbSnapReadFileSetCloseIter(reader);
  tsdbSnapReadFileSetCloseReader(reader);

  reader->ctx->fsr = NULL;

  return 0;
}
230

231
/*
 * Encode reader->blockData with tBlockDataCompress() (default algorithm
 * NO_COMPRESSION) and pack the result into a newly allocated message:
 * an SSnapDataHdr followed by the contents of reader->buffers[0..3].
 *
 * @param reader snapshot reader whose blockData holds the rows to send
 * @param data   out: receives the allocated message; the caller owns it
 *
 * Returns 0 on success. On allocation failure *data is NULL and terrno is
 * returned; on encoding failure *data is not written.
 */
static int32_t tsdbSnapCmprData(STsdbSnapReader* reader, uint8_t** data) {
  // The first SNAP_CMPR_NBUF buffers are summed and copied into the message;
  // the buffer right after them is handed to tBlockDataCompress() as its last
  // argument (presumably scratch space -- confirm against its declaration).
  enum { SNAP_CMPR_NBUF = 4 };

  int32_t code = 0;
  int32_t lino = 0;

  SColCompressInfo cmprInfo = {.pColCmpr = NULL, .defaultCmprAlg = NO_COMPRESSION};
  code = tBlockDataCompress(reader->blockData, (void*)&cmprInfo, reader->buffers, reader->buffers + SNAP_CMPR_NBUF);
  TSDB_CHECK_CODE(code, lino, _exit);

  int32_t size = 0;
  for (int i = 0; i < SNAP_CMPR_NBUF; i++) {
    size += reader->buffers[i].size;
  }

  *data = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
  if (*data == NULL) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  SSnapDataHdr* pHdr = (SSnapDataHdr*)*data;
  uint8_t*      pBuf = pHdr->data;

  pHdr->type = reader->type;
  pHdr->size = size;
  for (int i = 0; i < SNAP_CMPR_NBUF; i++) {
    memcpy(pBuf, reader->buffers[i].data, reader->buffers[i].size);
    pBuf += reader->buffers[i].size;
  }

_exit:
  if (code) {
    // NOTE(review): argument order (lino, code) differs from the other
    // reader-side call sites, which pass (code, lino) -- confirm against the
    // TSDB_ERROR_LOG definition.
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), lino, code);
  }
  return code;
}
267

268
/*
 * Estimate the number of payload bytes held by the columns of a block.
 *
 * Per column:
 *  - flag is exactly HAS_NONE or exactly HAS_NULL: contributes nothing;
 *  - flag is anything other than HAS_VALUE: a bitmap is counted, BIT2_SIZE
 *    when all three of NONE/NULL/VALUE are present, BIT1_SIZE otherwise;
 *  - flag is HAS_NONE|HAS_NULL: only the bitmap is counted;
 *  - otherwise the value bytes (nData) are added, plus one int32_t offset
 *    per value for variable-length types.
 */
static int64_t tBlockDataSize(SBlockData* pBlockData) {
  int64_t total = 0;

  for (int32_t iCol = 0; iCol < pBlockData->nColData; ++iCol) {
    SColData* col = tBlockDataGetColDataByIdx(pBlockData, iCol);

    if (col->flag == HAS_NONE || col->flag == HAS_NULL) {
      continue;
    }

    if (col->flag != HAS_VALUE) {
      if (col->flag != (HAS_NONE | HAS_NULL | HAS_VALUE)) {
        total += BIT1_SIZE(col->nVal);
      } else {
        total += BIT2_SIZE(col->nVal);
      }
    }

    if (col->flag == (HAS_NONE | HAS_NULL)) {
      continue;  // bitmap only, no value bytes
    }

    if (IS_VAR_DATA_TYPE(col->type)) {
      total += col->nVal * sizeof(int32_t);  // var data offset
    }

    total += col->nData;
  }

  return total;
}
296

297
/*
 * Pull rows from the data merger into reader->blockData and, when at least
 * one row was collected, pack them into a snapshot message via
 * tsdbSnapCmprData().
 *
 * Rows of tables for which metaGetInfo() fails are skipped (dropped tables).
 * Collection stops when the merger is exhausted, when the next row does not
 * share the block's schema, or when (checked every 16 rows) the estimated
 * block size reaches TSDB_SNAP_DATA_PAYLOAD_SIZE.
 *
 * @param data out: the packed message; only written when rows were collected
 * Returns 0 on success or the first error code.
 */
static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** data) {
  int32_t   code = 0;
  int32_t   lino = 0;
  SMetaInfo metaInfo;
  TABLEID   curTb[1] = {0};
  SRowInfo* row = NULL;

  tBlockDataReset(reader->blockData);

  while ((row = tsdbIterMergerGetData(reader->dataIterMerger)) != NULL) {
    // skip dropped table
    if (row->uid != curTb->uid) {
      curTb->suid = row->suid;
      curTb->uid = row->uid;
      if (metaGetInfo(reader->tsdb->pVnode->pMeta, curTb->uid, &metaInfo, NULL) != 0) {
        code = tsdbIterMergerSkipTableData(reader->dataIterMerger, curTb);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }
    }

    // first accepted row: bind the block to this row's schema
    if (reader->blockData->suid == 0 && reader->blockData->uid == 0) {
      code = tsdbUpdateSkmTb(reader->tsdb, (TABLEID*)row, reader->skmTb);
      TSDB_CHECK_CODE(code, lino, _exit);

      TABLEID blockTb = {
          .suid = row->suid,
          .uid = row->suid ? 0 : row->uid,
      };
      code = tBlockDataInit(reader->blockData, &blockTb, reader->skmTb->pTSchema, NULL, 0);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // a row with a different schema ends this batch
    if (!TABLE_SAME_SCHEMA(reader->blockData->suid, reader->blockData->uid, row->suid, row->uid)) {
      break;
    }

    code = tBlockDataAppendRow(reader->blockData, &row->row, NULL, row->uid);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(reader->dataIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);

    // size check only every 16 rows
    if (reader->blockData->nRow % 16 == 0) {
      int64_t blockBytes = tBlockDataSize(reader->blockData);
      if (blockBytes >= TSDB_SNAP_DATA_PAYLOAD_SIZE) {
        break;
      }
    }
  }

  if (reader->blockData->nRow > 0) {
    code = tsdbSnapCmprData(reader, data);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  } else {
    tsdbDebug("vgId:%d, tsdb snapshot read time-series data done, data size:%" PRId64 "", TD_VID(reader->tsdb->pVnode),
              data[0] ? ((SSnapDataHdr*)(data[0]))->size : 0);
  }
  return code;
}
361

UNCOV
362
/*
 * Pack reader->tombBlock into a newly allocated message: an SSnapDataHdr of
 * type SNAP_DATA_DEL followed by the raw contents of every tomb-block buffer.
 *
 * @param data out: data[0] receives the message (NULL on allocation failure)
 * Returns 0 on success or terrno when the allocation fails.
 */
static int32_t tsdbSnapCmprTombData(STsdbSnapReader* reader, uint8_t** data) {
  int32_t code = 0;
  int32_t lino = 0;

  // total payload size over all buffers of the tomb block
  int64_t size = 0;
  for (int32_t i = 0; i < ARRAY_SIZE(reader->tombBlock->buffers); i++) {
    size += reader->tombBlock->buffers[i].size;
  }

  data[0] = taosMemoryMalloc(size + sizeof(SSnapDataHdr));
  if (data[0] == NULL) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  SSnapDataHdr* hdr = (SSnapDataHdr*)(data[0]);
  hdr->type = SNAP_DATA_DEL;
  hdr->size = size;

  // copy the buffers back to back right after the header
  uint8_t* dst = hdr->data;
  for (int32_t i = 0; i < ARRAY_SIZE(reader->tombBlock->buffers); i++) {
    memcpy(dst, reader->tombBlock->buffers[i].data, reader->tombBlock->buffers[i].size);
    dst += reader->tombBlock->buffers[i].size;
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
393

394
/*
 * Pull tomb records from the tomb merger into reader->tombBlock and, when
 * the block is non-empty, pack it into a message via tsdbSnapCmprTombData().
 *
 * Records of tables for which metaGetInfo() fails are skipped. Collection
 * stops when the merger is exhausted or the block size reaches 81920.
 *
 * @param data out: the packed message; only written when records were found
 * Returns 0 on success or the first error code.
 */
static int32_t tsdbSnapReadTombData(STsdbSnapReader* reader, uint8_t** data) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SMetaInfo    metaInfo;
  TABLEID      curTb[1] = {0};
  STombRecord* record = NULL;

  tTombBlockClear(reader->tombBlock);

  while ((record = tsdbIterMergerGetTombRecord(reader->tombIterMerger)) != NULL) {
    if (record->uid != curTb->uid) {
      curTb->suid = record->suid;
      curTb->uid = record->uid;
      if (metaGetInfo(reader->tsdb->pVnode->pMeta, curTb->uid, &metaInfo, NULL) != 0) {
        // table not found in meta: drop all of its tomb records
        code = tsdbIterMergerSkipTableData(reader->tombIterMerger, curTb);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }
    }

    code = tTombBlockPut(reader->tombBlock, record);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(reader->tombIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (TOMB_BLOCK_SIZE(reader->tombBlock) >= 81920) {
      break;
    }
  }

  if (TOMB_BLOCK_SIZE(reader->tombBlock) > 0) {
    code = tsdbSnapCmprTombData(reader, data);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
435

436
int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type, void* pRanges,
1,575✔
437
                           STsdbSnapReader** reader) {
438
  int32_t code = 0;
1,575✔
439
  int32_t lino = 0;
1,575✔
440

441
  reader[0] = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*reader[0]));
1,575✔
442
  if (reader[0] == NULL) return terrno;
1,575✔
443

444
  reader[0]->tsdb = tsdb;
1,575✔
445
  reader[0]->sver = sver;
1,575✔
446
  reader[0]->ever = ever;
1,575✔
447
  reader[0]->type = type;
1,575✔
448

449
  code = tsdbFSCreateRefRangedSnapshot(tsdb->pFS, sver, ever, (TFileSetRangeArray*)pRanges, &reader[0]->fsrArr);
1,575✔
450
  TSDB_CHECK_CODE(code, lino, _exit);
1,575✔
451

452
_exit:
1,575✔
453
  if (code) {
1,575✔
UNCOV
454
    tsdbError("vgId:%d %s failed at %s:%d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode),
×
455
              __func__, __FILE__, lino, tstrerror(code), sver, ever, type);
UNCOV
456
    tsdbTFileSetRangeArrayDestroy(&reader[0]->fsrArr);
×
UNCOV
457
    taosMemoryFree(reader[0]);
×
UNCOV
458
    reader[0] = NULL;
×
459
  } else {
460
    tsdbInfo("vgId:%d, tsdb snapshot incremental reader opened. sver:%" PRId64 " ever:%" PRId64 " type:%d",
1,575✔
461
             TD_VID(tsdb->pVnode), sver, ever, type);
462
  }
463
  return code;
1,575✔
464
}
465

466
/*
 * Release a snapshot reader and everything it owns, then set *reader to NULL.
 * Does nothing when *reader is already NULL.
 *
 * Teardown order: staging blocks, mergers, iterator arrays, stt readers,
 * data reader, file-system snapshot, cached table schema, shared buffers,
 * and finally the reader itself.
 */
void tsdbSnapReaderClose(STsdbSnapReader** reader) {
  if (reader[0] == NULL) {
    return;
  }

  tTombBlockDestroy(reader[0]->tombBlock);
  tBlockDataDestroy(reader[0]->blockData);

  tsdbIterMergerClose(&reader[0]->dataIterMerger);
  tsdbIterMergerClose(&reader[0]->tombIterMerger);
  TARRAY2_DESTROY(reader[0]->dataIterArr, tsdbIterClose);
  TARRAY2_DESTROY(reader[0]->tombIterArr, tsdbIterClose);
  TARRAY2_DESTROY(reader[0]->sttReaderArr, tsdbSttFileReaderClose);
  tsdbDataFileReaderClose(&reader[0]->dataReader);

  tsdbFSDestroyRefRangedSnapshot(&reader[0]->fsrArr);
  tDestroyTSchema(reader[0]->skmTb->pTSchema);

  for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->buffers); ++i) {
    tBufferDestroy(reader[0]->buffers + i);
  }

  taosMemoryFree(reader[0]);
  reader[0] = NULL;
}
497

498
/*
 * Produce the next snapshot message.
 *
 * Walks the file-set ranges one at a time. For each range, time-series data
 * is emitted first, then tomb data; once both are exhausted the range is
 * closed and the next one is opened.
 *
 * @param data out: data[0] is the next message, or NULL when every range has
 *             been fully read
 * Returns 0 on success or the first error code.
 */
int32_t tsdbSnapRead(STsdbSnapReader* reader, uint8_t** data) {
  int32_t code = 0;
  int32_t lino = 0;

  *data = NULL;

  while (true) {
    // no range open: try to open the next one, stop when none is left
    if (reader->ctx->fsr == NULL) {
      code = tsdbSnapReadRangeBegin(reader);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (reader->ctx->fsr == NULL) {
        break;
      }
    }

    if (!reader->ctx->isDataDone) {
      code = tsdbSnapReadTimeSeriesData(reader, data);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (*data != NULL) {
        goto _exit;
      }
      reader->ctx->isDataDone = true;
    }

    if (!reader->ctx->isTombDone) {
      code = tsdbSnapReadTombData(reader, data);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (*data != NULL) {
        goto _exit;
      }
      reader->ctx->isTombDone = true;
    }

    // both streams of this range are drained
    code = tsdbSnapReadRangeEnd(reader);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(reader->tsdb->pVnode), __func__);
  }
  return code;
}
546

547
// STsdbSnapWriter ========================================
548
/*
 * State of one snapshot write session. The top-level fields hold session
 * settings; ctx holds the state of the file set currently being written.
 */
struct STsdbSnapWriter {
549
  STsdb*  tsdb;  // tsdb instance being written
550
  int64_t sver;
551
  int64_t ever;
552
  int32_t minutes;
553
  int8_t  precision;
554
  int32_t minRow;  // forwarded to SFSetWriterConfig.minRow
555
  int32_t maxRow;  // forwarded to SFSetWriterConfig.maxRow
556
  int8_t  cmprAlg;  // forwarded to SFSetWriterConfig.cmprAlg
557
  int64_t commitID;  // forwarded to SFSetWriterConfig.cid
558
  int32_t szPage;  // page size given to the file readers and the file-set writer
559
  int64_t compactVersion;  // forwarded to SFSetWriterConfig.compactVersion
560
  int64_t now;
561
  SBuffer buffers[10];  // shared with the stt file readers
562

563
  TFileSetArray* fsetArr;  // searched by fid to find the existing file set
564
  TFileOpArray   fopArr[1];  // file operations collected while writing
565

566
  struct {
567
    bool       fsetWriteBegin;  // set once tsdbSnapWriteFileSetBegin() succeeded
568
    int32_t    fid;  // file set id being written
569
    STFileSet* fset;  // existing file set with that fid, NULL when there is none
570
    int32_t    expLevel;  // result of tsdbFidLevel() for fid
571
    bool       hasData;  // if have time series data
572
    bool       hasTomb;  // if have tomb data
573

574
    // reader
575
    SDataFileReader*    dataReader;
576
    TSttFileReaderArray sttReaderArr[1];
577

578
    // iter/merger
579
    TTsdbIterArray dataIterArr[1];
580
    SIterMerger*   dataIterMerger;
581
    TTsdbIterArray tombIterArr[1];
582
    SIterMerger*   tombIterMerger;
583

584
    // writer
585
    bool         toSttOnly;  // true when a non-zero stt level already holds files
586
    SFSetWriter* fsetWriter;
587
  } ctx[1];
588
};
589

590
// APIs
591
/*
 * Write one incoming row, first flushing every existing row that sorts at or
 * before it.
 *
 * While the local data merger still has rows, rows comparing <= the incoming
 * row (tRowInfoCmprFn) are written and consumed. Existing rows of tables for
 * which metaGetInfo() fails are skipped. A row with suid == INT64_MAX is a
 * sentinel: it drains the merger but is not written itself.
 *
 * Returns 0 on success or the first error code.
 */
static int32_t tsdbSnapWriteTimeSeriesRow(STsdbSnapWriter* writer, SRowInfo* row) {
  int32_t   code = 0;
  int32_t   lino = 0;
  TABLEID   tbid = {0};
  SMetaInfo info;

  while (writer->ctx->hasData) {
    SRowInfo* existing = NULL;

    // fetch the next existing row that belongs to a table still known to meta
    for (;;) {
      existing = tsdbIterMergerGetData(writer->ctx->dataIterMerger);
      if (existing == NULL) {
        writer->ctx->hasData = false;
        break;
      }

      if (existing->uid == tbid.uid) {
        break;  // same table as the previous row, already validated
      }

      tbid.suid = existing->suid;
      tbid.uid = existing->uid;
      if (metaGetInfo(writer->tsdb->pVnode->pMeta, tbid.uid, &info, NULL) == 0) {
        break;
      }

      code = tsdbIterMergerSkipTableData(writer->ctx->dataIterMerger, &tbid);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    if (!writer->ctx->hasData) {
      break;
    }

    if (tRowInfoCmprFn(existing, row) > 0) {
      break;  // existing row sorts after the incoming one
    }

    code = tsdbFSetWriteRow(writer->ctx->fsetWriter, existing);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(writer->ctx->dataIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (row->suid != INT64_MAX) {
    code = tsdbFSetWriteRow(writer->ctx->fsetWriter, row);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
644

645
/*
 * Open readers over the existing file set (writer->ctx->fset), if there is
 * one, so that its content can be merged with the incoming snapshot data.
 *
 * Only stt level 0 is merged: each level-0 stt file gets a reader appended to
 * ctx->sttReaderArr and a TSDB_FOP_REMOVE operation appended to
 * writer->fopArr. If any other level holds files, ctx->toSttOnly is set.
 *
 * A reader that was opened but could not be appended to ctx->sttReaderArr is
 * closed here, because nothing else would own it.
 *
 * Returns 0 on success or the first error code.
 */
static int32_t tsdbSnapWriteFileSetOpenReader(STsdbSnapWriter* writer) {
  int32_t code = 0;
  int32_t lino = 0;

  writer->ctx->toSttOnly = false;
  if (writer->ctx->fset) {
    // The data-file reader path below is compiled out.
#if 0
    // open data reader
    SDataFileReaderConfig dataFileReaderConfig = {
        .tsdb = writer->tsdb,
        .buffers = writer->buffers,
        .szPage = writer->szPage,
    };

    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
      if (writer->ctx->fset->farr[ftype] == NULL) {
        continue;
      }

      dataFileReaderConfig.files[ftype].exist = true;
      dataFileReaderConfig.files[ftype].file = writer->ctx->fset->farr[ftype]->f[0];

      STFileOp fileOp = {
          .optype = TSDB_FOP_REMOVE,
          .fid = writer->ctx->fset->fid,
          .of = writer->ctx->fset->farr[ftype]->f[0],
      };

      code = TARRAY2_APPEND(writer->fopArr, fileOp);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbDataFileReaderOpen(NULL, &dataFileReaderConfig, &writer->ctx->dataReader);
    TSDB_CHECK_CODE(code, lino, _exit);
#endif

    // open stt reader array
    SSttLvl* lvl;
    TARRAY2_FOREACH(writer->ctx->fset->lvlArr, lvl) {
      if (lvl->level != 0) {
        if (TARRAY2_SIZE(lvl->fobjArr) > 0) {
          writer->ctx->toSttOnly = true;
        }

        continue;  // Only merge level 0
      }

      STFileObj* fobj;
      TARRAY2_FOREACH(lvl->fobjArr, fobj) {
        SSttFileReader*      reader;
        SSttFileReaderConfig sttFileReaderConfig = {
            .tsdb = writer->tsdb,
            .szPage = writer->szPage,
            .buffers = writer->buffers,
            .file = fobj->f[0],
        };

        code = tsdbSttFileReaderOpen(fobj->fname, &sttFileReaderConfig, &reader);
        TSDB_CHECK_CODE(code, lino, _exit);

        code = TARRAY2_APPEND(writer->ctx->sttReaderArr, reader);
        if (code) {
          tsdbSttFileReaderClose(&reader);  // not owned by the array, avoid leaking it
          TSDB_CHECK_CODE(code, lino, _exit);
        }

        // the merged stt file is scheduled for removal
        STFileOp fileOp = {
            .optype = TSDB_FOP_REMOVE,
            .fid = fobj->f->fid,
            .of = fobj->f[0],
        };

        code = TARRAY2_APPEND(writer->fopArr, fileOp);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
726

727
/*
 * Close every stt reader of the current file set, then the data file reader.
 * Always returns 0.
 */
static int32_t tsdbSnapWriteFileSetCloseReader(STsdbSnapWriter* writer) {
  TARRAY2_CLEAR(writer->ctx->sttReaderArr, tsdbSttFileReaderClose);

  tsdbDataFileReaderClose(&writer->ctx->dataReader);

  return 0;
}
732

733
/*
 * Build the iterators and mergers over the readers of the existing file set.
 *
 * For the data file reader (if open) and for every stt reader, one data
 * iterator is appended to ctx->dataIterArr and one tomb iterator to
 * ctx->tombIterArr; then one merger is opened over each array.
 *
 * An iterator that was opened but could not be appended to its array is
 * closed here, because nothing else would own it.
 *
 * Returns 0 on success or the first error code.
 */
static int32_t tsdbSnapWriteFileSetOpenIter(STsdbSnapWriter* writer) {
  int32_t code = 0;
  int32_t lino = 0;

  // data iter
  if (writer->ctx->dataReader) {
    STsdbIter*      iter;
    STsdbIterConfig config = {0};

    // data
    config.type = TSDB_ITER_TYPE_DATA;
    config.dataReader = writer->ctx->dataReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(writer->ctx->dataIterArr, iter);
    if (code) {
      tsdbIterClose(&iter);  // not owned by the array, avoid leaking it
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // tomb
    config.type = TSDB_ITER_TYPE_DATA_TOMB;
    config.dataReader = writer->ctx->dataReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(writer->ctx->tombIterArr, iter);
    if (code) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // stt iter
  SSttFileReader* sttFileReader;
  TARRAY2_FOREACH(writer->ctx->sttReaderArr, sttFileReader) {
    STsdbIter*      iter;
    STsdbIterConfig config = {0};

    // data
    config.type = TSDB_ITER_TYPE_STT;
    config.sttReader = sttFileReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(writer->ctx->dataIterArr, iter);
    if (code) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // tomb
    config.type = TSDB_ITER_TYPE_STT_TOMB;
    config.sttReader = sttFileReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(writer->ctx->tombIterArr, iter);
    if (code) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // open merger
  code = tsdbIterMergerOpen(writer->ctx->dataIterArr, &writer->ctx->dataIterMerger, false);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbIterMergerOpen(writer->ctx->tombIterArr, &writer->ctx->tombIterMerger, true);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
803

804
/*
 * Close both mergers, then close and drop every iterator in the data and
 * tomb iterator arrays of the current file set. Always returns 0.
 */
static int32_t tsdbSnapWriteFileSetCloseIter(STsdbSnapWriter* writer) {
  tsdbIterMergerClose(&writer->ctx->dataIterMerger);
  tsdbIterMergerClose(&writer->ctx->tombIterMerger);

  TARRAY2_CLEAR(writer->ctx->dataIterArr, tsdbIterClose);
  TARRAY2_CLEAR(writer->ctx->tombIterArr, tsdbIterClose);

  return 0;
}
811

812
/*
 * Open the file-set writer for the current fid.
 *
 * The writer configuration is taken from the session settings and the
 * per-file-set context; the target stt level is 1 when ctx->toSttOnly is set
 * and 0 otherwise. Data-side files of an existing file set are passed along
 * so that stt files are merged into either the data files or a new stt file.
 *
 * Returns 0 on success or the error code of tsdbFSetWriterOpen().
 */
static int32_t tsdbSnapWriteFileSetOpenWriter(STsdbSnapWriter* writer) {
  int32_t code = 0;
  int32_t lino = 0;

  SFSetWriterConfig config = {
      .tsdb = writer->tsdb,
      .toSttOnly = writer->ctx->toSttOnly,
      .compactVersion = writer->compactVersion,
      .minRow = writer->minRow,
      .maxRow = writer->maxRow,
      .szPage = writer->szPage,
      .cmprAlg = writer->cmprAlg,
      .fid = writer->ctx->fid,
      .cid = writer->commitID,
      .expLevel = writer->ctx->expLevel,
      .level = writer->ctx->toSttOnly ? 1 : 0,
  };

  // merge stt files to either data or a new stt file
  if (writer->ctx->fset != NULL) {
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
      if (writer->ctx->fset->farr[ftype] == NULL) {
        continue;
      }

      config.files[ftype].exist = true;
      config.files[ftype].file = writer->ctx->fset->farr[ftype]->f[0];
    }
  }

  code = tsdbFSetWriterOpen(&config, &writer->ctx->fsetWriter);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
848

849
/*
 * Close the file-set writer, passing 0 as its second argument and letting it
 * record the resulting file operations in writer->fopArr. Returns the result
 * of tsdbFSetWriterClose().
 */
static int32_t tsdbSnapWriteFileSetCloseWriter(STsdbSnapWriter* writer) {
  int32_t code = tsdbFSetWriterClose(&writer->ctx->fsetWriter, 0, writer->fopArr);
  return code;
}
852

853
/*
 * Start writing the file set identified by fid.
 *
 * Looks up an existing file set with that fid in writer->fsetArr (ctx->fset
 * is NULL when none exists), computes the expected level via tsdbFidLevel(),
 * marks both data and tomb streams as pending, then opens the readers,
 * iterators and writer. ctx->fsetWriteBegin is set only when all of that
 * succeeded.
 *
 * Returns 0 on success or the first error code.
 */
static int32_t tsdbSnapWriteFileSetBegin(STsdbSnapWriter* writer, int32_t fid) {
  int32_t code = 0;
  int32_t lino = 0;

  // search key: a file set that only carries the wanted fid
  STFileSet  key = {.fid = fid};
  STFileSet* fset = &key;

  writer->ctx->fid = fid;

  STFileSet** found = TARRAY2_SEARCH(writer->fsetArr, &fset, tsdbTFileSetCmprFn, TD_EQ);
  if (found == NULL) {
    writer->ctx->fset = NULL;
  } else {
    writer->ctx->fset = *found;
  }

  writer->ctx->expLevel = tsdbFidLevel(fid, &writer->tsdb->keepCfg, taosGetTimestampSec());

  writer->ctx->hasData = true;
  writer->ctx->hasTomb = true;

  code = tsdbSnapWriteFileSetOpenReader(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetOpenIter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetOpenWriter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  writer->ctx->fsetWriteBegin = true;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  } else {
    tsdbInfo("vgId:%d %s succeeded, fid:%d", TD_VID(writer->tsdb->pVnode), __func__, fid);
  }
  return code;
}
887

888
/*
 * Write one incoming tomb record, first flushing every record from the tomb
 * iterator merger that compares less than or equal to it.
 *
 * When the merger has no more records, ctx->hasTomb is cleared. A `record`
 * whose suid is INT64_MAX is not written itself; tsdbSnapWriteFileSetEnd
 * passes such a record to drain the merger.
 *
 * Returns 0 on success, otherwise the error code of the failing write/advance.
 */
static int32_t tsdbSnapWriteTombRecord(STsdbSnapWriter* writer, const STombRecord* record) {
  int32_t code = 0;
  int32_t lino = 0;

  for (;;) {
    if (!writer->ctx->hasTomb) {
      break;
    }

    STombRecord* pending = tsdbIterMergerGetTombRecord(writer->ctx->tombIterMerger);
    if (pending == NULL) {
      // merger exhausted: nothing left to interleave
      writer->ctx->hasTomb = false;
      break;
    }

    // stop as soon as the merger's record sorts after the incoming one
    if (tTombRecordCompare(pending, record) > 0) {
      break;
    }

    code = tsdbFSetWriteTombRecord(writer->ctx->fsetWriter, pending);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(writer->ctx->tombIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (record->suid != INT64_MAX) {
    code = tsdbFSetWriteTombRecord(writer->ctx->fsetWriter, record);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
924

925
/*
 * Finish the file set currently being written, if any.
 *
 * Does nothing (returns 0) when ctx->fsetWriteBegin is false. Otherwise it
 * pushes an end-marker row and an end-marker tomb record (suid and uid set to
 * INT64_MAX) through the normal write paths, then closes the file-set writer,
 * the iterators and the reader, and finally clears ctx->fsetWriteBegin.
 *
 * On any failure the function returns the error code and leaves
 * ctx->fsetWriteBegin untouched.
 */
static int32_t tsdbSnapWriteFileSetEnd(STsdbSnapWriter* writer) {
  int32_t code = 0;
  int32_t lino = 0;

  if (!writer->ctx->fsetWriteBegin) {
    return 0;
  }

  // end timeseries data write
  SRowInfo endRow = {.suid = INT64_MAX, .uid = INT64_MAX};
  code = tsdbSnapWriteTimeSeriesRow(writer, &endRow);
  TSDB_CHECK_CODE(code, lino, _exit);

  // end tombstone data write
  STombRecord endTomb = {.suid = INT64_MAX, .uid = INT64_MAX};
  code = tsdbSnapWriteTombRecord(writer, &endTomb);
  TSDB_CHECK_CODE(code, lino, _exit);

  // tear down in the order: writer, iterators, reader
  code = tsdbSnapWriteFileSetCloseWriter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetCloseIter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetCloseReader(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  writer->ctx->fsetWriteBegin = false;

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s succeeded, fid:%d", TD_VID(writer->tsdb->pVnode), __func__, writer->ctx->fid);
  } else {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
969

970
/*
 * Abandon the file set currently being written, if any.
 *
 * Does nothing (returns 0) when ctx->fsetWriteBegin is false. Otherwise closes
 * the file-set writer, the iterators and the reader without flushing any end
 * markers, then clears ctx->fsetWriteBegin.
 *
 * Returns 0 on success, otherwise the first failing close's error code.
 */
static int32_t tsdbSnapWriteFileSetAbort(STsdbSnapWriter* writer) {
  int32_t code = 0;
  int32_t lino = 0;

  if (!writer->ctx->fsetWriteBegin) {
    return 0;
  }

  // tear down in the order: writer, iterators, reader
  code = tsdbSnapWriteFileSetCloseWriter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetCloseIter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetCloseReader(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  writer->ctx->fsetWriteBegin = false;

_exit:
  if (code != 0) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
994

995
/*
 * Write one snapshot payload of time-series data.
 *
 * The payload in `hdr` is decompressed into a block, the target file set is
 * derived from the block's first timestamp, and, if no file set is open or
 * the fid differs from the current one, the current file set is ended and the
 * new one begun. Every row of the block is then written through
 * tsdbSnapWriteTimeSeriesRow. The decoded block is destroyed on all paths.
 *
 * Returns 0 on success, otherwise the first failing step's error code.
 */
static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SBlockData bData[1] = {0};

  // Wrap the header payload so it can be consumed by the buffer reader.
  SBuffer       payload = {.capacity = hdr->size, .data = hdr->data, .size = hdr->size};
  SBufferReader reader = BUFFER_READER_INITIALIZER(0, &payload);

  code = tBlockDataDecompress(&reader, bData, &writer->buffers[0]);
  TSDB_CHECK_CODE(code, lino, _exit);

  // NOTE(review): the first timestamp is read without checking nRow > 0 —
  // presumably senders never emit an empty block; confirm.
  int32_t fid = tsdbKeyFid(bData->aTSKEY[0], writer->minutes, writer->precision);

  bool sameFileSet = writer->ctx->fsetWriteBegin && (fid == writer->ctx->fid);
  if (!sameFileSet) {
    code = tsdbSnapWriteFileSetEnd(writer);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbSnapWriteFileSetBegin(writer, fid);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  for (int32_t iRow = 0; iRow < bData->nRow; ++iRow) {
    // A zero block-level uid means the uid is taken from the per-row array.
    int64_t rowUid = bData->uid;
    if (rowUid == 0) {
      rowUid = bData->aUid[iRow];
    }

    SRowInfo rowInfo = {
        .suid = bData->suid,
        .uid = rowUid,
        .row = tsdbRowFromBlockData(bData, iRow),
    };

    code = tsdbSnapWriteTimeSeriesRow(writer, &rowInfo);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code == 0) {
    tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(writer->tsdb->pVnode), __func__,
              bData->suid, bData->uid, bData->nRow);
  } else {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  tBlockDataDestroy(bData);
  return code;
}
1041

1042
/*
 * Fill `tombBlock` from the raw payload carried by `hdr`.
 *
 * The block is cleared first. The payload is treated as TOMB_RECORD_ELEM_NUM
 * equally sized consecutive slices; slice i is appended to
 * tombBlock->buffers[i]. The record count is the slice size divided by
 * sizeof(int64_t).
 *
 * Returns 0 on success, otherwise the error code from tBufferPut.
 */
static int32_t tsdbSnapWriteDecmprTombBlock(SSnapDataHdr* hdr, STombBlock* tombBlock) {
  int32_t code = 0;
  int32_t lino = 0;

  tTombBlockClear(tombBlock);

  int64_t total = hdr->size;
  int64_t sliceBytes = total / TOMB_RECORD_ELEM_NUM;
  tombBlock->numOfRecords = sliceBytes / sizeof(int64_t);

  uint8_t* cursor = hdr->data;
  for (int32_t iCol = 0; iCol < TOMB_RECORD_ELEM_NUM; ++iCol, cursor += sliceBytes) {
    code = tBufferPut(tombBlock->buffers + iCol, cursor, sliceBytes);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  return code;
}
1063

UNCOV
1064
/*
 * Write one snapshot payload of tombstone (delete) data.
 *
 * The payload in `hdr` is decoded into a tomb block. The target file set is
 * derived from the first record's skey; if no file set is open or the fid
 * differs from the current one, the current file set is ended and the new one
 * begun. If ctx->hasData is still set, an end-marker row (suid/uid =
 * INT64_MAX) is pushed through tsdbSnapWriteTimeSeriesRow before any tomb
 * record is written. Every record of the block is then written through
 * tsdbSnapWriteTombRecord.
 *
 * The decoded tomb block is released on every path, including errors
 * (previously it was released only on success, leaking its buffers whenever a
 * step failed).
 *
 * Returns 0 on success, otherwise the first failing step's error code.
 */
static int32_t tsdbSnapWriteTombData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
  int32_t code = 0;
  int32_t lino = 0;

  STombRecord record = {0};
  STombBlock  tombBlock[1] = {0};

  code = tsdbSnapWriteDecmprTombBlock(hdr, tombBlock);
  TSDB_CHECK_CODE(code, lino, _exit);

  // The first record decides which file set this payload belongs to.
  code = tTombBlockGet(tombBlock, 0, &record);
  TSDB_CHECK_CODE(code, lino, _exit);

  int32_t fid = tsdbKeyFid(record.skey, writer->minutes, writer->precision);
  if (!writer->ctx->fsetWriteBegin || fid != writer->ctx->fid) {
    code = tsdbSnapWriteFileSetEnd(writer);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbSnapWriteFileSetBegin(writer, fid);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (writer->ctx->hasData) {
    // Push the end-marker row before switching to tomb records.
    SRowInfo row = {
        .suid = INT64_MAX,
        .uid = INT64_MAX,
    };

    code = tsdbSnapWriteTimeSeriesRow(writer, &row);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  for (int32_t i = 0; i < TOMB_BLOCK_SIZE(tombBlock); ++i) {
    code = tTombBlockGet(tombBlock, i, &record);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbSnapWriteTombRecord(writer, &record);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  // Release after the label so error paths free the decoded block as well.
  tTombBlockDestroy(tombBlock);
  return code;
}
1111

1112
/*
 * Create a TSDB snapshot writer.
 *
 * Allocates a zeroed STsdbSnapWriter, copies the version range and the
 * relevant tsdb/vnode configuration into it, allocates a commit id, and
 * builds a ranged copy of the file-system snapshot into writer->fsetArr
 * (with pending file operations collected in writer->fopArr).
 *
 * @param pTsdb    tsdb instance the snapshot is written into
 * @param sver     start version, stored in the writer
 * @param ever     end version, stored in the writer
 * @param pRanges  passed to tsdbFSCreateCopyRangedSnapshot as a
 *                 TFileSetRangeArray*
 * @param writer   out: receives the new writer on success; set to NULL on
 *                 failure
 *
 * @return 0 on success; terrno if allocation fails; otherwise the error code
 *         from tsdbFSCreateCopyRangedSnapshot. On failure the partially
 *         initialized writer is freed instead of being leaked, so callers
 *         never see a half-built handle.
 */
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRanges, STsdbSnapWriter** writer) {
  int32_t code = 0;
  int32_t lino = 0;

  // start to write
  writer[0] = taosMemoryCalloc(1, sizeof(*writer[0]));
  if (writer[0] == NULL) return terrno;

  writer[0]->tsdb = pTsdb;
  writer[0]->sver = sver;
  writer[0]->ever = ever;
  writer[0]->minutes = pTsdb->keepCfg.days;
  writer[0]->precision = pTsdb->keepCfg.precision;
  writer[0]->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  writer[0]->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
  writer[0]->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
  writer[0]->commitID = tsdbFSAllocEid(pTsdb->pFS);
  writer[0]->szPage = pTsdb->pVnode->config.tsdbPageSize;
  writer[0]->compactVersion = INT64_MAX;
  writer[0]->now = taosGetTimestampMs();

  code =
      tsdbFSCreateCopyRangedSnapshot(pTsdb->pFS, (TFileSetRangeArray*)pRanges, &writer[0]->fsetArr, writer[0]->fopArr);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));

    // Release what this function owns so the caller is not handed a
    // half-initialized writer.
    // NOTE(review): fsetArr is left to tsdbFSCreateCopyRangedSnapshot, which is
    // assumed to release its own partial output on failure — confirm.
    TARRAY2_DESTROY(writer[0]->fopArr, NULL);
    taosMemoryFree(writer[0]);
    writer[0] = NULL;
  } else {
    tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64, TD_VID(pTsdb->pVnode), __func__, sver, ever);
  }
  return code;
}
1145

1146
/*
 * First phase of closing a snapshot writer.
 *
 * Ends the in-progress file set (rollback == false) or aborts it
 * (rollback == true), then begins a file-system edit with the accumulated
 * file operations. Both paths begin the edit with TSDB_FEDIT_COMMIT.
 *
 * Returns 0 on success, otherwise the first failing step's error code.
 */
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* writer, bool rollback) {
  int32_t code = 0;
  int32_t lino = 0;

  if (rollback) {
    code = tsdbSnapWriteFileSetAbort(writer);
  } else {
    code = tsdbSnapWriteFileSetEnd(writer);
  }
  TSDB_CHECK_CODE(code, lino, _exit);

  // The edit is started the same way regardless of rollback.
  code = tsdbFSEditBegin(writer->tsdb->pFS, writer->fopArr, TSDB_FEDIT_COMMIT);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    tsdbDebug("vgId:%d %s done", TD_VID(writer->tsdb->pVnode), __func__);
  } else {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
1172

1173
/*
 * Second phase of closing a snapshot writer: finish the file-system edit and
 * release the writer.
 *
 * A NULL *writer is a no-op returning 0. With rollback set, the pending edit
 * is aborted; otherwise it is committed while holding tsdb->mutex, and on a
 * successful commit the file-system state is set to TSDB_FS_STATE_NORMAL.
 *
 * When the abort/commit succeeds, all iterators, readers, arrays and buffers
 * owned by the writer are destroyed, the writer is freed and *writer is set
 * to NULL. When it fails, the error is returned and the writer is left as is.
 */
int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdbSnapWriter* w = writer[0];
  if (w == NULL) {
    return 0;
  }

  // Kept in a local so the final log still works after the writer is freed.
  STsdb* tsdb = w->tsdb;

  if (rollback) {
    code = tsdbFSEditAbort(tsdb->pFS);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    (void)taosThreadMutexLock(&tsdb->mutex);

    code = tsdbFSEditCommit(tsdb->pFS);
    if (code == 0) {
      tsdb->pFS->fsstate = TSDB_FS_STATE_NORMAL;
    }

    // The mutex is released before leaving, on success and on failure alike.
    (void)taosThreadMutexUnlock(&tsdb->mutex);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // per-file-set context
  tsdbIterMergerClose(&w->ctx->tombIterMerger);
  tsdbIterMergerClose(&w->ctx->dataIterMerger);
  TARRAY2_DESTROY(w->ctx->tombIterArr, tsdbIterClose);
  TARRAY2_DESTROY(w->ctx->dataIterArr, tsdbIterClose);
  TARRAY2_DESTROY(w->ctx->sttReaderArr, tsdbSttFileReaderClose);
  tsdbDataFileReaderClose(&w->ctx->dataReader);

  // writer-level state
  TARRAY2_DESTROY(w->fopArr, NULL);
  tsdbFSDestroyCopyRangedSnapshot(&w->fsetArr);

  for (int32_t iBuf = 0; iBuf < ARRAY_SIZE(w->buffers); ++iBuf) {
    tBufferDestroy(&w->buffers[iBuf]);
  }

  taosMemoryFree(w);
  writer[0] = NULL;

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s done, rollback:%d", TD_VID(tsdb->pVnode), __func__, rollback);
  } else {
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
  }
  return code;
}
1223

1224
/*
 * Write one snapshot payload, dispatching on its type.
 *
 * SNAP_DATA_TSDB payloads go to tsdbSnapWriteTimeSeriesData, SNAP_DATA_DEL
 * payloads go to tsdbSnapWriteTombData; any other type fails with
 * TSDB_CODE_INVALID_PARA.
 *
 * Returns 0 on success, otherwise the handler's error code.
 */
int32_t tsdbSnapWrite(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
  int32_t code = 0;
  int32_t lino = 0;

  switch (hdr->type) {
    case SNAP_DATA_TSDB:
      code = tsdbSnapWriteTimeSeriesData(writer, hdr);
      break;
    case SNAP_DATA_DEL:
      code = tsdbSnapWriteTombData(writer, hdr);
      break;
    default:
      code = TSDB_CODE_INVALID_PARA;
      break;
  }
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    tsdbDebug("vgId:%d %s done, type:%d index:%" PRId64 " size:%" PRId64, TD_VID(writer->tsdb->pVnode), __func__,
              hdr->type, hdr->index, hdr->size);
  } else {
    tsdbError("vgId:%d %s failed at line %d since %s, type:%d index:%" PRId64 " size:%" PRId64,
              TD_VID(writer->tsdb->pVnode), __func__, lino, tstrerror(code), hdr->type, hdr->index, hdr->size);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc