• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4850

14 Nov 2025 08:06AM UTC coverage: 63.728% (-0.1%) from 63.829%
#4850

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

355 of 675 new or added lines in 18 files covered. (52.59%)

634 existing lines in 110 files now uncovered.

149066 of 233910 relevant lines covered (63.73%)

115676883.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.35
/source/dnode/vnode/src/vnd/vnodeSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "bse.h"
17
#include "tsdb.h"
18
#include "vnd.h"
19

20
static int32_t vnodeExtractSnapInfoDiff(void *buf, int32_t bufLen, TFileSetRangeArray **ppRanges) {
38,655✔
21
  int32_t            code = 0;
38,655✔
22
  STsdbFSetPartList *pList = tsdbFSetPartListCreate();
38,655✔
23
  if (pList == NULL) {
38,655✔
24
    code = terrno;
×
25
    goto _out;
×
26
  }
27

28
  code = tDeserializeTsdbFSetPartList(buf, bufLen, pList);
38,655✔
29
  if (code) goto _out;
38,655✔
30

31
  code = tsdbFSetPartListToRangeDiff(pList, ppRanges);
38,655✔
32
  if (code) goto _out;
38,655✔
33

34
_out:
38,655✔
35
  tsdbFSetPartListDestroy(&pList);
38,655✔
36
  return code;
38,655✔
37
}
38

39
// SVSnapReader ========================================================
40
struct SVSnapReader {
41
  SVnode *pVnode;
42
  int64_t sver;
43
  int64_t ever;
44
  int64_t index;
45
  // config
46
  int8_t cfgDone;
47
  // meta
48
  int8_t           metaDone;
49
  SMetaSnapReader *pMetaReader;
50
  // tsdb
51
  int8_t              tsdbDone;
52
  TFileSetRangeArray *pRanges;
53
  STsdbSnapReader    *pTsdbReader;
54
  // tsdb raw
55
  int8_t              tsdbRAWDone;
56
  STsdbSnapRAWReader *pTsdbRAWReader;
57

58
  // tq
59
  int8_t         tqHandleDone;
60
  STqSnapReader *pTqSnapReader;
61
  int8_t         tqOffsetDone;
62
  STqSnapReader *pTqOffsetReader;
63
  int8_t         tqCheckInfoDone;
64
  STqSnapReader *pTqCheckInfoReader;
65
  // stream
66
  int8_t              streamTaskDone;
67
  SStreamTaskReader  *pStreamTaskReader;
68
  int8_t              streamStateDone;
69
  SStreamStateReader *pStreamStateReader;
70
  // rsma
71
  int8_t              rsmaDone;
72
  TFileSetRangeArray *pRsmaRanges[TSDB_RETENTION_L2];
73
  SRSmaSnapReader    *pRsmaReader;
74

75
  // bse
76
  int8_t          bseDone;
77
  SBseSnapReader *pBseReader;
78
};
79

80
static TFileSetRangeArray **vnodeSnapReaderGetTsdbRanges(SVSnapReader *pReader, int32_t tsdbTyp) {
77,368✔
81
  if (!(sizeof(pReader->pRsmaRanges) / sizeof(pReader->pRsmaRanges[0]) == 2)) {
82
    terrno = TSDB_CODE_INVALID_PARA;
83
    return NULL;
84
  }
85
  switch (tsdbTyp) {
77,368✔
86
    case SNAP_DATA_TSDB:
38,684✔
87
      return &pReader->pRanges;
38,684✔
88
    case SNAP_DATA_RSMA1:
19,342✔
89
      return &pReader->pRsmaRanges[0];
19,342✔
90
    case SNAP_DATA_RSMA2:
19,342✔
91
      return &pReader->pRsmaRanges[1];
19,342✔
92
    default:
×
93
      return NULL;
×
94
  }
95
}
96

97
static int32_t vnodeSnapReaderDealWithSnapInfo(SVSnapReader *pReader, SSnapshotParam *pParam) {
19,342✔
98
  int32_t code = 0;
19,342✔
99
  SVnode *pVnode = pReader->pVnode;
19,342✔
100

101
  if (pParam->data) {
19,342✔
102
    // decode
103
    SSyncTLV *datHead = (void *)pParam->data;
19,342✔
104
    if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
19,342✔
105
      code = TSDB_CODE_INVALID_DATA_FMT;
×
106
      terrno = code;
×
107
      goto _out;
×
108
    }
109

110
    STsdbRepOpts         tsdbOpts = {0};
19,342✔
111
    TFileSetRangeArray **ppRanges = NULL;
19,342✔
112
    int32_t              offset = 0;
19,342✔
113

114
    while (offset + sizeof(SSyncTLV) < datHead->len) {
58,026✔
115
      SSyncTLV *subField = (void *)(datHead->val + offset);
38,684✔
116
      offset += sizeof(SSyncTLV) + subField->len;
38,684✔
117
      void   *buf = subField->val;
38,684✔
118
      int32_t bufLen = subField->len;
38,684✔
119

120
      switch (subField->typ) {
38,684✔
121
        case SNAP_DATA_TSDB:
19,342✔
122
        case SNAP_DATA_RSMA1:
123
        case SNAP_DATA_RSMA2: {
124
          ppRanges = vnodeSnapReaderGetTsdbRanges(pReader, subField->typ);
19,342✔
125
          if (ppRanges == NULL) {
19,342✔
126
            vError("vgId:%d, unexpected subfield type in snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ);
×
127
            code = TSDB_CODE_INVALID_DATA_FMT;
×
128
            goto _out;
×
129
          }
130
          code = vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges);
19,342✔
131
          if (code) {
19,342✔
132
            vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr());
×
133
            goto _out;
×
134
          }
135
        } break;
19,342✔
136
        case SNAP_DATA_RAW: {
19,342✔
137
          code = tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts);
19,342✔
138
          if (code) {
19,342✔
139
            vError("vgId:%d, failed to deserialize tsdb rep opts since %s", TD_VID(pVnode), terrstr());
×
140
            goto _out;
×
141
          }
142
        } break;
19,342✔
143
        default:
×
144
          vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ);
×
145
          code = TSDB_CODE_INVALID_DATA_FMT;
×
146
          goto _out;
×
147
      }
148
    }
149

150
    // toggle snap replication mode
151
    vInfo("vgId:%d, vnode snap reader supported tsdb rep of format:%d", TD_VID(pVnode), tsdbOpts.format);
19,342✔
152
    if (pReader->sver == 0 && tsdbOpts.format == TSDB_SNAP_REP_FMT_RAW) {
19,342✔
153
      pReader->tsdbDone = true;
18,059✔
154
    } else {
155
      pReader->tsdbRAWDone = true;
1,283✔
156
    }
157

158
    vInfo("vgId:%d, vnode snap writer enabled replication mode: %s", TD_VID(pVnode),
19,342✔
159
          (pReader->tsdbDone ? "raw" : "normal"));
160
  }
161

162
_out:
×
163
  return code;
19,342✔
164
}
165

166
int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader **ppReader) {
19,342✔
167
  int32_t       code = 0;
19,342✔
168
  int64_t       sver = pParam->start;
19,342✔
169
  int64_t       ever = pParam->end;
19,342✔
170
  SVSnapReader *pReader = NULL;
19,342✔
171

172
  pReader = (SVSnapReader *)taosMemoryCalloc(1, sizeof(*pReader));
19,342✔
173
  if (pReader == NULL) {
19,342✔
174
    code = terrno;
×
175
    goto _exit;
×
176
  }
177
  pReader->pVnode = pVnode;
19,342✔
178
  pReader->sver = sver;
19,342✔
179
  pReader->ever = ever;
19,342✔
180

181
  // snapshot info
182
  code = vnodeSnapReaderDealWithSnapInfo(pReader, pParam);
19,342✔
183
  if (code) goto _exit;
19,342✔
184

185
  // open tsdb snapshot raw reader
186
  if (!pReader->tsdbRAWDone) {
19,342✔
187
    code = tsdbSnapRAWReaderOpen(pVnode->pTsdb, ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader);
18,059✔
188
    if (code) goto _exit;
18,059✔
189
  }
190

191
  // check snapshot ever
192
  SSnapshot snapshot = {0};
19,342✔
193
  code = vnodeGetSnapshot(pVnode, &snapshot);
19,342✔
194
  if (code) goto _exit;
19,342✔
195
  if (ever != snapshot.lastApplyIndex) {
19,342✔
196
    vError("vgId:%d, abort reader open due to vnode snapshot changed. ever:%" PRId64 ", commit ver:%" PRId64,
×
197
           TD_VID(pVnode), ever, snapshot.lastApplyIndex);
198
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
199
    goto _exit;
×
200
  }
201

202
_exit:
19,342✔
203
  if (code) {
19,342✔
204
    vError("vgId:%d, vnode snapshot reader open failed since %s", TD_VID(pVnode), tstrerror(code));
×
205
    *ppReader = NULL;
×
206
  } else {
207
    vInfo("vgId:%d, vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever);
19,342✔
208
    *ppReader = pReader;
19,342✔
209
  }
210
  return code;
19,342✔
211
}
212

213
static void vnodeSnapReaderDestroyTsdbRanges(SVSnapReader *pReader) {
19,342✔
214
  int32_t tsdbTyps[TSDB_RETENTION_MAX] = {SNAP_DATA_TSDB, SNAP_DATA_RSMA1, SNAP_DATA_RSMA2};
19,342✔
215
  for (int32_t j = 0; j < TSDB_RETENTION_MAX; ++j) {
77,368✔
216
    TFileSetRangeArray **ppRanges = vnodeSnapReaderGetTsdbRanges(pReader, tsdbTyps[j]);
58,026✔
217
    if (ppRanges == NULL) continue;
58,026✔
218
    tsdbTFileSetRangeArrayDestroy(ppRanges);
58,026✔
219
  }
220
}
19,342✔
221

222
void vnodeSnapReaderClose(SVSnapReader *pReader) {
19,342✔
223
  vInfo("vgId:%d, close vnode snapshot reader", TD_VID(pReader->pVnode));
19,342✔
224
  vnodeSnapReaderDestroyTsdbRanges(pReader);
19,342✔
225
#ifdef USE_RSMA_ORIGIN
226
  if (pReader->pRsmaReader) {
227
    rsmaSnapReaderClose(&pReader->pRsmaReader);
228
  }
229
#endif
230

231
  if (pReader->pTsdbReader) {
19,342✔
232
    tsdbSnapReaderClose(&pReader->pTsdbReader);
×
233
  }
234

235
  if (pReader->pTsdbRAWReader) {
19,342✔
236
    tsdbSnapRAWReaderClose(&pReader->pTsdbRAWReader);
×
237
  }
238

239
  if (pReader->pMetaReader) {
19,342✔
240
    metaSnapReaderClose(&pReader->pMetaReader);
×
241
  }
242
#ifdef USE_TQ
243
  if (pReader->pTqSnapReader) {
19,342✔
244
    tqSnapReaderClose(&pReader->pTqSnapReader);
×
245
  }
246

247
  if (pReader->pTqOffsetReader) {
19,342✔
248
    tqSnapReaderClose(&pReader->pTqOffsetReader);
×
249
  }
250

251
  if (pReader->pTqCheckInfoReader) {
19,342✔
252
    tqSnapReaderClose(&pReader->pTqCheckInfoReader);
×
253
  }
254

255
#endif
256

257
  if (pReader->pBseReader) {
19,342✔
258
    bseSnapReaderClose(&pReader->pBseReader);
×
259
  }
260
  taosMemoryFree(pReader);
19,342✔
261
}
19,342✔
262

263
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) {
271,671✔
264
  int32_t code = 0;
271,671✔
265
  int32_t lino;
266
  SVnode *pVnode = pReader->pVnode;
271,671✔
267
  int32_t vgId = TD_VID(pReader->pVnode);
271,671✔
268

269
  // CONFIG ==============
270
  // FIXME: if commit multiple times and the config changed?
271
  if (!pReader->cfgDone) {
271,671✔
272
    char    fName[TSDB_FILENAME_LEN];
19,342✔
273
    int32_t offset = 0;
19,342✔
274

275
    vnodeGetPrimaryPath(pVnode, false, fName, TSDB_FILENAME_LEN);
19,342✔
276
    offset = strlen(fName);
19,342✔
277
    snprintf(fName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME);
19,342✔
278

279
    TdFilePtr pFile = taosOpenFile(fName, TD_FILE_READ);
19,342✔
280
    if (NULL == pFile) {
19,342✔
281
      code = terrno;
×
282
      TSDB_CHECK_CODE(code, lino, _exit);
×
283
    }
284

285
    int64_t size;
19,342✔
286
    code = taosFStatFile(pFile, &size, NULL);
19,342✔
287
    if (code != 0) {
19,342✔
288
      if (taosCloseFile(&pFile) != 0) {
×
289
        vError("vgId:%d, failed to close file", vgId);
×
290
      }
291
      TSDB_CHECK_CODE(code, lino, _exit);
×
292
    }
293

294
    *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size + 1);
19,342✔
295
    if (*ppData == NULL) {
19,342✔
296
      if (taosCloseFile(&pFile) != 0) {
×
297
        vError("vgId:%d, failed to close file", vgId);
×
298
      }
299
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
300
    }
301
    ((SSnapDataHdr *)(*ppData))->type = SNAP_DATA_CFG;
19,342✔
302
    ((SSnapDataHdr *)(*ppData))->size = size + 1;
19,342✔
303
    ((SSnapDataHdr *)(*ppData))->data[size] = '\0';
19,342✔
304

305
    if (taosReadFile(pFile, ((SSnapDataHdr *)(*ppData))->data, size) < 0) {
19,342✔
306
      taosMemoryFree(*ppData);
×
307
      if (taosCloseFile(&pFile) != 0) {
×
308
        vError("vgId:%d, failed to close file", vgId);
×
309
      }
310
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
311
    }
312

313
    if (taosCloseFile(&pFile) != 0) {
19,342✔
314
      vError("vgId:%d, failed to close file", vgId);
×
315
    }
316

317
    pReader->cfgDone = 1;
19,342✔
318
    goto _exit;
19,342✔
319
  }
320

321
  // META ==============
322
  if (!pReader->metaDone) {
252,329✔
323
    // open reader if not
324
    if (pReader->pMetaReader == NULL) {
70,447✔
325
      code = metaSnapReaderOpen(pReader->pVnode->pMeta, pReader->sver, pReader->ever, &pReader->pMetaReader);
19,342✔
326
      TSDB_CHECK_CODE(code, lino, _exit);
19,342✔
327
    }
328

329
    code = metaSnapRead(pReader->pMetaReader, ppData);
70,447✔
330
    TSDB_CHECK_CODE(code, lino, _exit);
70,447✔
331

332
    if (*ppData) {
70,447✔
333
      goto _exit;
51,105✔
334
    } else {
335
      pReader->metaDone = 1;
19,342✔
336
      metaSnapReaderClose(&pReader->pMetaReader);
19,342✔
337
    }
338
  }
339

340
  // TSDB ==============
341
  if (!pReader->tsdbDone) {
201,224✔
342
    // open if not
343
    if (pReader->pTsdbReader == NULL) {
2,879✔
344
      code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB, pReader->pRanges,
1,283✔
345
                                &pReader->pTsdbReader);
346
      TSDB_CHECK_CODE(code, lino, _exit);
1,283✔
347
    }
348

349
    code = tsdbSnapRead(pReader->pTsdbReader, ppData);
2,879✔
350
    TSDB_CHECK_CODE(code, lino, _exit);
2,879✔
351
    if (*ppData) {
2,879✔
352
      goto _exit;
1,596✔
353
    } else {
354
      pReader->tsdbDone = 1;
1,283✔
355
      tsdbSnapReaderClose(&pReader->pTsdbReader);
1,283✔
356
    }
357
  }
358

359
  if (!pReader->tsdbRAWDone) {
199,628✔
360
    // open if not
361
    if (pReader->pTsdbRAWReader == NULL) {
196,881✔
362
      code = tsdbSnapRAWReaderOpen(pReader->pVnode->pTsdb, pReader->ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader);
×
363
      TSDB_CHECK_CODE(code, lino, _exit);
×
364
    }
365

366
    code = tsdbSnapRAWRead(pReader->pTsdbRAWReader, ppData);
196,881✔
367
    TSDB_CHECK_CODE(code, lino, _exit);
196,881✔
368
    if (*ppData) {
196,881✔
369
      goto _exit;
178,822✔
370
    } else {
371
      pReader->tsdbRAWDone = 1;
18,059✔
372
      tsdbSnapRAWReaderClose(&pReader->pTsdbRAWReader);
18,059✔
373
    }
374
  }
375

376
  // TQ ================
377
#ifdef USE_TQ
378
  vInfo("vgId:%d tq transform start", vgId);
20,806✔
379
  if (!pReader->tqHandleDone) {
20,806✔
380
    if (pReader->pTqSnapReader == NULL) {
20,048✔
381
      code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_HANDLE,
19,342✔
382
                              &pReader->pTqSnapReader);
383
      TSDB_CHECK_CODE(code, lino, _exit);
19,342✔
384
    }
385

386
    code = tqSnapRead(pReader->pTqSnapReader, ppData);
20,048✔
387
    TSDB_CHECK_CODE(code, lino, _exit);
20,048✔
388
    if (*ppData) {
20,048✔
389
      goto _exit;
706✔
390
    } else {
391
      pReader->tqHandleDone = 1;
19,342✔
392
      tqSnapReaderClose(&pReader->pTqSnapReader);
19,342✔
393
    }
394
  }
395
  if (!pReader->tqCheckInfoDone) {
20,100✔
396
    if (pReader->pTqCheckInfoReader == NULL) {
19,432✔
397
      code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_CHECKINFO,
19,342✔
398
                              &pReader->pTqCheckInfoReader);
399
      TSDB_CHECK_CODE(code, lino, _exit);
19,342✔
400
    }
401

402
    code = tqSnapRead(pReader->pTqCheckInfoReader, ppData);
19,432✔
403
    TSDB_CHECK_CODE(code, lino, _exit);
19,432✔
404
    if (*ppData) {
19,432✔
405
      goto _exit;
90✔
406
    } else {
407
      pReader->tqCheckInfoDone = 1;
19,342✔
408
      tqSnapReaderClose(&pReader->pTqCheckInfoReader);
19,342✔
409
    }
410
  }
411
  if (!pReader->tqOffsetDone) {
20,010✔
412
    if (pReader->pTqOffsetReader == NULL) {
20,010✔
413
      code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_OFFSET,
19,342✔
414
                              &pReader->pTqOffsetReader);
415
      TSDB_CHECK_CODE(code, lino, _exit);
19,342✔
416
    }
417

418
    code = tqSnapRead(pReader->pTqOffsetReader, ppData);
20,010✔
419
    TSDB_CHECK_CODE(code, lino, _exit);
20,010✔
420
    if (*ppData) {
20,010✔
421
      goto _exit;
668✔
422
    } else {
423
      pReader->tqOffsetDone = 1;
19,342✔
424
      tqSnapReaderClose(&pReader->pTqOffsetReader);
19,342✔
425
    }
426
  }
427
#endif
428
  // RSMA ==============
429
#ifdef USE_RSMA_ORIGIN
430
  if (VND_IS_RSMA(pReader->pVnode) && !pReader->rsmaDone) {
431
    // open if not
432
    if (pReader->pRsmaReader == NULL) {
433
      code = rsmaSnapReaderOpen(pReader->pVnode->pSma, pReader->sver, pReader->ever, &pReader->pRsmaReader);
434
      TSDB_CHECK_CODE(code, lino, _exit);
435
    }
436

437
    code = rsmaSnapRead(pReader->pRsmaReader, ppData);
438
    TSDB_CHECK_CODE(code, lino, _exit);
439
    if (*ppData) {
440
      goto _exit;
441
    } else {
442
      pReader->rsmaDone = 1;
443
      rsmaSnapReaderClose(&pReader->pRsmaReader);
444
    }
445
  }
446

447
  if (!pReader->bseDone) {
448
    if (pReader->pBseReader == NULL) {
449
      code = bseSnapReaderOpen(pReader->pVnode->pBse, pReader->sver, pReader->ever, &pReader->pBseReader);
450
      TSDB_CHECK_CODE(code, lino, _exit);
451
    }
452
    int32_t len = 0;
453
    code = bseSnapReaderRead(pReader->pBseReader, ppData);
454
    TSDB_CHECK_CODE(code, lino, _exit);
455
    if (*ppData) {
456
      goto _exit;
457
    } else {
458
      pReader->bseDone = 1;
459
      bseSnapReaderClose(&pReader->pBseReader);
460
    }
461
  }
462

463
#endif
464
  *ppData = NULL;
19,342✔
465
  *nData = 0;
19,342✔
466

467
_exit:
271,671✔
468
  if (code) {
271,671✔
469
    vError("vgId:%d, vnode snapshot read failed at %s:%d since %s", vgId, __FILE__, lino, tstrerror(code));
×
470
  } else {
471
    if (*ppData) {
271,671✔
472
      SSnapDataHdr *pHdr = (SSnapDataHdr *)(*ppData);
252,329✔
473

474
      pReader->index++;
252,329✔
475
      *nData = sizeof(SSnapDataHdr) + pHdr->size;
252,329✔
476
      pHdr->index = pReader->index;
252,329✔
477
      vDebug("vgId:%d, vnode snapshot read data, index:%" PRId64 " type:%d blockLen:%d ", vgId, pReader->index,
252,329✔
478
             pHdr->type, *nData);
479
    } else {
480
      vInfo("vgId:%d, vnode snapshot read data end, index:%" PRId64, vgId, pReader->index);
19,342✔
481
    }
482
  }
483
  return code;
271,671✔
484
}
485

486
// SVSnapWriter ========================================================
487
struct SVSnapWriter {
488
  SVnode *pVnode;
489
  int64_t sver;
490
  int64_t ever;
491
  int64_t commitID;
492
  int64_t index;
493
  // config
494
  SVnodeInfo info;
495
  // meta
496
  SMetaSnapWriter *pMetaSnapWriter;
497
  // tsdb
498
  TFileSetRangeArray *pRanges;
499
  STsdbSnapWriter    *pTsdbSnapWriter;
500
  // tsdb raw
501
  STsdbSnapRAWWriter *pTsdbSnapRAWWriter;
502
  // tq
503
  STqSnapWriter *pTqSnapHandleWriter;
504
  STqSnapWriter *pTqSnapOffsetWriter;
505
  STqSnapWriter *pTqSnapCheckInfoWriter;
506
  // rsma
507
  TFileSetRangeArray *pRsmaRanges[TSDB_RETENTION_L2];
508
  SRSmaSnapWriter    *pRsmaSnapWriter;
509

510
  // bse
511
  SBseSnapWriter *pBseSnapWriter;
512
};
513

514
TFileSetRangeArray **vnodeSnapWriterGetTsdbRanges(SVSnapWriter *pWriter, int32_t tsdbTyp) {
77,252✔
515
  switch (tsdbTyp) {
77,252✔
516
    case SNAP_DATA_TSDB:
38,626✔
517
      return &pWriter->pRanges;
38,626✔
518
    case SNAP_DATA_RSMA1:
19,313✔
519
      return &pWriter->pRsmaRanges[0];
19,313✔
520
    case SNAP_DATA_RSMA2:
19,313✔
521
      return &pWriter->pRsmaRanges[1];
19,313✔
522
    default:
×
523
      return NULL;
×
524
  }
525
}
526

527
static int32_t vnodeSnapWriterDealWithSnapInfo(SVSnapWriter *pWriter, SSnapshotParam *pParam) {
19,313✔
528
  SVnode *pVnode = pWriter->pVnode;
19,313✔
529
  int32_t code = 0;
19,313✔
530
  int32_t lino;
531

532
  if (pParam->data) {
19,313✔
533
    SSyncTLV *datHead = (void *)pParam->data;
19,313✔
534
    if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
19,313✔
535
      TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit);
×
536
    }
537

538
    STsdbRepOpts         tsdbOpts = {0};
19,313✔
539
    TFileSetRangeArray **ppRanges = NULL;
19,313✔
540
    int32_t              offset = 0;
19,313✔
541

542
    while (offset + sizeof(SSyncTLV) < datHead->len) {
57,939✔
543
      SSyncTLV *subField = (void *)(datHead->val + offset);
38,626✔
544
      offset += sizeof(SSyncTLV) + subField->len;
38,626✔
545
      void   *buf = subField->val;
38,626✔
546
      int32_t bufLen = subField->len;
38,626✔
547

548
      switch (subField->typ) {
38,626✔
549
        case SNAP_DATA_TSDB:
19,313✔
550
        case SNAP_DATA_RSMA1:
551
        case SNAP_DATA_RSMA2: {
552
          ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, subField->typ);
19,313✔
553
          if (ppRanges == NULL) {
19,313✔
554
            vError("vgId:%d, unexpected subfield type in snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ);
×
555
            TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
556
          }
557

558
          code = vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges);
19,313✔
559
          TSDB_CHECK_CODE(code, lino, _exit);
19,313✔
560
        } break;
19,313✔
561
        case SNAP_DATA_RAW: {
19,313✔
562
          code = tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts);
19,313✔
563
          TSDB_CHECK_CODE(code, lino, _exit);
19,313✔
564
        } break;
19,313✔
565
        default:
×
566
          vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ);
×
567
          TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit);
×
568
          goto _exit;
569
      }
570
    }
571

572
    vInfo("vgId:%d, vnode snap writer supported tsdb rep of format:%d", TD_VID(pVnode), tsdbOpts.format);
19,313✔
573
  }
574

575
_exit:
×
576
  if (code) {
19,313✔
577
    vError("vgId:%d %s failed at %s:%d since %s", TD_VID(pVnode), __func__, __FILE__, __LINE__, tstrerror(code));
×
578
  }
579
  return code;
19,313✔
580
}
581

582
extern int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb);
583
extern void    tsdbEnableBgTask(STsdb *pTsdb);
584

585
static int32_t vnodeCancelAndDisableAllBgTask(SVnode *pVnode) {
19,313✔
586
  TAOS_CHECK_RETURN(tsdbDisableAndCancelAllBgTask(pVnode->pTsdb));
19,313✔
587
  TAOS_CHECK_RETURN(vnodeSyncCommit(pVnode));
19,313✔
588
  return 0;
19,313✔
589
}
590

591
static int32_t vnodeEnableBgTask(SVnode *pVnode) {
19,313✔
592
  tsdbEnableBgTask(pVnode->pTsdb);
19,313✔
593
  return 0;
19,313✔
594
}
595

596
int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter **ppWriter) {
19,313✔
597
  int32_t       code = 0;
19,313✔
598
  int32_t       lino;
599
  SVSnapWriter *pWriter = NULL;
19,313✔
600
  int64_t       sver = pParam->start;
19,313✔
601
  int64_t       ever = pParam->end;
19,313✔
602

603
  // disable write, cancel and disable all bg tasks
604
  (void)taosThreadMutexLock(&pVnode->mutex);
19,313✔
605
  pVnode->disableWrite = true;
19,313✔
606
  (void)taosThreadMutexUnlock(&pVnode->mutex);
19,313✔
607

608
  code = vnodeCancelAndDisableAllBgTask(pVnode);
19,313✔
609
  TSDB_CHECK_CODE(code, lino, _exit);
19,313✔
610

611
  // alloc
612
  pWriter = (SVSnapWriter *)taosMemoryCalloc(1, sizeof(*pWriter));
19,313✔
613
  if (pWriter == NULL) {
19,313✔
614
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
615
  }
616
  pWriter->pVnode = pVnode;
19,313✔
617
  pWriter->sver = sver;
19,313✔
618
  pWriter->ever = ever;
19,313✔
619

620
  // inc commit ID
621
  pWriter->commitID = ++pVnode->state.commitID;
19,313✔
622

623
  // snapshot info
624
  code = vnodeSnapWriterDealWithSnapInfo(pWriter, pParam);
19,313✔
625
  TSDB_CHECK_CODE(code, lino, _exit);
19,313✔
626

627
_exit:
19,313✔
628
  if (code) {
19,313✔
629
    vError("vgId:%d, vnode snapshot writer open failed since %s", TD_VID(pVnode), tstrerror(code));
×
630
    if (pWriter) taosMemoryFreeClear(pWriter);
×
631
    *ppWriter = NULL;
×
632
  } else {
633
    vInfo("vgId:%d, vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode),
19,313✔
634
          sver, ever, pWriter->commitID);
635
    *ppWriter = pWriter;
19,313✔
636
  }
637
  return code;
19,313✔
638
}
639

640
static void vnodeSnapWriterDestroyTsdbRanges(SVSnapWriter *pWriter) {
19,313✔
641
  int32_t tsdbTyps[TSDB_RETENTION_MAX] = {SNAP_DATA_TSDB, SNAP_DATA_RSMA1, SNAP_DATA_RSMA2};
19,313✔
642
  for (int32_t j = 0; j < TSDB_RETENTION_MAX; ++j) {
77,252✔
643
    TFileSetRangeArray **ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, tsdbTyps[j]);
57,939✔
644
    if (ppRanges == NULL) continue;
57,939✔
645
    tsdbTFileSetRangeArrayDestroy(ppRanges);
57,939✔
646
  }
647
}
19,313✔
648

649
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot) {
19,313✔
650
  int32_t code = 0;
19,313✔
651
  SVnode *pVnode = pWriter->pVnode;
19,313✔
652

653
  vnodeSnapWriterDestroyTsdbRanges(pWriter);
19,313✔
654

655
  // prepare
656
  if (pWriter->pTsdbSnapWriter) {
19,313✔
657
    code = tsdbSnapWriterPrepareClose(pWriter->pTsdbSnapWriter, rollback);
1,283✔
658
    if (code) goto _exit;
1,283✔
659
  }
660

661
  if (pWriter->pTsdbSnapRAWWriter) {
19,313✔
662
    code = tsdbSnapRAWWriterPrepareClose(pWriter->pTsdbSnapRAWWriter);
15,402✔
663
    if (code) goto _exit;
15,402✔
664
  }
665
#ifdef USE_RSMA_ORIGIN
666
  if (pWriter->pRsmaSnapWriter) {
667
    code = rsmaSnapWriterPrepareClose(pWriter->pRsmaSnapWriter, rollback);
668
    if (code) goto _exit;
669
  }
670
#endif
671
  // commit json
672
  if (!rollback) {
19,313✔
673
    pWriter->info.state.committed = pWriter->ever;
19,313✔
674
    pVnode->config = pWriter->info.config;
19,313✔
675
    pVnode->state = (SVState){.committed = pWriter->info.state.committed,
38,626✔
676
                              .applied = pWriter->info.state.committed,
19,313✔
677
                              .commitID = pWriter->commitID,
19,313✔
678
                              .commitTerm = pWriter->info.state.commitTerm,
19,313✔
679
                              .applyTerm = pWriter->info.state.commitTerm};
19,313✔
680
    pVnode->statis = pWriter->info.statis;
19,313✔
681
    char dir[TSDB_FILENAME_LEN] = {0};
19,313✔
682
    vnodeGetPrimaryPath(pVnode, false, dir, TSDB_FILENAME_LEN);
19,313✔
683

684
    code = vnodeCommitInfo(dir);
19,313✔
685
    if (code) goto _exit;
19,313✔
686

687
  } else {
UNCOV
688
    vnodeRollback(pWriter->pVnode);
×
689
  }
690

691
  // commit/rollback sub-system
692
  if (pWriter->pMetaSnapWriter) {
19,313✔
693
    code = metaSnapWriterClose(&pWriter->pMetaSnapWriter, rollback);
15,362✔
694
    if (code) goto _exit;
15,362✔
695
  }
696

697
  if (pWriter->pTsdbSnapWriter) {
19,313✔
698
    code = tsdbSnapWriterClose(&pWriter->pTsdbSnapWriter, rollback);
1,283✔
699
    if (code) goto _exit;
1,283✔
700
  }
701

702
  if (pWriter->pTsdbSnapRAWWriter) {
19,313✔
703
    code = tsdbSnapRAWWriterClose(&pWriter->pTsdbSnapRAWWriter, rollback);
15,402✔
704
    if (code) goto _exit;
15,402✔
705
  }
706
#ifdef USE_TQ
707
  if (pWriter->pTqSnapHandleWriter) {
19,313✔
708
    code = tqSnapWriterClose(&pWriter->pTqSnapHandleWriter, rollback);
706✔
709
    if (code) goto _exit;
706✔
710
  }
711

712
  if (pWriter->pTqSnapCheckInfoWriter) {
19,313✔
713
    code = tqSnapWriterClose(&pWriter->pTqSnapCheckInfoWriter, rollback);
91✔
714
    if (code) goto _exit;
91✔
715
  }
716

717
  if (pWriter->pTqSnapOffsetWriter) {
19,313✔
718
    code = tqSnapWriterClose(&pWriter->pTqSnapOffsetWriter, rollback);
668✔
719
    if (code) goto _exit;
668✔
720
  }
721
#endif
722
#ifdef USE_RSMA_ORIGIN
723
  if (pWriter->pRsmaSnapWriter) {
724
    code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback);
725
    if (code) goto _exit;
726
  }
727

728
  if (pWriter->pBseSnapWriter) {
729
    bseSnapWriterClose(&pWriter->pBseSnapWriter, rollback);
730
  }
731

732
#endif
733
  code = vnodeBegin(pVnode);
19,313✔
734
  if (code) goto _exit;
19,313✔
735

736
  (void)taosThreadMutexLock(&pVnode->mutex);
19,313✔
737
  pVnode->disableWrite = false;
19,313✔
738
  (void)taosThreadMutexUnlock(&pVnode->mutex);
19,313✔
739

740
_exit:
19,313✔
741
  if (code) {
19,313✔
742
    vError("vgId:%d, vnode snapshot writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code));
×
743
  } else {
744
    vInfo("vgId:%d, vnode snapshot writer closed, rollback:%d", TD_VID(pVnode), rollback);
19,313✔
745
    taosMemoryFree(pWriter);
19,313✔
746
  }
747
  if (vnodeEnableBgTask(pVnode) != 0) {
19,313✔
748
    tsdbError("vgId:%d, failed to enable bg task", TD_VID(pVnode));
×
749
  }
750
  return code;
19,313✔
751
}
752

753
static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
19,313✔
754
  int32_t       code = 0;
19,313✔
755
  int32_t       lino;
756
  SVnode       *pVnode = pWriter->pVnode;
19,313✔
757
  SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;
19,313✔
758

759
  // decode info
760
  code = vnodeDecodeInfo(pHdr->data, &pWriter->info);
19,313✔
761
  TSDB_CHECK_CODE(code, lino, _exit);
19,313✔
762

763
  // change some value
764
  pWriter->info.state.commitID = pWriter->commitID;
19,313✔
765

766
  // modify info as needed
767
  char dir[TSDB_FILENAME_LEN] = {0};
19,313✔
768
  vnodeGetPrimaryPath(pVnode, false, dir, TSDB_FILENAME_LEN);
19,313✔
769

770
  SVnodeStats vndStats = pWriter->info.config.vndStats;
19,313✔
771
  pWriter->info.config = pVnode->config;
19,313✔
772
  pWriter->info.config.vndStats = vndStats;
19,313✔
773
  vDebug("vgId:%d, save config while write snapshot", pWriter->pVnode->config.vgId);
19,313✔
774
  code = vnodeSaveInfo(dir, &pWriter->info);
19,313✔
775
  TSDB_CHECK_CODE(code, lino, _exit);
19,313✔
776

777
_exit:
19,313✔
778
  return code;
19,313✔
779
}
780

781
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
250,481✔
782
  int32_t       code = 0;
250,481✔
783
  int32_t       lino;
784
  SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;
250,481✔
785
  SVnode       *pVnode = pWriter->pVnode;
250,481✔
786

787
  if (!(pHdr->size + sizeof(SSnapDataHdr) == nData)) {
250,481✔
788
    return TSDB_CODE_INVALID_PARA;
×
789
  }
790

791
  if (pHdr->index != pWriter->index + 1) {
250,481✔
792
    vError("vgId:%d, unexpected vnode snapshot msg. index:%" PRId64 ", expected index:%" PRId64, TD_VID(pVnode),
×
793
           pHdr->index, pWriter->index + 1);
794
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_MSG, lino, _exit);
×
795
  }
796

797
  pWriter->index = pHdr->index;
250,481✔
798

799
  vDebug("vgId:%d, vnode snapshot write data, index:%" PRId64 " type:%d blockLen:%d", TD_VID(pVnode), pHdr->index,
250,481✔
800
         pHdr->type, nData);
801

802
  switch (pHdr->type) {
250,481✔
803
    case SNAP_DATA_CFG: {
19,313✔
804
      code = vnodeSnapWriteInfo(pWriter, pData, nData);
19,313✔
805
      TSDB_CHECK_CODE(code, lino, _exit);
19,313✔
806
    } break;
19,313✔
807
    case SNAP_DATA_META: {
50,988✔
808
      // meta
809
      if (pWriter->pMetaSnapWriter == NULL) {
50,988✔
810
        code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter);
15,362✔
811
        TSDB_CHECK_CODE(code, lino, _exit);
15,362✔
812
      }
813

814
      code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData);
50,988✔
815
      TSDB_CHECK_CODE(code, lino, _exit);
50,988✔
816
    } break;
50,988✔
817
    case SNAP_DATA_TSDB:
1,596✔
818
    case SNAP_DATA_DEL: {
819
      // tsdb
820
      if (pWriter->pTsdbSnapWriter == NULL) {
1,596✔
821
        code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, pWriter->pRanges,
1,283✔
822
                                  &pWriter->pTsdbSnapWriter);
823
        TSDB_CHECK_CODE(code, lino, _exit);
1,283✔
824
      }
825

826
      code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pHdr);
1,596✔
827
      TSDB_CHECK_CODE(code, lino, _exit);
1,596✔
828
    } break;
1,596✔
829
    case SNAP_DATA_RAW: {
177,119✔
830
      // tsdb
831
      if (pWriter->pTsdbSnapRAWWriter == NULL) {
177,119✔
832
        code = tsdbSnapRAWWriterOpen(pVnode->pTsdb, pWriter->ever, &pWriter->pTsdbSnapRAWWriter);
15,402✔
833
        TSDB_CHECK_CODE(code, lino, _exit);
15,402✔
834
      }
835

836
      code = tsdbSnapRAWWrite(pWriter->pTsdbSnapRAWWriter, pHdr);
177,119✔
837
      TSDB_CHECK_CODE(code, lino, _exit);
177,119✔
838
    } break;
177,119✔
839
#ifdef USE_TQ
840
    case SNAP_DATA_TQ_HANDLE: {
706✔
841
      // tq handle
842
      if (pWriter->pTqSnapHandleWriter == NULL) {
706✔
843
        code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapHandleWriter);
706✔
844
        TSDB_CHECK_CODE(code, lino, _exit);
706✔
845
      }
846

847
      code = tqSnapHandleWrite(pWriter->pTqSnapHandleWriter, pData, nData);
706✔
848
      TSDB_CHECK_CODE(code, lino, _exit);
706✔
849
    } break;
706✔
850
    case SNAP_DATA_TQ_CHECKINFO: {
91✔
851
      // tq checkinfo
852
      if (pWriter->pTqSnapCheckInfoWriter == NULL) {
91✔
853
        code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapCheckInfoWriter);
91✔
854
        TSDB_CHECK_CODE(code, lino, _exit);
91✔
855
      }
856

857
      code = tqSnapCheckInfoWrite(pWriter->pTqSnapCheckInfoWriter, pData, nData);
91✔
858
      TSDB_CHECK_CODE(code, lino, _exit);
91✔
859
    } break;
91✔
860
    case SNAP_DATA_TQ_OFFSET: {
668✔
861
      // tq offset
862
      if (pWriter->pTqSnapOffsetWriter == NULL) {
668✔
863
        code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapOffsetWriter);
668✔
864
        TSDB_CHECK_CODE(code, lino, _exit);
668✔
865
      }
866

867
      code = tqSnapOffsetWrite(pWriter->pTqSnapOffsetWriter, pData, nData);
668✔
868
      TSDB_CHECK_CODE(code, lino, _exit);
668✔
869
    } break;
668✔
870
#endif
871
#ifdef USE_RSMA_ORIGIN
872
    case SNAP_DATA_RSMA1:
873
    case SNAP_DATA_RSMA2:
874
    case SNAP_DATA_QTASK: {
875
      // rsma1/rsma2/qtask for rsma
876
      if (pWriter->pRsmaSnapWriter == NULL) {
877
        code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, (void **)pWriter->pRsmaRanges,
878
                                  &pWriter->pRsmaSnapWriter);
879
        TSDB_CHECK_CODE(code, lino, _exit);
880
      }
881

882
      code = rsmaSnapWrite(pWriter->pRsmaSnapWriter, pData, nData);
883
      TSDB_CHECK_CODE(code, lino, _exit);
884
    } break;
885
    case SNAP_DATA_BSE: {
886
      if (pWriter->pBseSnapWriter == NULL) {
887
        code = bseSnapWriterOpen(pVnode->pBse, pWriter->sver, pWriter->ever, &pWriter->pBseSnapWriter);
888
        TSDB_CHECK_CODE(code, lino, _exit);
889
      }
890
      code = bseSnapWriterWrite(pWriter->pBseSnapWriter, pData, nData);
891
      TSDB_CHECK_CODE(code, lino, _exit);
892
    } break;
893
#endif
894
    default:
×
895
      break;
×
896
  }
897
_exit:
250,481✔
898
  if (code) {
250,481✔
899
    vError("vgId:%d, vnode snapshot write failed since %s, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode),
×
900
           tstrerror(code), pHdr->index, pHdr->type, nData);
901
  }
902
  return code;
250,481✔
903
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc