• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4913

06 Jan 2026 01:30AM UTC coverage: 64.884% (-0.004%) from 64.888%
#4913

push

travis-ci

web-flow
merge: from main to 3.0 branch #34167

180 of 319 new or added lines in 14 files covered. (56.43%)

571 existing lines in 128 files now uncovered.

195016 of 300563 relevant lines covered (64.88%)

117540852.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.98
/source/dnode/vnode/src/tsdb/tsdbSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdb.h"
17
#include "tsdbDataFileRW.h"
18
#include "tsdbFS2.h"
19
#include "tsdbFSetRW.h"
20
#include "tsdbIter.h"
21
#include "tsdbSttFileRW.h"
22

23
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);
24

25
// STsdbSnapReader ========================================
26
struct STsdbSnapReader {
27
  STsdb*  tsdb;
28
  int64_t sver;
29
  int64_t ever;
30
  int8_t  type;
31

32
  SBuffer  buffers[10];
33
  SSkmInfo skmTb[1];
34

35
  TFileSetRangeArray* fsrArr;
36

37
  // context
38
  struct {
39
    int32_t         fsrArrIdx;
40
    STFileSetRange* fsr;
41
    bool            isDataDone;
42
    bool            isTombDone;
43
  } ctx[1];
44

45
  // reader
46
  SDataFileReader*    dataReader;
47
  TSttFileReaderArray sttReaderArr[1];
48

49
  // iter
50
  TTsdbIterArray dataIterArr[1];
51
  SIterMerger*   dataIterMerger;
52
  TTsdbIterArray tombIterArr[1];
53
  SIterMerger*   tombIterMerger;
54

55
  // data
56
  SBlockData blockData[1];
57
  STombBlock tombBlock[1];
58
};
59

60
// Close all stt readers and the data-file reader held by the snapshot reader.
static void tsdbSnapReadFileSetCloseReader(STsdbSnapReader* reader) {
  TARRAY2_CLEAR(reader->sttReaderArr, tsdbSttFileReaderClose);
  tsdbDataFileReaderClose(&reader->dataReader);
}
65

66
// Open the readers for the file set of the current range (reader->ctx->fsr):
// a data-file reader when at least one data file is present, and one stt
// reader for every stt file of every level.
//
// Returns 0 on success. On failure the readers opened so far are closed, the
// error is logged and the failing call's code is returned.
static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) {
  int32_t code = 0;
  int32_t lino = 0;

  // data-file reader
  SDataFileReaderConfig dataCfg = {
      .tsdb = reader->tsdb,
      .szPage = reader->tsdb->pVnode->config.tsdbPageSize,
      .buffers = reader->buffers,
  };
  bool anyDataFile = false;
  for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
    if (reader->ctx->fsr->fset->farr[ftype] == NULL) {
      continue;
    }
    dataCfg.files[ftype].exist = true;
    dataCfg.files[ftype].file = reader->ctx->fsr->fset->farr[ftype]->f[0];
    anyDataFile = true;
  }

  if (anyDataFile) {
    code = tsdbDataFileReaderOpen(NULL, &dataCfg, &reader->dataReader);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // stt readers
  SSttLvl* lvl;
  TARRAY2_FOREACH(reader->ctx->fsr->fset->lvlArr, lvl) {
    STFileObj* fobj;
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      SSttFileReader*      sttReader;
      SSttFileReaderConfig sttCfg = {
          .tsdb = reader->tsdb,
          .szPage = reader->tsdb->pVnode->config.tsdbPageSize,
          .file = fobj->f[0],
          .buffers = reader->buffers,
      };

      code = tsdbSttFileReaderOpen(fobj->fname, &sttCfg, &sttReader);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = TARRAY2_APPEND(reader->sttReaderArr, sttReader);
      if (code) {
        // the reader never made it into the array, so close it here
        tsdbSttFileReaderClose(&sttReader);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
  }

_exit:
  if (code) {
    tsdbSnapReadFileSetCloseReader(reader);
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
120

121
static int32_t tsdbSnapReadFileSetOpenIter(STsdbSnapReader* reader) {
359✔
122
  int32_t code = 0;
359✔
123
  int32_t lino = 0;
359✔
124

125
  STsdbIter*      iter;
359✔
126
  STsdbIterConfig config = {
359✔
127
      .filterByVersion = true,
128
      .verRange[0] = reader->ctx->fsr->sver,
359✔
129
      .verRange[1] = reader->ctx->fsr->ever,
359✔
130
  };
131

132
  // data file
133
  if (reader->dataReader) {
359✔
134
    // data
135
    config.type = TSDB_ITER_TYPE_DATA;
359✔
136
    config.dataReader = reader->dataReader;
359✔
137

138
    code = tsdbIterOpen(&config, &iter);
359✔
139
    TSDB_CHECK_CODE(code, lino, _exit);
359✔
140

141
    code = TARRAY2_APPEND(reader->dataIterArr, iter);
359✔
142
    TSDB_CHECK_CODE(code, lino, _exit);
359✔
143

144
    // tomb
145
    config.type = TSDB_ITER_TYPE_DATA_TOMB;
359✔
146
    config.dataReader = reader->dataReader;
359✔
147

148
    code = tsdbIterOpen(&config, &iter);
359✔
149
    TSDB_CHECK_CODE(code, lino, _exit);
359✔
150

151
    code = TARRAY2_APPEND(reader->tombIterArr, iter);
359✔
152
    TSDB_CHECK_CODE(code, lino, _exit);
359✔
153
  }
154

155
  // stt file
156
  SSttFileReader* sttReader;
157
  TARRAY2_FOREACH(reader->sttReaderArr, sttReader) {
359✔
158
    // data
UNCOV
159
    config.type = TSDB_ITER_TYPE_STT;
×
UNCOV
160
    config.sttReader = sttReader;
×
161

UNCOV
162
    code = tsdbIterOpen(&config, &iter);
×
UNCOV
163
    TSDB_CHECK_CODE(code, lino, _exit);
×
164

UNCOV
165
    code = TARRAY2_APPEND(reader->dataIterArr, iter);
×
UNCOV
166
    TSDB_CHECK_CODE(code, lino, _exit);
×
167

168
    // tomb
UNCOV
169
    config.type = TSDB_ITER_TYPE_STT_TOMB;
×
UNCOV
170
    config.sttReader = sttReader;
×
171

UNCOV
172
    code = tsdbIterOpen(&config, &iter);
×
UNCOV
173
    TSDB_CHECK_CODE(code, lino, _exit);
×
174

UNCOV
175
    code = TARRAY2_APPEND(reader->tombIterArr, iter);
×
UNCOV
176
    TSDB_CHECK_CODE(code, lino, _exit);
×
177
  }
178

179
  // merger
180
  code = tsdbIterMergerOpen(reader->dataIterArr, &reader->dataIterMerger, false);
359✔
181
  TSDB_CHECK_CODE(code, lino, _exit);
359✔
182

183
  code = tsdbIterMergerOpen(reader->tombIterArr, &reader->tombIterMerger, true);
359✔
184
  TSDB_CHECK_CODE(code, lino, _exit);
359✔
185

186
_exit:
359✔
187
  if (code) {
359✔
188
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
×
189
  }
190
  return code;
359✔
191
}
192

193
// Close both mergers, then close and drop every data and tomb iterator.
static void tsdbSnapReadFileSetCloseIter(STsdbSnapReader* reader) {
  tsdbIterMergerClose(&reader->dataIterMerger);
  tsdbIterMergerClose(&reader->tombIterMerger);

  TARRAY2_CLEAR(reader->dataIterArr, tsdbIterClose);
  TARRAY2_CLEAR(reader->tombIterArr, tsdbIterClose);
}
200

201
// Advance to the next file-set range, if any, and open its readers and
// iterators. When all ranges have been consumed nothing is changed and 0 is
// returned, leaving reader->ctx->fsr untouched.
//
// Returns 0 on success; otherwise the error is logged and returned.
static int32_t tsdbSnapReadRangeBegin(STsdbSnapReader* reader) {
  int32_t code = 0;
  int32_t lino = 0;

  if (reader->ctx->fsrArrIdx >= TARRAY2_SIZE(reader->fsrArr)) {
    return 0;  // no range left
  }

  reader->ctx->fsr = TARRAY2_GET(reader->fsrArr, reader->ctx->fsrArrIdx);
  reader->ctx->fsrArrIdx++;
  reader->ctx->isDataDone = false;
  reader->ctx->isTombDone = false;

  code = tsdbSnapReadFileSetOpenReader(reader);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapReadFileSetOpenIter(reader);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
223

224
// Finish the current file-set range: close its iterators and readers and mark
// that no range is open. Always returns 0.
static int32_t tsdbSnapReadRangeEnd(STsdbSnapReader* reader) {
  tsdbSnapReadFileSetCloseIter(reader);
  tsdbSnapReadFileSetCloseReader(reader);

  reader->ctx->fsr = NULL;
  return 0;
}
230

231
// Serialise reader->blockData into a newly allocated snapshot packet.
//
// The block is run through tBlockDataCompress() with NO_COMPRESSION as the
// default algorithm; the contents of reader->buffers[0..3] are then
// concatenated behind an SSnapDataHdr whose type is reader->type and whose
// size is the payload length.
//
// On success *data points to the packet, allocated with taosMemoryMalloc().
// NOTE(review): presumably the caller frees it; confirm against the callers.
// Returns 0 on success, otherwise the error code (terrno on allocation
// failure).
static int32_t tsdbSnapCmprData(STsdbSnapReader* reader, uint8_t** data) {
  int32_t code = 0;
  int32_t lino = 0;

  SColCompressInfo cmprInfo = {.pColCmpr = NULL, .defaultCmprAlg = NO_COMPRESSION};
  code = tBlockDataCompress(reader->blockData, (void*)&cmprInfo, reader->buffers, reader->buffers + 4);
  TSDB_CHECK_CODE(code, lino, _exit);

  int32_t size = 0;
  for (int i = 0; i < 4; i++) {
    size += reader->buffers[i].size;
  }
  *data = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
  if (*data == NULL) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  SSnapDataHdr* pHdr = (SSnapDataHdr*)*data;
  uint8_t*      pBuf = pHdr->data;

  pHdr->type = reader->type;
  pHdr->size = size;
  for (int i = 0; i < 4; i++) {
    memcpy(pBuf, reader->buffers[i].data, reader->buffers[i].size);
    pBuf += reader->buffers[i].size;
  }

_exit:
  if (code) {
    // NOTE(review): the argument order here is (lino, code) while the other
    // reader-side functions in this file pass (code, lino); confirm against
    // the TSDB_ERROR_LOG definition.
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), lino, code);
  }
  return code;
}
267

268
// Estimate the number of bytes held by a block: the key columns (uid only
// when the block-level uid is 0, version, timestamp) plus, for every column
// that carries values, its bitmap, its var-data offsets and its data bytes.
static int64_t tBlockDataSize(SBlockData* pBlockData) {
  int64_t total = 0;

  // key part
  if (pBlockData->uid == 0) {
    total += (sizeof(int64_t) * pBlockData->nRow);  // uid
  }
  total += (sizeof(int64_t) * pBlockData->nRow);  // version
  total += (sizeof(TSKEY) * pBlockData->nRow);    // primary keys

  // general column part
  for (int32_t idx = 0; idx < pBlockData->nColData; idx++) {
    SColData* col = tBlockDataGetColDataByIdx(pBlockData, idx);

    // columns that are entirely NONE or entirely NULL contribute nothing
    if (col->flag == HAS_NONE || col->flag == HAS_NULL) {
      continue;
    }

    // bitmap: none when every row has a value, 2 bits/row when all three
    // states occur, 1 bit/row otherwise
    if (col->flag == HAS_VALUE) {
      // no bitmap
    } else if (col->flag == (HAS_NONE | HAS_NULL | HAS_VALUE)) {
      total += BIT2_SIZE(col->nVal);
    } else {
      total += BIT1_SIZE(col->nVal);
    }

    // a NONE/NULL mix has a bitmap but no value bytes
    if (col->flag == (HAS_NONE | HAS_NULL)) {
      continue;
    }

    if (IS_VAR_DATA_TYPE(col->type)) {
      total += col->nVal * sizeof(int32_t);  // var data offset
    }

    total += col->nData;
  }

  return total;
}
305

306
// Read the next batch of time-series rows of the current range and pack them
// into one snapshot packet.
//
// Rows are pulled from the data merger into reader->blockData until the
// merger is exhausted, a row with a different schema shows up, or the block
// reaches TSDB_SNAP_MAX_ROWS_PER_DATA rows / TSDB_SNAP_DATA_PAYLOAD_SIZE
// bytes. Rows of tables for which metaGetInfo() fails are skipped.
//
// *data receives the packet when at least one row was collected; otherwise it
// is left as the caller set it. Returns 0 on success, else the error code.
static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** data) {
  int32_t   code = 0;
  int32_t   lino = 0;
  SMetaInfo info;
  TABLEID   lastTb = {0};  // table seen most recently, to query meta once per table

  tBlockDataReset(reader->blockData);

  SRowInfo* row;
  while ((row = tsdbIterMergerGetData(reader->dataIterMerger)) != NULL) {
    // skip dropped table
    if (row->uid != lastTb.uid) {
      lastTb.suid = row->suid;
      lastTb.uid = row->uid;
      if (metaGetInfo(reader->tsdb->pVnode->pMeta, lastTb.uid, &info, NULL) != 0) {
        code = tsdbIterMergerSkipTableData(reader->dataIterMerger, &lastTb);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }
    }

    // first accepted row: set up the block with this row's schema
    if (reader->blockData->suid == 0 && reader->blockData->uid == 0) {
      code = tsdbUpdateSkmTb(reader->tsdb, (TABLEID*)row, reader->skmTb);
      TSDB_CHECK_CODE(code, lino, _exit);

      TABLEID blockTb = {
          .suid = row->suid,
          .uid = row->suid ? 0 : row->uid,
      };
      code = tBlockDataInit(reader->blockData, &blockTb, reader->skmTb->pTSchema, NULL, 0);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // a row with another schema ends this packet; it stays in the merger
    if (!TABLE_SAME_SCHEMA(reader->blockData->suid, reader->blockData->uid, row->suid, row->uid)) {
      break;
    }

    code = tBlockDataAppendRow(reader->blockData, &row->row, NULL, row->uid);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(reader->dataIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (reader->blockData->nRow >= TSDB_SNAP_MAX_ROWS_PER_DATA ||
        tBlockDataSize(reader->blockData) >= TSDB_SNAP_DATA_PAYLOAD_SIZE) {
      break;
    }
  }

  if (reader->blockData->nRow > 0) {
    code = tsdbSnapCmprData(reader, data);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  } else {
    tsdbDebug("vgId:%d, tsdb snapshot read time-series data done, data size:%" PRId64 "", TD_VID(reader->tsdb->pVnode),
              data[0] ? ((SSnapDataHdr*)(data[0]))->size : 0);
  }
  return code;
}
368

369
// Serialise reader->tombBlock into a newly allocated snapshot packet: an
// SSnapDataHdr of type SNAP_DATA_DEL followed by the contents of every buffer
// of the tomb block, in order.
//
// On success data[0] points to the packet (taosMemoryMalloc'd). Returns 0 on
// success, otherwise terrno from the failed allocation.
static int32_t tsdbSnapCmprTombData(STsdbSnapReader* reader, uint8_t** data) {
  int32_t code = 0;
  int32_t lino = 0;

  // total payload length
  int64_t payload = 0;
  for (int32_t i = 0; i < ARRAY_SIZE(reader->tombBlock->buffers); i++) {
    payload += reader->tombBlock->buffers[i].size;
  }

  data[0] = taosMemoryMalloc(payload + sizeof(SSnapDataHdr));
  if (data[0] == NULL) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  SSnapDataHdr* hdr = (SSnapDataHdr*)(data[0]);
  hdr->type = SNAP_DATA_DEL;
  hdr->size = payload;

  // copy the buffers back to back behind the header
  uint8_t* dst = hdr->data;
  for (int32_t i = 0; i < ARRAY_SIZE(reader->tombBlock->buffers); i++) {
    memcpy(dst, reader->tombBlock->buffers[i].data, reader->tombBlock->buffers[i].size);
    dst += reader->tombBlock->buffers[i].size;
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
400

401
// Read the next batch of tomb records of the current range and pack them into
// one snapshot packet.
//
// Records are pulled from the tomb merger into reader->tombBlock until the
// merger is exhausted or TOMB_BLOCK_SIZE() reaches 81920. Records of tables
// for which metaGetInfo() fails are skipped.
//
// *data receives the packet when the block is non-empty; otherwise it is left
// as the caller set it. Returns 0 on success, else the error code.
static int32_t tsdbSnapReadTombData(STsdbSnapReader* reader, uint8_t** data) {
  int32_t   code = 0;
  int32_t   lino = 0;
  SMetaInfo info;
  TABLEID   lastTb = {0};  // table seen most recently, to query meta once per table

  tTombBlockClear(reader->tombBlock);

  STombRecord* record;
  while ((record = tsdbIterMergerGetTombRecord(reader->tombIterMerger)) != NULL) {
    if (record->uid != lastTb.uid) {
      lastTb.suid = record->suid;
      lastTb.uid = record->uid;
      if (metaGetInfo(reader->tsdb->pVnode->pMeta, lastTb.uid, &info, NULL) != 0) {
        code = tsdbIterMergerSkipTableData(reader->tombIterMerger, &lastTb);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }
    }

    code = tTombBlockPut(reader->tombBlock, record);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(reader->tombIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (TOMB_BLOCK_SIZE(reader->tombBlock) >= 81920) {
      break;
    }
  }

  if (TOMB_BLOCK_SIZE(reader->tombBlock) > 0) {
    code = tsdbSnapCmprTombData(reader, data);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
442

443
int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type, void* pRanges,
359✔
444
                           STsdbSnapReader** reader) {
445
  int32_t code = 0;
359✔
446
  int32_t lino = 0;
359✔
447

448
  reader[0] = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*reader[0]));
359✔
449
  if (reader[0] == NULL) return terrno;
359✔
450

451
  reader[0]->tsdb = tsdb;
359✔
452
  reader[0]->sver = sver;
359✔
453
  reader[0]->ever = ever;
359✔
454
  reader[0]->type = type;
359✔
455

456
  code = tsdbFSCreateRefRangedSnapshot(tsdb->pFS, sver, ever, (TFileSetRangeArray*)pRanges, &reader[0]->fsrArr);
359✔
457
  TSDB_CHECK_CODE(code, lino, _exit);
359✔
458

459
_exit:
359✔
460
  if (code) {
359✔
461
    tsdbError("vgId:%d %s failed at %s:%d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode),
×
462
              __func__, __FILE__, lino, tstrerror(code), sver, ever, type);
463
    tsdbTFileSetRangeArrayDestroy(&reader[0]->fsrArr);
×
464
    taosMemoryFree(reader[0]);
×
465
    reader[0] = NULL;
×
466
  } else {
467
    tsdbInfo("vgId:%d, tsdb snapshot incremental reader opened. sver:%" PRId64 " ever:%" PRId64 " type:%d",
359✔
468
             TD_VID(tsdb->pVnode), sver, ever, type);
469
  }
470
  return code;
359✔
471
}
472

473
// Release a snapshot reader and everything it still holds: the packet blocks,
// mergers, iterators, file readers, the ranged snapshot, the cached schema
// and the scratch buffers. reader[0] is set to NULL afterwards.
// Calling it with reader[0] == NULL is a no-op.
void tsdbSnapReaderClose(STsdbSnapReader** reader) {
  if (reader[0] == NULL) {
    return;
  }

  tTombBlockDestroy(reader[0]->tombBlock);
  tBlockDataDestroy(reader[0]->blockData);

  // mergers first, then the iterators and readers they were built on
  tsdbIterMergerClose(&reader[0]->dataIterMerger);
  tsdbIterMergerClose(&reader[0]->tombIterMerger);
  TARRAY2_DESTROY(reader[0]->dataIterArr, tsdbIterClose);
  TARRAY2_DESTROY(reader[0]->tombIterArr, tsdbIterClose);
  TARRAY2_DESTROY(reader[0]->sttReaderArr, tsdbSttFileReaderClose);
  tsdbDataFileReaderClose(&reader[0]->dataReader);

  tsdbFSDestroyRefRangedSnapshot(&reader[0]->fsrArr);
  tDestroyTSchema(reader[0]->skmTb->pTSchema);

  for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->buffers); ++i) {
    tBufferDestroy(reader[0]->buffers + i);
  }

  taosMemoryFree(reader[0]);
  reader[0] = NULL;
}
504

505
// Produce the next snapshot packet.
//
// For each file-set range in turn, time-series packets are emitted first and
// tomb packets second; once both are exhausted the range is closed and the
// next one is opened. *data is set to the packet, or to NULL when every range
// has been fully read.
//
// Returns 0 on success (with or without a packet), otherwise the error code.
int32_t tsdbSnapRead(STsdbSnapReader* reader, uint8_t** data) {
  int32_t code = 0;
  int32_t lino = 0;

  *data = NULL;

  while (true) {
    // open the next range when none is in progress
    if (reader->ctx->fsr == NULL) {
      code = tsdbSnapReadRangeBegin(reader);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (reader->ctx->fsr == NULL) {
        break;  // nothing left to read
      }
    }

    if (!reader->ctx->isDataDone) {
      code = tsdbSnapReadTimeSeriesData(reader, data);
      TSDB_CHECK_CODE(code, lino, _exit);
      if (*data != NULL) {
        break;  // hand out this packet
      }
      reader->ctx->isDataDone = true;
    }

    if (!reader->ctx->isTombDone) {
      code = tsdbSnapReadTombData(reader, data);
      TSDB_CHECK_CODE(code, lino, _exit);
      if (*data != NULL) {
        break;  // hand out this packet
      }
      reader->ctx->isTombDone = true;
    }

    code = tsdbSnapReadRangeEnd(reader);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(reader->tsdb->pVnode), __func__);
  }
  return code;
}
553

554
// STsdbSnapWriter ========================================
555
struct STsdbSnapWriter {
556
  STsdb*  tsdb;
557
  int64_t sver;
558
  int64_t ever;
559
  int32_t minutes;
560
  int8_t  precision;
561
  int32_t minRow;
562
  int32_t maxRow;
563
  int8_t  cmprAlg;
564
  int64_t commitID;
565
  int32_t szPage;
566
  int64_t compactVersion;
567
  int64_t now;
568
  SBuffer buffers[10];
569

570
  TFileSetArray* fsetArr;
571
  TFileOpArray   fopArr[1];
572

573
  struct {
574
    bool       fsetWriteBegin;
575
    int32_t    fid;
576
    STFileSet* fset;
577
    int32_t    expLevel;
578
    bool       hasData;  // if have time series data
579
    bool       hasTomb;  // if have tomb data
580

581
    // reader
582
    SDataFileReader*    dataReader;
583
    TSttFileReaderArray sttReaderArr[1];
584

585
    // iter/merger
586
    TTsdbIterArray dataIterArr[1];
587
    SIterMerger*   dataIterMerger;
588
    TTsdbIterArray tombIterArr[1];
589
    SIterMerger*   tombIterMerger;
590

591
    // writer
592
    bool         toSttOnly;
593
    SFSetWriter* fsetWriter;
594
  } ctx[1];
595
};
596

597
// APIs
598
// Write one incoming row, merging it with rows already stored in the file set.
//
// Every stored row that compares <= `row` (tRowInfoCmprFn) is written first,
// so output stays ordered; stored rows of tables for which metaGetInfo()
// fails are skipped. A `row` with suid == INT64_MAX is a sentinel: it only
// flushes the remaining stored rows and is not written itself.
//
// Returns 0 on success, otherwise the error code.
static int32_t tsdbSnapWriteTimeSeriesRow(STsdbSnapWriter* writer, SRowInfo* row) {
  int32_t   code = 0;
  int32_t   lino = 0;
  TABLEID   lastTb = {0};  // table seen most recently, to query meta once per table
  SMetaInfo info;

  while (writer->ctx->hasData) {
    SRowInfo* stored = tsdbIterMergerGetData(writer->ctx->dataIterMerger);
    if (stored == NULL) {
      // stored rows exhausted
      writer->ctx->hasData = false;
      break;
    }

    if (stored->uid != lastTb.uid) {
      lastTb.suid = stored->suid;
      lastTb.uid = stored->uid;
      if (metaGetInfo(writer->tsdb->pVnode->pMeta, lastTb.uid, &info, NULL) != 0) {
        code = tsdbIterMergerSkipTableData(writer->ctx->dataIterMerger, &lastTb);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }
    }

    // stop as soon as the stored row sorts after the incoming one
    if (tRowInfoCmprFn(stored, row) > 0) {
      break;
    }

    code = tsdbFSetWriteRow(writer->ctx->fsetWriter, stored);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(writer->ctx->dataIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (row->suid != INT64_MAX) {
    code = tsdbFSetWriteRow(writer->ctx->fsetWriter, row);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
651

652
// Open an stt reader for every level-0 stt file of the existing file set and
// queue a TSDB_FOP_REMOVE operation for each of those files.
//
// writer->ctx->toSttOnly is reset to false and then set to true when any
// level other than 0 holds files; those levels are not read. When there is no
// existing file set nothing else happens. This function does not open a
// data-file reader and does not touch writer->ctx->dataReader.
//
// Returns 0 on success; otherwise the error is logged and returned. Readers
// already appended stay in writer->ctx->sttReaderArr.
static int32_t tsdbSnapWriteFileSetOpenReader(STsdbSnapWriter* writer) {
  int32_t code = 0;
  int32_t lino = 0;

  writer->ctx->toSttOnly = false;
  if (writer->ctx->fset) {
    // open stt reader array
    SSttLvl* lvl;
    TARRAY2_FOREACH(writer->ctx->fset->lvlArr, lvl) {
      if (lvl->level != 0) {
        if (TARRAY2_SIZE(lvl->fobjArr) > 0) {
          writer->ctx->toSttOnly = true;
        }

        continue;  // Only merge level 0
      }

      STFileObj* fobj;
      TARRAY2_FOREACH(lvl->fobjArr, fobj) {
        SSttFileReader*      reader;
        SSttFileReaderConfig sttFileReaderConfig = {
            .tsdb = writer->tsdb,
            .szPage = writer->szPage,
            .buffers = writer->buffers,
            .file = fobj->f[0],
        };

        code = tsdbSttFileReaderOpen(fobj->fname, &sttFileReaderConfig, &reader);
        TSDB_CHECK_CODE(code, lino, _exit);

        code = TARRAY2_APPEND(writer->ctx->sttReaderArr, reader);
        if (code) {
          // the reader never made it into the array, so close it here
          tsdbSttFileReaderClose(&reader);
          TSDB_CHECK_CODE(code, lino, _exit);
        }

        // the merged stt file is to be removed from the file set
        STFileOp fileOp = {
            .optype = TSDB_FOP_REMOVE,
            .fid = fobj->f->fid,
            .of = fobj->f[0],
        };

        code = TARRAY2_APPEND(writer->fopArr, fileOp);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
733

734
// Close every stt reader and the data-file reader of the current file set.
// Always returns 0.
static int32_t tsdbSnapWriteFileSetCloseReader(STsdbSnapWriter* writer) {
  int32_t code = 0;

  TARRAY2_CLEAR(writer->ctx->sttReaderArr, tsdbSttFileReaderClose);
  tsdbDataFileReaderClose(&writer->ctx->dataReader);

  return code;
}
739

740
static int32_t tsdbSnapWriteFileSetOpenIter(STsdbSnapWriter* writer) {
359✔
741
  int32_t code = 0;
359✔
742
  int32_t lino = 0;
359✔
743

744
  // data ieter
745
  if (writer->ctx->dataReader) {
359✔
746
    STsdbIter*      iter;
×
747
    STsdbIterConfig config = {0};
×
748

749
    // data
750
    config.type = TSDB_ITER_TYPE_DATA;
×
751
    config.dataReader = writer->ctx->dataReader;
×
752

753
    code = tsdbIterOpen(&config, &iter);
×
754
    TSDB_CHECK_CODE(code, lino, _exit);
×
755

756
    code = TARRAY2_APPEND(writer->ctx->dataIterArr, iter);
×
757
    TSDB_CHECK_CODE(code, lino, _exit);
×
758

759
    // tome
760
    config.type = TSDB_ITER_TYPE_DATA_TOMB;
×
761
    config.dataReader = writer->ctx->dataReader;
×
762

763
    code = tsdbIterOpen(&config, &iter);
×
764
    TSDB_CHECK_CODE(code, lino, _exit);
×
765

766
    code = TARRAY2_APPEND(writer->ctx->tombIterArr, iter);
×
767
    TSDB_CHECK_CODE(code, lino, _exit);
×
768
  }
769

770
  // stt iter
771
  SSttFileReader* sttFileReader;
772
  TARRAY2_FOREACH(writer->ctx->sttReaderArr, sttFileReader) {
718✔
773
    STsdbIter*      iter;
359✔
774
    STsdbIterConfig config = {0};
359✔
775

776
    // data
777
    config.type = TSDB_ITER_TYPE_STT;
359✔
778
    config.sttReader = sttFileReader;
359✔
779

780
    code = tsdbIterOpen(&config, &iter);
359✔
781
    TSDB_CHECK_CODE(code, lino, _exit);
359✔
782

783
    code = TARRAY2_APPEND(writer->ctx->dataIterArr, iter);
359✔
784
    TSDB_CHECK_CODE(code, lino, _exit);
359✔
785

786
    // tomb
787
    config.type = TSDB_ITER_TYPE_STT_TOMB;
359✔
788
    config.sttReader = sttFileReader;
359✔
789

790
    code = tsdbIterOpen(&config, &iter);
359✔
791
    TSDB_CHECK_CODE(code, lino, _exit);
359✔
792

793
    code = TARRAY2_APPEND(writer->ctx->tombIterArr, iter);
359✔
794
    TSDB_CHECK_CODE(code, lino, _exit);
359✔
795
  }
796

797
  // open merger
798
  code = tsdbIterMergerOpen(writer->ctx->dataIterArr, &writer->ctx->dataIterMerger, false);
359✔
799
  TSDB_CHECK_CODE(code, lino, _exit);
359✔
800

801
  code = tsdbIterMergerOpen(writer->ctx->tombIterArr, &writer->ctx->tombIterMerger, true);
359✔
802
  TSDB_CHECK_CODE(code, lino, _exit);
359✔
803

804
_exit:
359✔
805
  if (code) {
359✔
806
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
807
  }
808
  return code;
359✔
809
}
810

811
// Close both mergers, then close and drop every data and tomb iterator of the
// current file set. Always returns 0.
static int32_t tsdbSnapWriteFileSetCloseIter(STsdbSnapWriter* writer) {
  int32_t code = 0;

  tsdbIterMergerClose(&writer->ctx->dataIterMerger);
  tsdbIterMergerClose(&writer->ctx->tombIterMerger);

  TARRAY2_CLEAR(writer->ctx->dataIterArr, tsdbIterClose);
  TARRAY2_CLEAR(writer->ctx->tombIterArr, tsdbIterClose);

  return code;
}
818

819
// Open the file-set writer for the current fid, configured from the snapshot
// writer's settings. Data files that already exist in the file set are passed
// along in the configuration; the target stt level is 1 when toSttOnly is
// set, else 0.
//
// Returns 0 on success; otherwise the error is logged and returned.
static int32_t tsdbSnapWriteFileSetOpenWriter(STsdbSnapWriter* writer) {
  int32_t code = 0;
  int32_t lino = 0;

  SFSetWriterConfig cfg = {
      .tsdb = writer->tsdb,
      .toSttOnly = writer->ctx->toSttOnly,
      .compactVersion = writer->compactVersion,
      .minRow = writer->minRow,
      .maxRow = writer->maxRow,
      .szPage = writer->szPage,
      .cmprAlg = writer->cmprAlg,
      .fid = writer->ctx->fid,
      .cid = writer->commitID,
      .expLevel = writer->ctx->expLevel,
      .level = writer->ctx->toSttOnly ? 1 : 0,
  };

  // merge stt files to either data or a new stt file
  if (writer->ctx->fset != NULL) {
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
      if (writer->ctx->fset->farr[ftype] == NULL) {
        continue;
      }
      cfg.files[ftype].exist = true;
      cfg.files[ftype].file = writer->ctx->fset->farr[ftype]->f[0];
    }
  }

  code = tsdbFSetWriterOpen(&cfg, &writer->ctx->fsetWriter);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
855

856
// Close the file-set writer, passing 0 as its second argument and the
// writer's file-operation array as its third. Returns whatever
// tsdbFSetWriterClose() returns.
static int32_t tsdbSnapWriteFileSetCloseWriter(STsdbSnapWriter* writer) {
  int32_t code = tsdbFSetWriterClose(&writer->ctx->fsetWriter, 0, writer->fopArr);
  return code;
}
859

860
// Start writing the file set `fid`: look up an existing file set with that
// fid, compute its expected level, then open the readers, iterators and the
// file-set writer. fsetWriteBegin is set only once all three opened.
//
// Returns 0 on success; otherwise the error is logged and returned.
static int32_t tsdbSnapWriteFileSetBegin(STsdbSnapWriter* writer, int32_t fid) {
  int32_t code = 0;
  int32_t lino = 0;

  // search key: a file set carrying only the fid
  STFileSet  key = {.fid = fid};
  STFileSet* keyPtr = &key;

  writer->ctx->fid = fid;

  STFileSet** found = TARRAY2_SEARCH(writer->fsetArr, &keyPtr, tsdbTFileSetCmprFn, TD_EQ);
  if (found == NULL) {
    writer->ctx->fset = NULL;
  } else {
    writer->ctx->fset = *found;
  }

  writer->ctx->expLevel = tsdbFidLevel(fid, &writer->tsdb->keepCfg, taosGetTimestampSec());

  // assume stored data and tombs exist until the mergers report otherwise
  writer->ctx->hasData = true;
  writer->ctx->hasTomb = true;

  code = tsdbSnapWriteFileSetOpenReader(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetOpenIter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetOpenWriter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  writer->ctx->fsetWriteBegin = true;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  } else {
    tsdbInfo("vgId:%d %s succeeded, fid:%d", TD_VID(writer->tsdb->pVnode), __func__, fid);
  }
  return code;
}
894

895
// Write one incoming tomb record, merging it with tomb records already stored
// in the file set.
//
// Every stored record that compares <= `record` (tTombRecordCompare) is
// written first. A `record` with suid == INT64_MAX is a sentinel: it only
// flushes the remaining stored records and is not written itself.
//
// Returns 0 on success, otherwise the error code.
static int32_t tsdbSnapWriteTombRecord(STsdbSnapWriter* writer, const STombRecord* record) {
  int32_t code = 0;
  int32_t lino = 0;

  while (writer->ctx->hasTomb) {
    STombRecord* stored = tsdbIterMergerGetTombRecord(writer->ctx->tombIterMerger);
    if (stored == NULL) {
      // stored records exhausted
      writer->ctx->hasTomb = false;
      break;
    }

    // stop as soon as the stored record sorts after the incoming one
    if (tTombRecordCompare(stored, record) > 0) {
      break;
    }

    code = tsdbFSetWriteTombRecord(writer->ctx->fsetWriter, stored);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(writer->ctx->tombIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (record->suid != INT64_MAX) {
    code = tsdbFSetWriteTombRecord(writer->ctx->fsetWriter, record);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
931

932
// Finish writing the current file set, if one is open.
//
// Feeds an INT64_MAX end marker through both the time-series row path and the
// tombstone path, then closes the file-set writer, the iterators and the
// readers in that order. fsetWriteBegin is cleared only when every step
// succeeded.
//
// Returns 0 when no file set is open or on success, otherwise the first
// failing step's error code.
static int32_t tsdbSnapWriteFileSetEnd(STsdbSnapWriter* writer) {
  if (writer->ctx->fsetWriteBegin == false) {
    return 0;
  }

  int32_t code = 0;
  int32_t lino = 0;

  // end markers for the two write paths
  SRowInfo    endRow = {.suid = INT64_MAX, .uid = INT64_MAX};
  STombRecord endRecord = {.suid = INT64_MAX, .uid = INT64_MAX};

  // end timeseries data write
  code = tsdbSnapWriteTimeSeriesRow(writer, &endRow);
  TSDB_CHECK_CODE(code, lino, _exit);

  // end tombstone data write
  code = tsdbSnapWriteTombRecord(writer, &endRecord);
  TSDB_CHECK_CODE(code, lino, _exit);

  // tear down in order: writer, iterators, readers
  code = tsdbSnapWriteFileSetCloseWriter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetCloseIter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetCloseReader(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  writer->ctx->fsetWriteBegin = false;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  } else {
    tsdbInfo("vgId:%d %s succeeded, fid:%d", TD_VID(writer->tsdb->pVnode), __func__, writer->ctx->fid);
  }
  return code;
}
976

977
// Abandon the file set currently being written, if one is open.
//
// Unlike tsdbSnapWriteFileSetEnd, no end markers are fed through the write
// paths: the file-set writer, the iterators and the readers are simply closed
// in that order. fsetWriteBegin is cleared only when every step succeeded.
//
// Returns 0 when no file set is open or on success, otherwise the first
// failing step's error code.
static int32_t tsdbSnapWriteFileSetAbort(STsdbSnapWriter* writer) {
  if (writer->ctx->fsetWriteBegin == false) {
    return 0;
  }

  int32_t code = 0;
  int32_t lino = 0;

  // tear down in order: writer, iterators, readers
  code = tsdbSnapWriteFileSetCloseWriter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetCloseIter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetCloseReader(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  writer->ctx->fsetWriteBegin = false;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
1001

1002
// Apply one time-series snapshot payload to the writer.
//
// The payload in hdr->data (hdr->size bytes) is decompressed into a block of
// rows using writer->buffers[0]. The file id is derived from the first row's
// timestamp; if no file set is open, or the open one has a different file id,
// the current file set is ended and a new one is begun. Every row of the block
// is then written through tsdbSnapWriteTimeSeriesRow.
//
// A payload that decompresses to zero rows is rejected with
// TSDB_CODE_INVALID_PARA, because there is no first timestamp to derive the
// file id from.
//
// Returns 0 on success, otherwise the first failing step's error code. The
// decompressed block is destroyed on every path.
static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
  int32_t code = 0;
  int32_t lino = 0;

  SBlockData blockData[1] = {0};

  // wrap the payload in a buffer view so it can be read without copying
  SBuffer buffer = {
      .capacity = hdr->size,
      .data = hdr->data,
      .size = hdr->size,
  };
  SBufferReader br = BUFFER_READER_INITIALIZER(0, &buffer);

  code = tBlockDataDecompress(&br, blockData, &writer->buffers[0]);
  TSDB_CHECK_CODE(code, lino, _exit);

  // aTSKEY[0] is read below; an empty block would make that access invalid
  if (blockData->nRow <= 0) {
    code = TSDB_CODE_INVALID_PARA;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  int32_t fid = tsdbKeyFid(blockData->aTSKEY[0], writer->minutes, writer->precision);
  if (!writer->ctx->fsetWriteBegin || fid != writer->ctx->fid) {
    // switch file sets: finish the open one (no-op if none) and start fid
    code = tsdbSnapWriteFileSetEnd(writer);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbSnapWriteFileSetBegin(writer, fid);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  for (int32_t i = 0; i < blockData->nRow; ++i) {
    SRowInfo rowInfo = {
        .suid = blockData->suid,
        // a non-zero block uid applies to every row; otherwise use the per-row uid
        .uid = blockData->uid ? blockData->uid : blockData->aUid[i],
        .row = tsdbRowFromBlockData(blockData, i),
    };

    code = tsdbSnapWriteTimeSeriesRow(writer, &rowInfo);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  } else {
    tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(writer->tsdb->pVnode), __func__,
              blockData->suid, blockData->uid, blockData->nRow);
  }
  tBlockDataDestroy(blockData);
  return code;
}
1048

1049
// Rebuild a tomb block from a snapshot payload.
//
// The payload is split into TOMB_RECORD_ELEM_NUM equally sized, consecutive
// slices; slice i is appended to tombBlock->buffers[i]. The record count is
// the slice size divided by sizeof(int64_t). The block is cleared first, so
// any previous content is discarded.
//
// Returns 0 on success, otherwise the error code of the failing tBufferPut.
static int32_t tsdbSnapWriteDecmprTombBlock(SSnapDataHdr* hdr, STombBlock* tombBlock) {
  int32_t code = 0;
  int32_t lino = 0;

  tTombBlockClear(tombBlock);

  // bytes occupied by one slice of the payload
  int64_t sliceBytes = (int64_t)hdr->size / TOMB_RECORD_ELEM_NUM;
  tombBlock->numOfRecords = sliceBytes / sizeof(int64_t);

  for (int32_t col = 0; col < TOMB_RECORD_ELEM_NUM; ++col) {
    uint8_t* slice = hdr->data + col * sliceBytes;

    code = tBufferPut(&tombBlock->buffers[col], slice, sliceBytes);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  return code;
}
1070

1071
// Apply one tombstone snapshot payload to the writer.
//
// The payload is unpacked into a tomb block. The file id is derived from the
// first record's skey; if no file set is open, or the open one has a different
// file id, the current file set is ended and a new one is begun. If the
// context still has pending time-series data, an INT64_MAX end-marker row is
// written first. Every record of the block is then written through
// tsdbSnapWriteTombRecord.
//
// Returns 0 on success, otherwise the first failing step's error code. The
// tomb block is destroyed on every path, including errors.
static int32_t tsdbSnapWriteTombData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
  int32_t code = 0;
  int32_t lino = 0;

  STombRecord record;
  STombBlock  tombBlock[1] = {0};

  code = tsdbSnapWriteDecmprTombBlock(hdr, tombBlock);
  TSDB_CHECK_CODE(code, lino, _exit);

  // the first record decides which file set this payload belongs to
  code = tTombBlockGet(tombBlock, 0, &record);
  TSDB_CHECK_CODE(code, lino, _exit);
  int32_t fid = tsdbKeyFid(record.skey, writer->minutes, writer->precision);
  if (!writer->ctx->fsetWriteBegin || fid != writer->ctx->fid) {
    // switch file sets: finish the open one (no-op if none) and start fid
    code = tsdbSnapWriteFileSetEnd(writer);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbSnapWriteFileSetBegin(writer, fid);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (writer->ctx->hasData) {
    // end marker for the time-series path before tombstones are written
    SRowInfo row = {
        .suid = INT64_MAX,
        .uid = INT64_MAX,
    };

    code = tsdbSnapWriteTimeSeriesRow(writer, &row);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  for (int32_t i = 0; i < TOMB_BLOCK_SIZE(tombBlock); ++i) {
    code = tTombBlockGet(tombBlock, i, &record);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbSnapWriteTombRecord(writer, &record);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  // release the block after the label so error paths do not leak its buffers
  tTombBlockDestroy(tombBlock);
  return code;
}
1118

1119
// Allocate and initialise a snapshot writer for pTsdb.
//
// pTsdb   - tsdb instance the snapshot is written into
// sver    - start version, stored in the writer
// ever    - end version, stored in the writer
// pRanges - passed to tsdbFSCreateCopyRangedSnapshot as a TFileSetRangeArray*
// writer  - out: receives the newly allocated writer
//
// Configuration (precision, row limits, compression, page size) is copied
// from pTsdb, a commit id is allocated from the file system, and a ranged
// copy of the file-set snapshot is created.
//
// Returns 0 on success, terrno when the allocation fails, or the error code
// of tsdbFSCreateCopyRangedSnapshot. In the latter case *writer still points
// at the allocated writer.
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRanges, STsdbSnapWriter** writer) {
  int32_t code = 0;
  int32_t lino = 0;

  // start to write
  STsdbSnapWriter* w = taosMemoryCalloc(1, sizeof(*w));
  writer[0] = w;
  if (w == NULL) {
    return terrno;
  }

  w->tsdb = pTsdb;
  w->sver = sver;
  w->ever = ever;
  w->minutes = pTsdb->keepCfg.days;
  w->precision = pTsdb->keepCfg.precision;
  w->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  w->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
  w->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
  w->commitID = tsdbFSAllocEid(pTsdb->pFS);
  w->szPage = pTsdb->pVnode->config.tsdbPageSize;
  w->compactVersion = INT64_MAX;
  w->now = taosGetTimestampMs();

  code = tsdbFSCreateCopyRangedSnapshot(pTsdb->pFS, (TFileSetRangeArray*)pRanges, &w->fsetArr, w->fopArr);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64, TD_VID(pTsdb->pVnode), __func__, sver, ever);
  }
  return code;
}
1152

1153
// First phase of closing a snapshot writer.
//
// With rollback set, the file set in progress is aborted; otherwise it is
// ended normally. In both cases a file-system edit is then begun with the
// writer's accumulated file operations, using TSDB_FEDIT_COMMIT.
//
// Returns 0 on success, otherwise the first failing step's error code.
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* writer, bool rollback) {
  int32_t code = 0;
  int32_t lino = 0;

  // settle the file set that is still open, if any
  if (rollback) {
    code = tsdbSnapWriteFileSetAbort(writer);
  } else {
    code = tsdbSnapWriteFileSetEnd(writer);
  }
  TSDB_CHECK_CODE(code, lino, _exit);

  // the edit is begun the same way on both paths
  code = tsdbFSEditBegin(writer->tsdb->pFS, writer->fopArr, TSDB_FEDIT_COMMIT);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(writer->tsdb->pVnode), __func__);
  }
  return code;
}
1179

1180
// Second phase of closing a snapshot writer: finish the file-system edit and
// release the writer.
//
// writer   - in/out: the writer to close; *writer is set to NULL on return.
//            A NULL *writer is a no-op that returns 0.
// rollback - non-zero aborts the pending file-system edit; zero commits it
//            under the tsdb mutex and, on success, resets the file-system
//            state to TSDB_FS_STATE_NORMAL.
//
// The writer and everything it owns (iterator mergers, iterators, readers,
// file-operation array, file-set snapshot copy, scratch buffers) are released
// whether or not the abort/commit succeeded, so a failed close does not leak.
//
// Returns 0 on success, otherwise the abort/commit error code.
int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) {
  if (writer[0] == NULL) return 0;

  int32_t code = 0;
  int32_t lino = 0;

  // keep the tsdb handle for logging after the writer has been freed
  STsdb* tsdb = writer[0]->tsdb;

  if (rollback) {
    code = tsdbFSEditAbort(tsdb->pFS);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    (void)taosThreadMutexLock(&tsdb->mutex);

    code = tsdbFSEditCommit(tsdb->pFS);
    if (code == 0) {
      tsdb->pFS->fsstate = TSDB_FS_STATE_NORMAL;
    }

    (void)taosThreadMutexUnlock(&tsdb->mutex);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  // teardown runs on every path so an abort/commit failure does not leak
  tsdbIterMergerClose(&writer[0]->ctx->tombIterMerger);
  tsdbIterMergerClose(&writer[0]->ctx->dataIterMerger);
  TARRAY2_DESTROY(writer[0]->ctx->tombIterArr, tsdbIterClose);
  TARRAY2_DESTROY(writer[0]->ctx->dataIterArr, tsdbIterClose);
  TARRAY2_DESTROY(writer[0]->ctx->sttReaderArr, tsdbSttFileReaderClose);
  tsdbDataFileReaderClose(&writer[0]->ctx->dataReader);

  TARRAY2_DESTROY(writer[0]->fopArr, NULL);
  tsdbFSDestroyCopyRangedSnapshot(&writer[0]->fsetArr);

  for (int32_t i = 0; i < ARRAY_SIZE(writer[0]->buffers); ++i) {
    tBufferDestroy(writer[0]->buffers + i);
  }

  taosMemoryFree(writer[0]);
  writer[0] = NULL;

  if (code) {
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
  } else {
    tsdbInfo("vgId:%d %s done, rollback:%d", TD_VID(tsdb->pVnode), __func__, rollback);
  }
  return code;
}
1230

1231
// Entry point for applying one snapshot payload to the writer.
//
// Dispatches on hdr->type:
//   SNAP_DATA_TSDB -> time-series data
//   SNAP_DATA_DEL  -> tombstone data
//   anything else  -> TSDB_CODE_INVALID_PARA
//
// Returns 0 on success, otherwise the handler's error code.
int32_t tsdbSnapWrite(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
  int32_t code = 0;
  int32_t lino = 0;

  switch (hdr->type) {
    case SNAP_DATA_TSDB:
      code = tsdbSnapWriteTimeSeriesData(writer, hdr);
      break;
    case SNAP_DATA_DEL:
      code = tsdbSnapWriteTombData(writer, hdr);
      break;
    default:
      // unknown payload type
      code = TSDB_CODE_INVALID_PARA;
      break;
  }
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s, type:%d index:%" PRId64 " size:%" PRId64,
              TD_VID(writer->tsdb->pVnode), __func__, lino, tstrerror(code), hdr->type, hdr->index, hdr->size);
  } else {
    tsdbDebug("vgId:%d %s done, type:%d index:%" PRId64 " size:%" PRId64, TD_VID(writer->tsdb->pVnode), __func__,
              hdr->type, hdr->index, hdr->size);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc