• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4850

14 Nov 2025 08:06AM UTC coverage: 63.728% (-0.1%) from 63.829%
#4850

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

355 of 675 new or added lines in 18 files covered. (52.59%)

634 existing lines in 110 files now uncovered.

149066 of 233910 relevant lines covered (63.73%)

115676883.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.93
/source/dnode/vnode/src/tsdb/tsdbSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdb.h"
17
#include "tsdbDataFileRW.h"
18
#include "tsdbFS2.h"
19
#include "tsdbFSetRW.h"
20
#include "tsdbIter.h"
21
#include "tsdbSttFileRW.h"
22

23
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);
24

25
// STsdbSnapReader ========================================
26
struct STsdbSnapReader {
27
  STsdb*  tsdb;
28
  int64_t sver;
29
  int64_t ever;
30
  int8_t  type;
31

32
  SBuffer  buffers[10];
33
  SSkmInfo skmTb[1];
34

35
  TFileSetRangeArray* fsrArr;
36

37
  // context
38
  struct {
39
    int32_t         fsrArrIdx;
40
    STFileSetRange* fsr;
41
    bool            isDataDone;
42
    bool            isTombDone;
43
  } ctx[1];
44

45
  // reader
46
  SDataFileReader*    dataReader;
47
  TSttFileReaderArray sttReaderArr[1];
48

49
  // iter
50
  TTsdbIterArray dataIterArr[1];
51
  SIterMerger*   dataIterMerger;
52
  TTsdbIterArray tombIterArr[1];
53
  SIterMerger*   tombIterMerger;
54

55
  // data
56
  SBlockData blockData[1];
57
  STombBlock tombBlock[1];
58
};
59

60
static void tsdbSnapReadFileSetCloseReader(STsdbSnapReader* reader) {
1,283✔
61
  TARRAY2_CLEAR(reader->sttReaderArr, tsdbSttFileReaderClose);
1,940✔
62
  tsdbDataFileReaderClose(&reader->dataReader);
1,283✔
63
  return;
1,283✔
64
}
65

66
static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) {
1,283✔
67
  int32_t code = 0;
1,283✔
68
  int32_t lino = 0;
1,283✔
69

70
  // data
71
  SDataFileReaderConfig config = {
1,283✔
72
      .tsdb = reader->tsdb,
1,283✔
73
      .szPage = reader->tsdb->pVnode->config.tsdbPageSize,
1,283✔
74
      .buffers = reader->buffers,
1,283✔
75
  };
76
  bool hasDataFile = false;
1,283✔
77
  for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
6,415✔
78
    if (reader->ctx->fsr->fset->farr[ftype] != NULL) {
5,132✔
79
      hasDataFile = true;
3,849✔
80
      config.files[ftype].exist = true;
3,849✔
81
      config.files[ftype].file = reader->ctx->fsr->fset->farr[ftype]->f[0];
3,849✔
82
    }
83
  }
84

85
  if (hasDataFile) {
1,283✔
86
    code = tsdbDataFileReaderOpen(NULL, &config, &reader->dataReader);
1,283✔
87
    TSDB_CHECK_CODE(code, lino, _exit);
1,283✔
88
  }
89

90
  // stt
91
  SSttLvl* lvl;
92
  TARRAY2_FOREACH(reader->ctx->fsr->fset->lvlArr, lvl) {
1,940✔
93
    STFileObj* fobj;
94
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
1,314✔
95
      SSttFileReader*      sttReader;
657✔
96
      SSttFileReaderConfig config = {
1,314✔
97
          .tsdb = reader->tsdb,
657✔
98
          .szPage = reader->tsdb->pVnode->config.tsdbPageSize,
657✔
99
          .file = fobj->f[0],
100
          .buffers = reader->buffers,
657✔
101
      };
102

103
      code = tsdbSttFileReaderOpen(fobj->fname, &config, &sttReader);
657✔
104
      TSDB_CHECK_CODE(code, lino, _exit);
657✔
105

106
      if ((code = TARRAY2_APPEND(reader->sttReaderArr, sttReader))) {
1,314✔
107
        tsdbSttFileReaderClose(&sttReader);
×
108
        TSDB_CHECK_CODE(code, lino, _exit);
×
109
      }
110
    }
111
  }
112

113
_exit:
1,283✔
114
  if (code) {
1,283✔
115
    tsdbSnapReadFileSetCloseReader(reader);
×
116
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
×
117
  }
118
  return code;
1,283✔
119
}
120

121
static int32_t tsdbSnapReadFileSetOpenIter(STsdbSnapReader* reader) {
1,283✔
122
  int32_t code = 0;
1,283✔
123
  int32_t lino = 0;
1,283✔
124

125
  STsdbIter*      iter;
1,283✔
126
  STsdbIterConfig config = {
1,283✔
127
      .filterByVersion = true,
128
      .verRange[0] = reader->ctx->fsr->sver,
1,283✔
129
      .verRange[1] = reader->ctx->fsr->ever,
1,283✔
130
  };
131

132
  // data file
133
  if (reader->dataReader) {
1,283✔
134
    // data
135
    config.type = TSDB_ITER_TYPE_DATA;
1,283✔
136
    config.dataReader = reader->dataReader;
1,283✔
137

138
    code = tsdbIterOpen(&config, &iter);
1,283✔
139
    TSDB_CHECK_CODE(code, lino, _exit);
1,283✔
140

141
    code = TARRAY2_APPEND(reader->dataIterArr, iter);
1,283✔
142
    TSDB_CHECK_CODE(code, lino, _exit);
1,283✔
143

144
    // tomb
145
    config.type = TSDB_ITER_TYPE_DATA_TOMB;
1,283✔
146
    config.dataReader = reader->dataReader;
1,283✔
147

148
    code = tsdbIterOpen(&config, &iter);
1,283✔
149
    TSDB_CHECK_CODE(code, lino, _exit);
1,283✔
150

151
    code = TARRAY2_APPEND(reader->tombIterArr, iter);
1,283✔
152
    TSDB_CHECK_CODE(code, lino, _exit);
1,283✔
153
  }
154

155
  // stt file
156
  SSttFileReader* sttReader;
157
  TARRAY2_FOREACH(reader->sttReaderArr, sttReader) {
1,940✔
158
    // data
159
    config.type = TSDB_ITER_TYPE_STT;
657✔
160
    config.sttReader = sttReader;
657✔
161

162
    code = tsdbIterOpen(&config, &iter);
657✔
163
    TSDB_CHECK_CODE(code, lino, _exit);
657✔
164

165
    code = TARRAY2_APPEND(reader->dataIterArr, iter);
657✔
166
    TSDB_CHECK_CODE(code, lino, _exit);
657✔
167

168
    // tomb
169
    config.type = TSDB_ITER_TYPE_STT_TOMB;
657✔
170
    config.sttReader = sttReader;
657✔
171

172
    code = tsdbIterOpen(&config, &iter);
657✔
173
    TSDB_CHECK_CODE(code, lino, _exit);
657✔
174

175
    code = TARRAY2_APPEND(reader->tombIterArr, iter);
657✔
176
    TSDB_CHECK_CODE(code, lino, _exit);
657✔
177
  }
178

179
  // merger
180
  code = tsdbIterMergerOpen(reader->dataIterArr, &reader->dataIterMerger, false);
1,283✔
181
  TSDB_CHECK_CODE(code, lino, _exit);
1,283✔
182

183
  code = tsdbIterMergerOpen(reader->tombIterArr, &reader->tombIterMerger, true);
1,283✔
184
  TSDB_CHECK_CODE(code, lino, _exit);
1,283✔
185

186
_exit:
1,283✔
187
  if (code) {
1,283✔
188
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
×
189
  }
190
  return code;
1,283✔
191
}
192

193
static void tsdbSnapReadFileSetCloseIter(STsdbSnapReader* reader) {
1,283✔
194
  tsdbIterMergerClose(&reader->dataIterMerger);
1,283✔
195
  tsdbIterMergerClose(&reader->tombIterMerger);
1,283✔
196
  TARRAY2_CLEAR(reader->dataIterArr, tsdbIterClose);
3,223✔
197
  TARRAY2_CLEAR(reader->tombIterArr, tsdbIterClose);
3,223✔
198
  return;
1,283✔
199
}
200

201
static int32_t tsdbSnapReadRangeBegin(STsdbSnapReader* reader) {
2,566✔
202
  int32_t code = 0;
2,566✔
203
  int32_t lino = 0;
2,566✔
204

205
  if (reader->ctx->fsrArrIdx < TARRAY2_SIZE(reader->fsrArr)) {
2,566✔
206
    reader->ctx->fsr = TARRAY2_GET(reader->fsrArr, reader->ctx->fsrArrIdx++);
1,283✔
207
    reader->ctx->isDataDone = false;
1,283✔
208
    reader->ctx->isTombDone = false;
1,283✔
209

210
    code = tsdbSnapReadFileSetOpenReader(reader);
1,283✔
211
    TSDB_CHECK_CODE(code, lino, _exit);
1,283✔
212

213
    code = tsdbSnapReadFileSetOpenIter(reader);
1,283✔
214
    TSDB_CHECK_CODE(code, lino, _exit);
1,283✔
215
  }
216

217
_exit:
2,566✔
218
  if (code) {
2,566✔
219
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
×
220
  }
221
  return code;
2,566✔
222
}
223

224
static int32_t tsdbSnapReadRangeEnd(STsdbSnapReader* reader) {
1,283✔
225
  tsdbSnapReadFileSetCloseIter(reader);
1,283✔
226
  tsdbSnapReadFileSetCloseReader(reader);
1,283✔
227
  reader->ctx->fsr = NULL;
1,283✔
228
  return 0;
1,283✔
229
}
230

231
static int32_t tsdbSnapCmprData(STsdbSnapReader* reader, uint8_t** data) {
1,596✔
232
  int32_t code = 0;
1,596✔
233
  int32_t lino = 0;
1,596✔
234

235
  SColCompressInfo info;
236

237
  SColCompressInfo cmprInfo = {.pColCmpr = NULL, .defaultCmprAlg = NO_COMPRESSION};
1,596✔
238
  code = tBlockDataCompress(reader->blockData, (void*)&cmprInfo, reader->buffers, reader->buffers + 4);
1,596✔
239
  TSDB_CHECK_CODE(code, lino, _exit);
1,596✔
240

241
  int32_t size = 0;
1,596✔
242
  for (int i = 0; i < 4; i++) {
7,980✔
243
    size += reader->buffers[i].size;
6,384✔
244
  }
245
  *data = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
1,596✔
246
  if (*data == NULL) {
1,596✔
247
    code = terrno;
×
248
    TSDB_CHECK_CODE(code, lino, _exit);
×
249
  }
250

251
  SSnapDataHdr* pHdr = (SSnapDataHdr*)*data;
1,596✔
252
  uint8_t*      pBuf = pHdr->data;
1,596✔
253

254
  pHdr->type = reader->type;
1,596✔
255
  pHdr->size = size;
1,596✔
256
  for (int i = 0; i < 4; i++) {
7,980✔
257
    memcpy(pBuf, reader->buffers[i].data, reader->buffers[i].size);
6,384✔
258
    pBuf += reader->buffers[i].size;
6,384✔
259
  }
260

261
_exit:
1,596✔
262
  if (code) {
1,596✔
263
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), lino, code);
×
264
  }
265
  return code;
1,596✔
266
}
267

268
static int64_t tBlockDataSize(SBlockData* pBlockData) {
977,812✔
269
  int64_t nData = 0;
977,812✔
270
  for (int32_t iCol = 0; iCol < pBlockData->nColData; iCol++) {
9,778,120✔
271
    SColData* pColData = tBlockDataGetColDataByIdx(pBlockData, iCol);
8,797,804✔
272
    if (pColData->flag == HAS_NONE || pColData->flag == HAS_NULL) {
8,798,117✔
273
      continue;
×
274
    }
275

276
    if (pColData->flag != HAS_VALUE) {
8,797,804✔
277
      if (pColData->flag == (HAS_NONE | HAS_NULL | HAS_VALUE)) {
×
278
        nData += BIT2_SIZE(pColData->nVal);
×
279
      } else {
280
        nData += BIT1_SIZE(pColData->nVal);
×
281
      }
282
    }
283

284
    if (pColData->flag == (HAS_NONE | HAS_NULL)) {
8,798,117✔
285
      continue;
×
286
    }
287

288
    if (IS_VAR_DATA_TYPE(pColData->type)) {
8,797,491✔
289
      nData += pColData->nVal * sizeof(int32_t);  // var data offset
×
290
    }
291

292
    nData += pColData->nData;
8,800,308✔
293
  }
294
  return nData;
977,812✔
295
}
296

297
static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** data) {
2,879✔
298
  int32_t   code = 0;
2,879✔
299
  int32_t   lino = 0;
2,879✔
300
  SMetaInfo info;
2,879✔
301

302
  tBlockDataReset(reader->blockData);
2,879✔
303

304
  TABLEID tbid[1] = {0};
2,879✔
305
  for (SRowInfo* row; (row = tsdbIterMergerGetData(reader->dataIterMerger));) {
15,634,756✔
306
    // skip dropped table
307
    if (row->uid != tbid->uid) {
15,629,686✔
308
      tbid->suid = row->suid;
4,100✔
309
      tbid->uid = row->uid;
4,100✔
310
      if (metaGetInfo(reader->tsdb->pVnode->pMeta, tbid->uid, &info, NULL) != 0) {
4,100✔
311
        code = tsdbIterMergerSkipTableData(reader->dataIterMerger, tbid);
×
312
        TSDB_CHECK_CODE(code, lino, _exit);
×
313
        continue;
×
314
      }
315
    }
316

317
    if (reader->blockData->suid == 0 && reader->blockData->uid == 0) {
15,630,312✔
318
      code = tsdbUpdateSkmTb(reader->tsdb, (TABLEID*)row, reader->skmTb);
1,596✔
319
      TSDB_CHECK_CODE(code, lino, _exit);
1,596✔
320

321
      TABLEID tbid1 = {
1,596✔
322
          .suid = row->suid,
1,596✔
323
          .uid = row->suid ? 0 : row->uid,
1,596✔
324
      };
325
      code = tBlockDataInit(reader->blockData, &tbid1, reader->skmTb->pTSchema, NULL, 0);
1,596✔
326
      TSDB_CHECK_CODE(code, lino, _exit);
1,596✔
327
    }
328

329
    if (!TABLE_SAME_SCHEMA(reader->blockData->suid, reader->blockData->uid, row->suid, row->uid)) {
15,624,052✔
330
      break;
×
331
    }
332

333
    code = tBlockDataAppendRow(reader->blockData, &row->row, NULL, row->uid);
15,636,259✔
334
    TSDB_CHECK_CODE(code, lino, _exit);
15,641,893✔
335

336
    code = tsdbIterMergerNext(reader->dataIterMerger);
15,641,893✔
337
    TSDB_CHECK_CODE(code, lino, _exit);
15,632,503✔
338

339
    if (!(reader->blockData->nRow % 16)) {
15,632,503✔
340
      int64_t nData = tBlockDataSize(reader->blockData);
977,812✔
341
      if (nData >= TSDB_SNAP_DATA_PAYLOAD_SIZE) {
977,812✔
342
        break;
313✔
343
      }
344
    }
345
  }
346

347
  if (reader->blockData->nRow > 0) {
1,940✔
348
    code = tsdbSnapCmprData(reader, data);
1,596✔
349
    TSDB_CHECK_CODE(code, lino, _exit);
1,596✔
350
  }
351

352
_exit:
2,879✔
353
  if (code) {
2,879✔
354
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
×
355
  } else {
356
    tsdbDebug("vgId:%d, tsdb snapshot read time-series data done, data size:%" PRId64 "", TD_VID(reader->tsdb->pVnode),
2,879✔
357
              data[0] ? ((SSnapDataHdr*)(data[0]))->size : 0);
358
  }
359
  return code;
2,879✔
360
}
361

362
static int32_t tsdbSnapCmprTombData(STsdbSnapReader* reader, uint8_t** data) {
×
363
  int32_t code = 0;
×
364
  int32_t lino = 0;
×
365

366
  int64_t size = 0;
×
367
  for (int32_t i = 0; i < ARRAY_SIZE(reader->tombBlock->buffers); i++) {
×
368
    size += reader->tombBlock->buffers[i].size;
×
369
  }
370

371
  data[0] = taosMemoryMalloc(size + sizeof(SSnapDataHdr));
×
372
  if (data[0] == NULL) {
×
373
    code = terrno;
×
374
    TSDB_CHECK_CODE(code, lino, _exit);
×
375
  }
376

377
  SSnapDataHdr* hdr = (SSnapDataHdr*)(data[0]);
×
378
  hdr->type = SNAP_DATA_DEL;
×
379
  hdr->size = size;
×
380

381
  uint8_t* tdata = hdr->data;
×
382
  for (int32_t i = 0; i < ARRAY_SIZE(reader->tombBlock->buffers); i++) {
×
383
    memcpy(tdata, reader->tombBlock->buffers[i].data, reader->tombBlock->buffers[i].size);
×
384
    tdata += reader->tombBlock->buffers[i].size;
×
385
  }
386

387
_exit:
×
388
  if (code) {
×
389
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
×
390
  }
391
  return code;
×
392
}
393

394
static int32_t tsdbSnapReadTombData(STsdbSnapReader* reader, uint8_t** data) {
1,283✔
395
  int32_t   code = 0;
1,283✔
396
  int32_t   lino = 0;
1,283✔
397
  SMetaInfo info;
1,283✔
398

399
  tTombBlockClear(reader->tombBlock);
1,283✔
400

401
  TABLEID tbid[1] = {0};
1,283✔
402
  for (STombRecord* record; (record = tsdbIterMergerGetTombRecord(reader->tombIterMerger)) != NULL;) {
1,283✔
403
    if (record->uid != tbid->uid) {
×
404
      tbid->suid = record->suid;
×
405
      tbid->uid = record->uid;
×
406
      if (metaGetInfo(reader->tsdb->pVnode->pMeta, tbid->uid, &info, NULL) != 0) {
×
407
        code = tsdbIterMergerSkipTableData(reader->tombIterMerger, tbid);
×
408
        TSDB_CHECK_CODE(code, lino, _exit);
×
409
        continue;
×
410
      }
411
    }
412

413
    code = tTombBlockPut(reader->tombBlock, record);
×
414
    TSDB_CHECK_CODE(code, lino, _exit);
×
415

416
    code = tsdbIterMergerNext(reader->tombIterMerger);
×
417
    TSDB_CHECK_CODE(code, lino, _exit);
×
418

419
    if (TOMB_BLOCK_SIZE(reader->tombBlock) >= 81920) {
×
420
      break;
×
421
    }
422
  }
423

424
  if (TOMB_BLOCK_SIZE(reader->tombBlock) > 0) {
1,283✔
425
    code = tsdbSnapCmprTombData(reader, data);
×
426
    TSDB_CHECK_CODE(code, lino, _exit);
×
427
  }
428

429
_exit:
1,283✔
430
  if (code) {
1,283✔
431
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
×
432
  }
433
  return code;
1,283✔
434
}
435

436
int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type, void* pRanges,
1,283✔
437
                           STsdbSnapReader** reader) {
438
  int32_t code = 0;
1,283✔
439
  int32_t lino = 0;
1,283✔
440

441
  reader[0] = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*reader[0]));
1,283✔
442
  if (reader[0] == NULL) return terrno;
1,283✔
443

444
  reader[0]->tsdb = tsdb;
1,283✔
445
  reader[0]->sver = sver;
1,283✔
446
  reader[0]->ever = ever;
1,283✔
447
  reader[0]->type = type;
1,283✔
448

449
  code = tsdbFSCreateRefRangedSnapshot(tsdb->pFS, sver, ever, (TFileSetRangeArray*)pRanges, &reader[0]->fsrArr);
1,283✔
450
  TSDB_CHECK_CODE(code, lino, _exit);
1,283✔
451

452
_exit:
1,283✔
453
  if (code) {
1,283✔
454
    tsdbError("vgId:%d %s failed at %s:%d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode),
×
455
              __func__, __FILE__, lino, tstrerror(code), sver, ever, type);
456
    tsdbTFileSetRangeArrayDestroy(&reader[0]->fsrArr);
×
457
    taosMemoryFree(reader[0]);
×
458
    reader[0] = NULL;
×
459
  } else {
460
    tsdbInfo("vgId:%d, tsdb snapshot incremental reader opened. sver:%" PRId64 " ever:%" PRId64 " type:%d",
1,283✔
461
             TD_VID(tsdb->pVnode), sver, ever, type);
462
  }
463
  return code;
1,283✔
464
}
465

466
void tsdbSnapReaderClose(STsdbSnapReader** reader) {
1,283✔
467
  if (reader[0] == NULL) {
1,283✔
468
    return;
×
469
  }
470

471
  int32_t code = 0;
1,283✔
472

473
  STsdb* tsdb = reader[0]->tsdb;
1,283✔
474

475
  tTombBlockDestroy(reader[0]->tombBlock);
1,283✔
476
  tBlockDataDestroy(reader[0]->blockData);
1,283✔
477

478
  tsdbIterMergerClose(&reader[0]->dataIterMerger);
1,283✔
479
  tsdbIterMergerClose(&reader[0]->tombIterMerger);
1,283✔
480
  TARRAY2_DESTROY(reader[0]->dataIterArr, tsdbIterClose);
1,283✔
481
  TARRAY2_DESTROY(reader[0]->tombIterArr, tsdbIterClose);
1,283✔
482
  TARRAY2_DESTROY(reader[0]->sttReaderArr, tsdbSttFileReaderClose);
1,283✔
483
  tsdbDataFileReaderClose(&reader[0]->dataReader);
1,283✔
484

485
  tsdbFSDestroyRefRangedSnapshot(&reader[0]->fsrArr);
1,283✔
486
  tDestroyTSchema(reader[0]->skmTb->pTSchema);
1,283✔
487

488
  for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->buffers); ++i) {
14,113✔
489
    tBufferDestroy(reader[0]->buffers + i);
12,830✔
490
  }
491

492
  taosMemoryFree(reader[0]);
1,283✔
493
  reader[0] = NULL;
1,283✔
494

495
  return;
1,283✔
496
}
497

498
int32_t tsdbSnapRead(STsdbSnapReader* reader, uint8_t** data) {
2,879✔
499
  int32_t code = 0;
2,879✔
500
  int32_t lino = 0;
2,879✔
501

502
  data[0] = NULL;
2,879✔
503

504
  for (;;) {
505
    if (reader->ctx->fsr == NULL) {
4,162✔
506
      code = tsdbSnapReadRangeBegin(reader);
2,566✔
507
      TSDB_CHECK_CODE(code, lino, _exit);
2,566✔
508

509
      if (reader->ctx->fsr == NULL) {
2,566✔
510
        break;
1,283✔
511
      }
512
    }
513

514
    if (!reader->ctx->isDataDone) {
2,879✔
515
      code = tsdbSnapReadTimeSeriesData(reader, data);
2,879✔
516
      TSDB_CHECK_CODE(code, lino, _exit);
2,879✔
517
      if (data[0]) {
2,879✔
518
        goto _exit;
1,596✔
519
      } else {
520
        reader->ctx->isDataDone = true;
1,283✔
521
      }
522
    }
523

524
    if (!reader->ctx->isTombDone) {
1,283✔
525
      code = tsdbSnapReadTombData(reader, data);
1,283✔
526
      TSDB_CHECK_CODE(code, lino, _exit);
1,283✔
527
      if (data[0]) {
1,283✔
528
        goto _exit;
×
529
      } else {
530
        reader->ctx->isTombDone = true;
1,283✔
531
      }
532
    }
533

534
    code = tsdbSnapReadRangeEnd(reader);
1,283✔
535
    TSDB_CHECK_CODE(code, lino, _exit);
1,283✔
536
  }
537

538
_exit:
2,879✔
539
  if (code) {
2,879✔
540
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
×
541
  } else {
542
    tsdbDebug("vgId:%d %s done", TD_VID(reader->tsdb->pVnode), __func__);
2,879✔
543
  }
544
  return code;
2,879✔
545
}
546

547
// STsdbSnapWriter ========================================
548
struct STsdbSnapWriter {
549
  STsdb*  tsdb;
550
  int64_t sver;
551
  int64_t ever;
552
  int32_t minutes;
553
  int8_t  precision;
554
  int32_t minRow;
555
  int32_t maxRow;
556
  int8_t  cmprAlg;
557
  int64_t commitID;
558
  int32_t szPage;
559
  int64_t compactVersion;
560
  int64_t now;
561
  SBuffer buffers[10];
562

563
  TFileSetArray* fsetArr;
564
  TFileOpArray   fopArr[1];
565

566
  struct {
567
    bool       fsetWriteBegin;
568
    int32_t    fid;
569
    STFileSet* fset;
570
    int32_t    expLevel;
571
    bool       hasData;  // if have time series data
572
    bool       hasTomb;  // if have tomb data
573

574
    // reader
575
    SDataFileReader*    dataReader;
576
    TSttFileReaderArray sttReaderArr[1];
577

578
    // iter/merger
579
    TTsdbIterArray dataIterArr[1];
580
    SIterMerger*   dataIterMerger;
581
    TTsdbIterArray tombIterArr[1];
582
    SIterMerger*   tombIterMerger;
583

584
    // writer
585
    bool         toSttOnly;
586
    SFSetWriter* fsetWriter;
587
  } ctx[1];
588
};
589

590
// APIs
591
static int32_t tsdbSnapWriteTimeSeriesRow(STsdbSnapWriter* writer, SRowInfo* row) {
15,563,672✔
592
  int32_t   code = 0;
15,563,672✔
593
  int32_t   lino = 0;
15,563,672✔
594
  TABLEID   tbid = {0};
15,563,672✔
595
  SMetaInfo info;
15,651,625✔
596

597
  while (writer->ctx->hasData) {
18,781,625✔
598
    SRowInfo* row1;
599
    for (;;) {
600
      row1 = tsdbIterMergerGetData(writer->ctx->dataIterMerger);
15,649,121✔
601
      if (row1 == NULL) {
15,649,121✔
602
        writer->ctx->hasData = false;
1,282✔
603
      } else if (row1->uid != tbid.uid) {
15,647,839✔
604
        tbid.suid = row1->suid;
12,522,847✔
605
        tbid.uid = row1->uid;
12,522,847✔
606
        if (metaGetInfo(writer->tsdb->pVnode->pMeta, tbid.uid, &info, NULL) != 0) {
12,522,847✔
607
          code = tsdbIterMergerSkipTableData(writer->ctx->dataIterMerger, &tbid);
656✔
608
          TSDB_CHECK_CODE(code, lino, _exit);
656✔
609
          continue;
656✔
610
        }
611
      }
612
      break;
15,649,091✔
613
    }
614

615
    if (writer->ctx->hasData == false) {
15,649,091✔
616
      break;
1,282✔
617
    }
618

619
    int32_t c = tRowInfoCmprFn(row1, row);
15,648,122✔
620
    if (c <= 0) {
15,647,183✔
621
      code = tsdbFSetWriteRow(writer->ctx->fsetWriter, row1);
3,127,183✔
622
      TSDB_CHECK_CODE(code, lino, _exit);
3,129,687✔
623

624
      code = tsdbIterMergerNext(writer->ctx->dataIterMerger);
3,129,687✔
625
      TSDB_CHECK_CODE(code, lino, _exit);
3,130,000✔
626
    } else {
627
      break;
12,520,000✔
628
    }
629
  }
630

631
  if (row->suid == INT64_MAX) {
15,652,877✔
632
    goto _exit;
1,282✔
633
  }
634

635
  code = tsdbFSetWriteRow(writer->ctx->fsetWriter, row);
15,650,656✔
636
  TSDB_CHECK_CODE(code, lino, _exit);
15,649,717✔
637

638
_exit:
15,650,999✔
639
  if (code) {
15,650,999✔
640
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
641
  }
642
  return code;
15,650,999✔
643
}
644

645
static int32_t tsdbSnapWriteFileSetOpenReader(STsdbSnapWriter* writer) {
1,282✔
646
  int32_t code = 0;
1,282✔
647
  int32_t lino = 0;
1,282✔
648

649
  writer->ctx->toSttOnly = false;
1,282✔
650
  if (writer->ctx->fset) {
1,282✔
651
#if 0
652
    // open data reader
653
    SDataFileReaderConfig dataFileReaderConfig = {
654
        .tsdb = writer->tsdb,
655
        .buffers = writer->buffers,
656
        .szPage = writer->szPage,
657
    };
658

659
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
660
      if (writer->ctx->fset->farr[ftype] == NULL) {
661
        continue;
662
      }
663

664
      dataFileReaderConfig.files[ftype].exist = true;
665
      dataFileReaderConfig.files[ftype].file = writer->ctx->fset->farr[ftype]->f[0];
666

667
      STFileOp fileOp = {
668
          .optype = TSDB_FOP_REMOVE,
669
          .fid = writer->ctx->fset->fid,
670
          .of = writer->ctx->fset->farr[ftype]->f[0],
671
      };
672

673
      code = TARRAY2_APPEND(writer->fopArr, fileOp);
674
      TSDB_CHECK_CODE(code, lino, _exit);
675
    }
676

677
    code = tsdbDataFileReaderOpen(NULL, &dataFileReaderConfig, &writer->ctx->dataReader);
678
    TSDB_CHECK_CODE(code, lino, _exit);
679
#endif
680

681
    // open stt reader array
682
    SSttLvl* lvl;
683
    TARRAY2_FOREACH(writer->ctx->fset->lvlArr, lvl) {
3,220✔
684
      if (lvl->level != 0) {
1,938✔
685
        if (TARRAY2_SIZE(lvl->fobjArr) > 0) {
656✔
686
          writer->ctx->toSttOnly = true;
656✔
687
        }
688

689
        continue;  // Only merge level 0
656✔
690
      }
691

692
      STFileObj* fobj;
693
      TARRAY2_FOREACH(lvl->fobjArr, fobj) {
2,564✔
694
        SSttFileReader*      reader;
1,282✔
695
        SSttFileReaderConfig sttFileReaderConfig = {
2,564✔
696
            .tsdb = writer->tsdb,
1,282✔
697
            .szPage = writer->szPage,
1,282✔
698
            .buffers = writer->buffers,
1,282✔
699
            .file = fobj->f[0],
700
        };
701

702
        code = tsdbSttFileReaderOpen(fobj->fname, &sttFileReaderConfig, &reader);
1,282✔
703
        TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
704

705
        code = TARRAY2_APPEND(writer->ctx->sttReaderArr, reader);
1,282✔
706
        TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
707

708
        STFileOp fileOp = {
2,564✔
709
            .optype = TSDB_FOP_REMOVE,
710
            .fid = fobj->f->fid,
1,282✔
711
            .of = fobj->f[0],
712
        };
713

714
        code = TARRAY2_APPEND(writer->fopArr, fileOp);
1,282✔
715
        TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
716
      }
717
    }
718
  }
719

720
_exit:
1,282✔
721
  if (code) {
1,282✔
722
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
723
  }
724
  return code;
1,282✔
725
}
726

727
static int32_t tsdbSnapWriteFileSetCloseReader(STsdbSnapWriter* writer) {
1,282✔
728
  TARRAY2_CLEAR(writer->ctx->sttReaderArr, tsdbSttFileReaderClose);
2,564✔
729
  tsdbDataFileReaderClose(&writer->ctx->dataReader);
1,282✔
730
  return 0;
1,282✔
731
}
732

733
static int32_t tsdbSnapWriteFileSetOpenIter(STsdbSnapWriter* writer) {
1,282✔
734
  int32_t code = 0;
1,282✔
735
  int32_t lino = 0;
1,282✔
736

737
  // data ieter
738
  if (writer->ctx->dataReader) {
1,282✔
739
    STsdbIter*      iter;
×
740
    STsdbIterConfig config = {0};
×
741

742
    // data
743
    config.type = TSDB_ITER_TYPE_DATA;
×
744
    config.dataReader = writer->ctx->dataReader;
×
745

746
    code = tsdbIterOpen(&config, &iter);
×
747
    TSDB_CHECK_CODE(code, lino, _exit);
×
748

749
    code = TARRAY2_APPEND(writer->ctx->dataIterArr, iter);
×
750
    TSDB_CHECK_CODE(code, lino, _exit);
×
751

752
    // tome
753
    config.type = TSDB_ITER_TYPE_DATA_TOMB;
×
754
    config.dataReader = writer->ctx->dataReader;
×
755

756
    code = tsdbIterOpen(&config, &iter);
×
757
    TSDB_CHECK_CODE(code, lino, _exit);
×
758

759
    code = TARRAY2_APPEND(writer->ctx->tombIterArr, iter);
×
760
    TSDB_CHECK_CODE(code, lino, _exit);
×
761
  }
762

763
  // stt iter
764
  SSttFileReader* sttFileReader;
765
  TARRAY2_FOREACH(writer->ctx->sttReaderArr, sttFileReader) {
2,564✔
766
    STsdbIter*      iter;
1,282✔
767
    STsdbIterConfig config = {0};
1,282✔
768

769
    // data
770
    config.type = TSDB_ITER_TYPE_STT;
1,282✔
771
    config.sttReader = sttFileReader;
1,282✔
772

773
    code = tsdbIterOpen(&config, &iter);
1,282✔
774
    TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
775

776
    code = TARRAY2_APPEND(writer->ctx->dataIterArr, iter);
1,282✔
777
    TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
778

779
    // tomb
780
    config.type = TSDB_ITER_TYPE_STT_TOMB;
1,282✔
781
    config.sttReader = sttFileReader;
1,282✔
782

783
    code = tsdbIterOpen(&config, &iter);
1,282✔
784
    TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
785

786
    code = TARRAY2_APPEND(writer->ctx->tombIterArr, iter);
1,282✔
787
    TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
788
  }
789

790
  // open merger
791
  code = tsdbIterMergerOpen(writer->ctx->dataIterArr, &writer->ctx->dataIterMerger, false);
1,282✔
792
  TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
793

794
  code = tsdbIterMergerOpen(writer->ctx->tombIterArr, &writer->ctx->tombIterMerger, true);
1,282✔
795
  TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
796

797
_exit:
1,282✔
798
  if (code) {
1,282✔
799
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
800
  }
801
  return code;
1,282✔
802
}
803

804
static int32_t tsdbSnapWriteFileSetCloseIter(STsdbSnapWriter* writer) {
1,282✔
805
  tsdbIterMergerClose(&writer->ctx->dataIterMerger);
1,282✔
806
  tsdbIterMergerClose(&writer->ctx->tombIterMerger);
1,282✔
807
  TARRAY2_CLEAR(writer->ctx->dataIterArr, tsdbIterClose);
2,564✔
808
  TARRAY2_CLEAR(writer->ctx->tombIterArr, tsdbIterClose);
2,564✔
809
  return 0;
1,282✔
810
}
811

812
static int32_t tsdbSnapWriteFileSetOpenWriter(STsdbSnapWriter* writer) {
1,282✔
813
  int32_t code = 0;
1,282✔
814
  int32_t lino = 0;
1,282✔
815

816
  SFSetWriterConfig config = {
1,282✔
817
      .tsdb = writer->tsdb,
1,282✔
818
      .toSttOnly = writer->ctx->toSttOnly,
1,282✔
819
      .compactVersion = writer->compactVersion,
1,282✔
820
      .minRow = writer->minRow,
1,282✔
821
      .maxRow = writer->maxRow,
1,282✔
822
      .szPage = writer->szPage,
1,282✔
823
      .cmprAlg = writer->cmprAlg,
1,282✔
824
      .fid = writer->ctx->fid,
1,282✔
825
      .cid = writer->commitID,
1,282✔
826
      .expLevel = writer->ctx->expLevel,
1,282✔
827
      .level = writer->ctx->toSttOnly ? 1 : 0,
1,282✔
828
  };
829
  // merge stt files to either data or a new stt file
830
  if (writer->ctx->fset) {
1,282✔
831
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
6,410✔
832
      if (writer->ctx->fset->farr[ftype] != NULL) {
5,128✔
833
        config.files[ftype].exist = true;
1,968✔
834
        config.files[ftype].file = writer->ctx->fset->farr[ftype]->f[0];
1,968✔
835
      }
836
    }
837
  }
838

839
  code = tsdbFSetWriterOpen(&config, &writer->ctx->fsetWriter);
1,282✔
840
  TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
841

842
_exit:
1,282✔
843
  if (code) {
1,282✔
844
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
845
  }
846
  return code;
1,282✔
847
}
848

849
static int32_t tsdbSnapWriteFileSetCloseWriter(STsdbSnapWriter* writer) {
1,282✔
850
  return tsdbFSetWriterClose(&writer->ctx->fsetWriter, 0, writer->fopArr);
1,282✔
851
}
852

853
static int32_t tsdbSnapWriteFileSetBegin(STsdbSnapWriter* writer, int32_t fid) {
1,282✔
854
  int32_t code = 0;
1,282✔
855
  int32_t lino = 0;
1,282✔
856

857
  STFileSet* fset = &(STFileSet){.fid = fid};
1,282✔
858

859
  writer->ctx->fid = fid;
1,282✔
860
  STFileSet** fsetPtr = TARRAY2_SEARCH(writer->fsetArr, &fset, tsdbTFileSetCmprFn, TD_EQ);
1,282✔
861
  writer->ctx->fset = (fsetPtr == NULL) ? NULL : *fsetPtr;
1,282✔
862

863
  writer->ctx->expLevel = tsdbFidLevel(fid, &writer->tsdb->keepCfg, taosGetTimestampSec());
1,282✔
864

865
  writer->ctx->hasData = true;
1,282✔
866
  writer->ctx->hasTomb = true;
1,282✔
867

868
  code = tsdbSnapWriteFileSetOpenReader(writer);
1,282✔
869
  TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
870

871
  code = tsdbSnapWriteFileSetOpenIter(writer);
1,282✔
872
  TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
873

874
  code = tsdbSnapWriteFileSetOpenWriter(writer);
1,282✔
875
  TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
876

877
  writer->ctx->fsetWriteBegin = true;
1,282✔
878

879
_exit:
1,282✔
880
  if (code) {
1,282✔
881
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
882
  } else {
883
    tsdbInfo("vgId:%d %s succeeded, fid:%d", TD_VID(writer->tsdb->pVnode), __func__, fid);
1,282✔
884
  }
885
  return code;
1,282✔
886
}
887

888
static int32_t tsdbSnapWriteTombRecord(STsdbSnapWriter* writer, const STombRecord* record) {
1,282✔
889
  int32_t code = 0;
1,282✔
890
  int32_t lino = 0;
1,282✔
891

892
  while (writer->ctx->hasTomb) {
1,282✔
893
    STombRecord* record1 = tsdbIterMergerGetTombRecord(writer->ctx->tombIterMerger);
1,282✔
894
    if (record1 == NULL) {
1,282✔
895
      writer->ctx->hasTomb = false;
1,282✔
896
      break;
1,282✔
897
    }
898

899
    int32_t c = tTombRecordCompare(record1, record);
×
900
    if (c <= 0) {
×
901
      code = tsdbFSetWriteTombRecord(writer->ctx->fsetWriter, record1);
×
902
      TSDB_CHECK_CODE(code, lino, _exit);
×
903
    } else {
904
      break;
×
905
    }
906

907
    code = tsdbIterMergerNext(writer->ctx->tombIterMerger);
×
908
    TSDB_CHECK_CODE(code, lino, _exit);
×
909
  }
910

911
  if (record->suid == INT64_MAX) {
1,282✔
912
    goto _exit;
1,282✔
913
  }
914

915
  code = tsdbFSetWriteTombRecord(writer->ctx->fsetWriter, record);
×
916
  TSDB_CHECK_CODE(code, lino, _exit);
×
917

918
_exit:
×
919
  if (code) {
1,282✔
920
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
921
  }
922
  return code;
1,282✔
923
}
924

925
static int32_t tsdbSnapWriteFileSetEnd(STsdbSnapWriter* writer) {
2,564✔
926
  if (!writer->ctx->fsetWriteBegin) return 0;
2,564✔
927

928
  int32_t code = 0;
1,282✔
929
  int32_t lino = 0;
1,282✔
930

931
  // end timeseries data write
932
  SRowInfo row = {
1,282✔
933
      .suid = INT64_MAX,
934
      .uid = INT64_MAX,
935
  };
936

937
  code = tsdbSnapWriteTimeSeriesRow(writer, &row);
1,282✔
938
  TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
939

940
  // end tombstone data write
941
  STombRecord record = {
1,282✔
942
      .suid = INT64_MAX,
943
      .uid = INT64_MAX,
944
  };
945

946
  code = tsdbSnapWriteTombRecord(writer, &record);
1,282✔
947
  TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
948

949
  // close write
950
  code = tsdbSnapWriteFileSetCloseWriter(writer);
1,282✔
951
  TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
952

953
  code = tsdbSnapWriteFileSetCloseIter(writer);
1,282✔
954
  TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
955

956
  code = tsdbSnapWriteFileSetCloseReader(writer);
1,282✔
957
  TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
958

959
  writer->ctx->fsetWriteBegin = false;
1,282✔
960

961
_exit:
1,282✔
962
  if (code) {
1,282✔
963
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
964
  } else {
965
    tsdbInfo("vgId:%d %s succeeded, fid:%d", TD_VID(writer->tsdb->pVnode), __func__, writer->ctx->fid);
1,282✔
966
  }
967
  return code;
1,282✔
968
}
969

UNCOV
970
static int32_t tsdbSnapWriteFileSetAbort(STsdbSnapWriter* writer) {
×
UNCOV
971
  if (!writer->ctx->fsetWriteBegin) return 0;
×
972

UNCOV
973
  int32_t code = 0;
×
UNCOV
974
  int32_t lino = 0;
×
975

976
  // close write
UNCOV
977
  code = tsdbSnapWriteFileSetCloseWriter(writer);
×
UNCOV
978
  TSDB_CHECK_CODE(code, lino, _exit);
×
979

UNCOV
980
  code = tsdbSnapWriteFileSetCloseIter(writer);
×
UNCOV
981
  TSDB_CHECK_CODE(code, lino, _exit);
×
982

UNCOV
983
  code = tsdbSnapWriteFileSetCloseReader(writer);
×
UNCOV
984
  TSDB_CHECK_CODE(code, lino, _exit);
×
985

UNCOV
986
  writer->ctx->fsetWriteBegin = false;
×
987

UNCOV
988
_exit:
×
UNCOV
989
  if (code) {
×
990
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
991
  }
UNCOV
992
  return code;
×
993
}
994

995
static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
1,595✔
996
  int32_t code = 0;
1,595✔
997
  int32_t lino = 0;
1,595✔
998

999
  SBlockData blockData[1] = {0};
1,595✔
1000

1001
  SBuffer buffer = {
1,595✔
1002
      .capacity = hdr->size,
1,595✔
1003
      .data = hdr->data,
1,595✔
1004
      .size = hdr->size,
1,595✔
1005
  };
1006
  SBufferReader br = BUFFER_READER_INITIALIZER(0, &buffer);
1,595✔
1007

1008
  code = tBlockDataDecompress(&br, blockData, &writer->buffers[0]);
1,595✔
1009
  TSDB_CHECK_CODE(code, lino, _exit);
1,595✔
1010

1011
  int32_t fid = tsdbKeyFid(blockData->aTSKEY[0], writer->minutes, writer->precision);
1,595✔
1012
  if (!writer->ctx->fsetWriteBegin || fid != writer->ctx->fid) {
1,595✔
1013
    code = tsdbSnapWriteFileSetEnd(writer);
1,282✔
1014
    TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
1015

1016
    code = tsdbSnapWriteFileSetBegin(writer, fid);
1,282✔
1017
    TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
1018
  }
1019

1020
  for (int32_t i = 0; i < blockData->nRow; ++i) {
15,651,625✔
1021
    SRowInfo rowInfo = {
15,650,030✔
1022
        .suid = blockData->suid,
15,650,030✔
1023
        .uid = blockData->uid ? blockData->uid : blockData->aUid[i],
15,650,030✔
1024
        .row = tsdbRowFromBlockData(blockData, i),
1025
    };
1026

1027
    code = tsdbSnapWriteTimeSeriesRow(writer, &rowInfo);
15,650,343✔
1028
    TSDB_CHECK_CODE(code, lino, _exit);
15,650,030✔
1029
  }
1030

1031
_exit:
1,595✔
1032
  if (code) {
1,595✔
1033
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
1034
  } else {
1035
    tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(writer->tsdb->pVnode), __func__,
1,595✔
1036
              blockData->suid, blockData->uid, blockData->nRow);
1037
  }
1038
  tBlockDataDestroy(blockData);
1,595✔
1039
  return code;
1,595✔
1040
}
1041

1042
static int32_t tsdbSnapWriteDecmprTombBlock(SSnapDataHdr* hdr, STombBlock* tombBlock) {
×
1043
  int32_t code = 0;
×
1044
  int32_t lino = 0;
×
1045

1046
  tTombBlockClear(tombBlock);
×
1047

1048
  int64_t size = hdr->size;
×
1049
  size = size / TOMB_RECORD_ELEM_NUM;
×
1050
  tombBlock->numOfRecords = size / sizeof(int64_t);
×
1051

1052
  // int64_t* data = (int64_t*)hdr->data;
1053
  uint8_t* data = hdr->data;
×
1054
  for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) {
×
1055
    code = tBufferPut(tombBlock->buffers + i, data, size);
×
1056
    TSDB_CHECK_CODE(code, lino, _exit);
×
1057
    data += size;
×
1058
  }
1059

1060
_exit:
×
1061
  return code;
×
1062
}
1063

1064
static int32_t tsdbSnapWriteTombData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
×
1065
  int32_t code = 0;
×
1066
  int32_t lino = 0;
×
1067

1068
  STombRecord record;
×
1069
  STombBlock  tombBlock[1] = {0};
×
1070

1071
  code = tsdbSnapWriteDecmprTombBlock(hdr, tombBlock);
×
1072
  TSDB_CHECK_CODE(code, lino, _exit);
×
1073

1074
  code = tTombBlockGet(tombBlock, 0, &record);
×
1075
  TSDB_CHECK_CODE(code, lino, _exit);
×
1076
  int32_t fid = tsdbKeyFid(record.skey, writer->minutes, writer->precision);
×
1077
  if (!writer->ctx->fsetWriteBegin || fid != writer->ctx->fid) {
×
1078
    code = tsdbSnapWriteFileSetEnd(writer);
×
1079
    TSDB_CHECK_CODE(code, lino, _exit);
×
1080

1081
    code = tsdbSnapWriteFileSetBegin(writer, fid);
×
1082
    TSDB_CHECK_CODE(code, lino, _exit);
×
1083
  }
1084

1085
  if (writer->ctx->hasData) {
×
1086
    SRowInfo row = {
×
1087
        .suid = INT64_MAX,
1088
        .uid = INT64_MAX,
1089
    };
1090

1091
    code = tsdbSnapWriteTimeSeriesRow(writer, &row);
×
1092
    TSDB_CHECK_CODE(code, lino, _exit);
×
1093
  }
1094

1095
  for (int32_t i = 0; i < TOMB_BLOCK_SIZE(tombBlock); ++i) {
×
1096
    code = tTombBlockGet(tombBlock, i, &record);
×
1097
    TSDB_CHECK_CODE(code, lino, _exit);
×
1098

1099
    code = tsdbSnapWriteTombRecord(writer, &record);
×
1100
    TSDB_CHECK_CODE(code, lino, _exit);
×
1101
  }
1102

1103
  tTombBlockDestroy(tombBlock);
×
1104

1105
_exit:
×
1106
  if (code) {
×
1107
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
1108
  }
1109
  return code;
×
1110
}
1111

1112
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRanges, STsdbSnapWriter** writer) {
1,282✔
1113
  int32_t code = 0;
1,282✔
1114
  int32_t lino = 0;
1,282✔
1115

1116
  // start to write
1117
  writer[0] = taosMemoryCalloc(1, sizeof(*writer[0]));
1,282✔
1118
  if (writer[0] == NULL) return terrno;
1,282✔
1119

1120
  writer[0]->tsdb = pTsdb;
1,282✔
1121
  writer[0]->sver = sver;
1,282✔
1122
  writer[0]->ever = ever;
1,282✔
1123
  writer[0]->minutes = pTsdb->keepCfg.days;
1,282✔
1124
  writer[0]->precision = pTsdb->keepCfg.precision;
1,282✔
1125
  writer[0]->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
1,282✔
1126
  writer[0]->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
1,282✔
1127
  writer[0]->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
1,282✔
1128
  writer[0]->commitID = tsdbFSAllocEid(pTsdb->pFS);
1,282✔
1129
  writer[0]->szPage = pTsdb->pVnode->config.tsdbPageSize;
1,282✔
1130
  writer[0]->compactVersion = INT64_MAX;
1,282✔
1131
  writer[0]->now = taosGetTimestampMs();
2,564✔
1132

1133
  code =
1134
      tsdbFSCreateCopyRangedSnapshot(pTsdb->pFS, (TFileSetRangeArray*)pRanges, &writer[0]->fsetArr, writer[0]->fopArr);
1,282✔
1135
  TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
1136

1137
_exit:
1,282✔
1138
  if (code) {
1,282✔
1139
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
×
1140
  } else {
1141
    tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64, TD_VID(pTsdb->pVnode), __func__, sver, ever);
1,282✔
1142
  }
1143
  return code;
1,282✔
1144
}
1145

1146
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* writer, bool rollback) {
1,282✔
1147
  int32_t code = 0;
1,282✔
1148
  int32_t lino = 0;
1,282✔
1149

1150
  if (!rollback) {
1,282✔
1151
    code = tsdbSnapWriteFileSetEnd(writer);
1,282✔
1152
    TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
1153

1154
    code = tsdbFSEditBegin(writer->tsdb->pFS, writer->fopArr, TSDB_FEDIT_COMMIT);
1,282✔
1155
    TSDB_CHECK_CODE(code, lino, _exit);
1,282✔
1156
  } else {
UNCOV
1157
    code = tsdbSnapWriteFileSetAbort(writer);
×
UNCOV
1158
    TSDB_CHECK_CODE(code, lino, _exit);
×
1159

UNCOV
1160
    code = tsdbFSEditBegin(writer->tsdb->pFS, writer->fopArr, TSDB_FEDIT_COMMIT);
×
UNCOV
1161
    TSDB_CHECK_CODE(code, lino, _exit);
×
1162
  }
1163

UNCOV
1164
_exit:
×
1165
  if (code) {
1,282✔
1166
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
×
1167
  } else {
1168
    tsdbDebug("vgId:%d %s done", TD_VID(writer->tsdb->pVnode), __func__);
1,282✔
1169
  }
1170
  return code;
1,282✔
1171
}
1172

1173
int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) {
1,282✔
1174
  if (writer[0] == NULL) return 0;
1,282✔
1175

1176
  int32_t code = 0;
1,282✔
1177
  int32_t lino = 0;
1,282✔
1178

1179
  STsdb* tsdb = writer[0]->tsdb;
1,282✔
1180

1181
  if (rollback) {
1,282✔
UNCOV
1182
    code = tsdbFSEditAbort(writer[0]->tsdb->pFS);
×
UNCOV
1183
    TSDB_CHECK_CODE(code, lino, _exit);
×
1184
  } else {
1185
    (void)taosThreadMutexLock(&writer[0]->tsdb->mutex);
1,282✔
1186

1187
    code = tsdbFSEditCommit(writer[0]->tsdb->pFS);
1,282✔
1188
    if (code) {
1,282✔
1189
      (void)taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
×
1190
      TSDB_CHECK_CODE(code, lino, _exit);
×
1191
    }
1192

1193
    writer[0]->tsdb->pFS->fsstate = TSDB_FS_STATE_NORMAL;
1,282✔
1194

1195
    (void)taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
1,282✔
1196
  }
1197

1198
  tsdbIterMergerClose(&writer[0]->ctx->tombIterMerger);
1,282✔
1199
  tsdbIterMergerClose(&writer[0]->ctx->dataIterMerger);
1,282✔
1200
  TARRAY2_DESTROY(writer[0]->ctx->tombIterArr, tsdbIterClose);
1,282✔
1201
  TARRAY2_DESTROY(writer[0]->ctx->dataIterArr, tsdbIterClose);
1,282✔
1202
  TARRAY2_DESTROY(writer[0]->ctx->sttReaderArr, tsdbSttFileReaderClose);
1,282✔
1203
  tsdbDataFileReaderClose(&writer[0]->ctx->dataReader);
1,282✔
1204

1205
  TARRAY2_DESTROY(writer[0]->fopArr, NULL);
1,282✔
1206
  tsdbFSDestroyCopyRangedSnapshot(&writer[0]->fsetArr);
1,282✔
1207

1208
  for (int32_t i = 0; i < ARRAY_SIZE(writer[0]->buffers); ++i) {
14,102✔
1209
    tBufferDestroy(writer[0]->buffers + i);
12,820✔
1210
  }
1211

1212
  taosMemoryFree(writer[0]);
1,282✔
1213
  writer[0] = NULL;
1,282✔
1214

1215
_exit:
1,282✔
1216
  if (code) {
1,282✔
1217
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
×
1218
  } else {
1219
    tsdbInfo("vgId:%d %s done, rollback:%d", TD_VID(tsdb->pVnode), __func__, rollback);
1,282✔
1220
  }
1221
  return code;
1,282✔
1222
}
1223

1224
int32_t tsdbSnapWrite(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
1,595✔
1225
  int32_t code = 0;
1,595✔
1226
  int32_t lino = 0;
1,595✔
1227

1228
  if (hdr->type == SNAP_DATA_TSDB) {
1,595✔
1229
    code = tsdbSnapWriteTimeSeriesData(writer, hdr);
1,595✔
1230
    TSDB_CHECK_CODE(code, lino, _exit);
1,595✔
1231
  } else if (hdr->type == SNAP_DATA_DEL) {
×
1232
    code = tsdbSnapWriteTombData(writer, hdr);
×
1233
    TSDB_CHECK_CODE(code, lino, _exit);
×
1234
  } else {
1235
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_PARA, lino, _exit);
×
1236
  }
1237

1238
_exit:
1,595✔
1239
  if (code) {
1,595✔
1240
    tsdbError("vgId:%d %s failed at line %d since %s, type:%d index:%" PRId64 " size:%" PRId64,
×
1241
              TD_VID(writer->tsdb->pVnode), __func__, lino, tstrerror(code), hdr->type, hdr->index, hdr->size);
1242
  } else {
1243
    tsdbDebug("vgId:%d %s done, type:%d index:%" PRId64 " size:%" PRId64, TD_VID(writer->tsdb->pVnode), __func__,
1,595✔
1244
              hdr->type, hdr->index, hdr->size);
1245
  }
1246
  return code;
1,595✔
1247
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc