• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4913

06 Jan 2026 01:30AM UTC coverage: 64.884% (-0.004%) from 64.888%
#4913

push

travis-ci

web-flow
merge: from main to 3.0 branch #34167

180 of 319 new or added lines in 14 files covered. (56.43%)

571 existing lines in 128 files now uncovered.

195016 of 300563 relevant lines covered (64.88%)

117540852.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.86
/source/dnode/vnode/src/vnd/vnodeSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "bse.h"
17
#include "tencrypt.h"
18
#include "tsdb.h"
19
#include "vnd.h"
20

21
/*
 * Decode a serialized file-set partition list from `buf` and convert it into
 * a range diff stored through `ppRanges`.
 *
 * @param buf      serialized partition list
 * @param bufLen   length of `buf` in bytes
 * @param ppRanges receives the result of tsdbFSetPartListToRangeDiff()
 * @return 0 on success; terrno if the list cannot be created; otherwise the
 *         code returned by the failing deserialize/convert step.
 *
 * The temporary partition list is always destroyed before returning,
 * including when its creation failed.
 */
static int32_t vnodeExtractSnapInfoDiff(void *buf, int32_t bufLen, TFileSetRangeArray **ppRanges) {
  int32_t            rc = 0;
  STsdbFSetPartList *pParts = tsdbFSetPartListCreate();

  if (pParts == NULL) {
    rc = terrno;
  } else {
    rc = tDeserializeTsdbFSetPartList(buf, bufLen, pParts);
    if (rc == 0) {
      rc = tsdbFSetPartListToRangeDiff(pParts, ppRanges);
    }
  }

  tsdbFSetPartListDestroy(&pParts);
  return rc;
}
39

40
// SVSnapReader ========================================================
41
struct SVSnapReader {
42
  SVnode *pVnode;
43
  int64_t sver;
44
  int64_t ever;
45
  int64_t index;
46
  // config
47
  int8_t cfgDone;
48
  // meta
49
  int8_t           metaDone;
50
  SMetaSnapReader *pMetaReader;
51
  // tsdb
52
  int8_t              tsdbDone;
53
  TFileSetRangeArray *pRanges;
54
  STsdbSnapReader    *pTsdbReader;
55
  // tsdb raw
56
  int8_t              tsdbRAWDone;
57
  STsdbSnapRAWReader *pTsdbRAWReader;
58

59
  // tq
60
  int8_t         tqHandleDone;
61
  STqSnapReader *pTqSnapReader;
62
  int8_t         tqOffsetDone;
63
  STqSnapReader *pTqOffsetReader;
64
  // rsma
65
  int8_t              rsmaDone;
66
  TFileSetRangeArray *pRsmaRanges[TSDB_RETENTION_L2];
67
  SRSmaSnapReader    *pRsmaReader;
68

69
  // bse
70
  int8_t          bseDone;
71
  SBseSnapReader *pBseReader;
72
};
73

74
/*
 * Map a snapshot data type to the address of the matching range-array slot
 * inside the reader.
 *
 * @param pReader snapshot reader owning the slots
 * @param tsdbTyp SNAP_DATA_TSDB, SNAP_DATA_RSMA1 or SNAP_DATA_RSMA2
 * @return address of the slot, or NULL for any other type. NULL is also
 *         returned (with terrno = TSDB_CODE_INVALID_PARA) if the rsma slot
 *         array does not have exactly two entries.
 */
static TFileSetRangeArray **vnodeSnapReaderGetTsdbRanges(SVSnapReader *pReader, int32_t tsdbTyp) {
  // The RSMA1/RSMA2 mapping below indexes slots 0 and 1 directly.
  if (sizeof(pReader->pRsmaRanges) / sizeof(pReader->pRsmaRanges[0]) != 2) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  if (tsdbTyp == SNAP_DATA_TSDB) {
    return &pReader->pRanges;
  }
  if (tsdbTyp == SNAP_DATA_RSMA1) {
    return &pReader->pRsmaRanges[0];
  }
  if (tsdbTyp == SNAP_DATA_RSMA2) {
    return &pReader->pRsmaRanges[1];
  }
  return NULL;
}
90

91
/*
 * Parse the optional snapshot info attached to `pParam` and configure the
 * reader accordingly.
 *
 * `pParam->data`, when present, is treated as an SSyncTLV of type
 * TDMT_SYNC_PREP_SNAPSHOT_REPLY whose value is a sequence of nested TLVs:
 *   - SNAP_DATA_TSDB / SNAP_DATA_RSMA1 / SNAP_DATA_RSMA2: range info, decoded
 *     into the matching range slot of the reader;
 *   - SNAP_DATA_RAW: tsdb replication options.
 * After parsing, exactly one of tsdbDone / tsdbRAWDone is set so that
 * vnodeSnapRead() skips the stage that is not used.
 *
 * @return 0 on success (or when no data is attached),
 *         TSDB_CODE_INVALID_DATA_FMT for an unexpected TLV type, or the code
 *         returned by the failing decode step.
 *
 * NOTE(review): sub-field lengths are not validated against the outer
 * length; this presumably relies on the sender being trusted — confirm.
 */
static int32_t vnodeSnapReaderDealWithSnapInfo(SVSnapReader *pReader, SSnapshotParam *pParam) {
  SVnode *pVnode = pReader->pVnode;

  // Nothing attached: leave the reader in its default state.
  if (pParam->data == NULL) {
    return 0;
  }

  SSyncTLV *pHead = (void *)pParam->data;
  if (pHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
    terrno = TSDB_CODE_INVALID_DATA_FMT;
    return TSDB_CODE_INVALID_DATA_FMT;
  }

  STsdbRepOpts tsdbOpts = {0};
  int32_t      cursor = 0;

  while (cursor + sizeof(SSyncTLV) < pHead->len) {
    SSyncTLV *pField = (void *)(pHead->val + cursor);
    void     *payload = pField->val;
    int32_t   payloadLen = pField->len;
    int32_t   rc = 0;

    // Advance past this sub-field (header + payload) before handling it.
    cursor += sizeof(SSyncTLV) + pField->len;

    switch (pField->typ) {
      case SNAP_DATA_TSDB:
      case SNAP_DATA_RSMA1:
      case SNAP_DATA_RSMA2: {
        TFileSetRangeArray **ppSlot = vnodeSnapReaderGetTsdbRanges(pReader, pField->typ);
        if (ppSlot == NULL) {
          vError("vgId:%d, unexpected subfield type in snapshot param. subtyp:%d", TD_VID(pVnode), pField->typ);
          return TSDB_CODE_INVALID_DATA_FMT;
        }
        rc = vnodeExtractSnapInfoDiff(payload, payloadLen, ppSlot);
        if (rc) {
          vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr());
          return rc;
        }
        break;
      }
      case SNAP_DATA_RAW: {
        rc = tDeserializeTsdbRepOpts(payload, payloadLen, &tsdbOpts);
        if (rc) {
          vError("vgId:%d, failed to deserialize tsdb rep opts since %s", TD_VID(pVnode), terrstr());
          return rc;
        }
        break;
      }
      default:
        vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), pField->typ);
        return TSDB_CODE_INVALID_DATA_FMT;
    }
  }

  // toggle snap replication mode
  vInfo("vgId:%d, vnode snap reader supported tsdb rep of format:%d", TD_VID(pVnode), tsdbOpts.format);
  if (pReader->sver != 0 || tsdbOpts.format != TSDB_SNAP_REP_FMT_RAW) {
    // normal replication: skip the raw stage
    pReader->tsdbRAWDone = true;
  } else {
    // raw replication: skip the normal tsdb stage
    pReader->tsdbDone = true;
  }

  vInfo("vgId:%d, vnode snap writer enabled replication mode: %s", TD_VID(pVnode),
        (pReader->tsdbDone ? "raw" : "normal"));

  return 0;
}
159

160
/*
 * Create a snapshot reader for `pVnode` covering versions
 * [pParam->start, pParam->end].
 *
 * Steps: allocate the reader, apply the optional snapshot info carried by
 * `pParam`, open the tsdb raw reader unless the raw stage was disabled, and
 * verify that the vnode's current snapshot still ends at `pParam->end`.
 *
 * @param pVnode   vnode to read from
 * @param pParam   snapshot parameters (version range and optional info)
 * @param ppReader receives the new reader on success, NULL on failure
 * @return 0 on success; terrno on allocation failure;
 *         TSDB_CODE_SYN_INTERNAL_ERROR if the vnode snapshot changed;
 *         otherwise the code of the failing step.
 *
 * On failure every resource acquired so far (parsed range arrays, the raw
 * reader and the reader object itself) is released via
 * vnodeSnapReaderClose(), so nothing is leaked to the caller.
 */
int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader **ppReader) {
  int32_t       code = 0;
  int64_t       sver = pParam->start;
  int64_t       ever = pParam->end;
  SVSnapReader *pReader = NULL;
  SSnapshot     snapshot = {0};

  pReader = (SVSnapReader *)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = terrno;
    goto _exit;
  }
  pReader->pVnode = pVnode;
  pReader->sver = sver;
  pReader->ever = ever;

  // snapshot info
  code = vnodeSnapReaderDealWithSnapInfo(pReader, pParam);
  if (code) goto _exit;

  // open tsdb snapshot raw reader
  if (!pReader->tsdbRAWDone) {
    code = tsdbSnapRAWReaderOpen(pVnode->pTsdb, ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader);
    if (code) goto _exit;
  }

  // check snapshot ever
  code = vnodeGetSnapshot(pVnode, &snapshot);
  if (code) goto _exit;
  if (ever != snapshot.lastApplyIndex) {
    vError("vgId:%d, abort reader open due to vnode snapshot changed. ever:%" PRId64 ", commit ver:%" PRId64,
           TD_VID(pVnode), ever, snapshot.lastApplyIndex);
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _exit;
  }

_exit:
  if (code) {
    vError("vgId:%d, vnode snapshot reader open failed since %s", TD_VID(pVnode), tstrerror(code));
    if (pReader != NULL) {
      // Releases range arrays, any opened sub-reader and the reader itself.
      vnodeSnapReaderClose(pReader);
    }
    *ppReader = NULL;
  } else {
    vInfo("vgId:%d, vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever);
    *ppReader = pReader;
  }
  return code;
}
206

207
/*
 * Destroy every range array held by the reader (tsdb, rsma1, rsma2).
 * Types for which the reader exposes no slot are skipped.
 */
static void vnodeSnapReaderDestroyTsdbRanges(SVSnapReader *pReader) {
  int32_t kinds[TSDB_RETENTION_MAX] = {SNAP_DATA_TSDB, SNAP_DATA_RSMA1, SNAP_DATA_RSMA2};

  for (int32_t i = 0; i < TSDB_RETENTION_MAX; i++) {
    TFileSetRangeArray **ppSlot = vnodeSnapReaderGetTsdbRanges(pReader, kinds[i]);
    if (ppSlot != NULL) {
      tsdbTFileSetRangeArrayDestroy(ppSlot);
    }
  }
}
28,721✔
215

216
/*
 * Close a snapshot reader: destroy its range arrays, close any sub-reader
 * that is still open, then free the reader object itself.
 *
 * @param pReader reader to close; must not be NULL (pReader->pVnode is read
 *                for logging before anything else).
 */
void vnodeSnapReaderClose(SVSnapReader *pReader) {
  vInfo("vgId:%d, close vnode snapshot reader", TD_VID(pReader->pVnode));

  vnodeSnapReaderDestroyTsdbRanges(pReader);

#ifdef USE_RSMA_ORIGIN
  if (pReader->pRsmaReader != NULL) {
    rsmaSnapReaderClose(&pReader->pRsmaReader);
  }
#endif

  if (pReader->pTsdbReader != NULL) {
    tsdbSnapReaderClose(&pReader->pTsdbReader);
  }

  if (pReader->pTsdbRAWReader != NULL) {
    tsdbSnapRAWReaderClose(&pReader->pTsdbRAWReader);
  }

  if (pReader->pMetaReader != NULL) {
    metaSnapReaderClose(&pReader->pMetaReader);
  }

#ifdef USE_TQ
  if (pReader->pTqSnapReader != NULL) {
    tqSnapReaderClose(&pReader->pTqSnapReader);
  }

  if (pReader->pTqOffsetReader != NULL) {
    tqSnapReaderClose(&pReader->pTqOffsetReader);
  }
#endif

  if (pReader->pBseReader != NULL) {
    bseSnapReaderClose(&pReader->pBseReader);
  }

  taosMemoryFree(pReader);
}
28,721✔
252

253
/*
 * Produce the next snapshot block from the reader.
 *
 * Stages are visited in a fixed order (config, meta, tsdb, tsdb raw, tq
 * handle, tq offset, and — when compiled in — rsma and bse). Each call
 * returns at most one block; a stage is marked done and its sub-reader
 * closed when it yields no more data, after which the next stage is tried
 * within the same call.
 *
 * @param pReader snapshot reader
 * @param ppData  receives the block (an SSnapDataHdr followed by payload), or
 *                NULL when all stages are exhausted
 * @param nData   receives the total block length (header + payload), or 0 at
 *                end of data; not written on failure
 * @return 0 on success (including end of data), otherwise the code of the
 *         failing step.
 *
 * On success with a block, the reader's running index is incremented and
 * stamped into the block header.
 */
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) {
  int32_t code = 0;
  int32_t lino = 0;
  SVnode *pVnode = pReader->pVnode;
  int32_t vgId = TD_VID(pReader->pVnode);

  // CONFIG ==============
  // FIXME: if commit multiple times and the config changed?
  if (!pReader->cfgDone) {
    char    fName[TSDB_FILENAME_LEN];
    int32_t offset = 0;

    vnodeGetPrimaryPath(pVnode, false, fName, TSDB_FILENAME_LEN);
    offset = strlen(fName);
    snprintf(fName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME);

    TdFilePtr pFile = taosOpenFile(fName, TD_FILE_READ);
    if (NULL == pFile) {
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    int64_t size;
    code = taosFStatFile(pFile, &size, NULL);
    if (code != 0) {
      if (taosCloseFile(&pFile) != 0) {
        vError("vgId:%d, failed to close file", vgId);
      }
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // One extra byte so the file content can be NUL-terminated.
    *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size + 1);
    if (*ppData == NULL) {
      // Capture the allocation error before closing the file can overwrite it.
      code = terrno;
      if (taosCloseFile(&pFile) != 0) {
        vError("vgId:%d, failed to close file", vgId);
      }
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    SSnapDataHdr *pCfgHdr = (SSnapDataHdr *)(*ppData);
    pCfgHdr->type = SNAP_DATA_CFG;
    pCfgHdr->size = size + 1;
    pCfgHdr->data[size] = '\0';

    if (taosReadFile(pFile, pCfgHdr->data, size) < 0) {
      // Capture the read error first, then release the buffer and make sure
      // the caller is not handed a dangling pointer.
      code = terrno;
      taosMemoryFree(*ppData);
      *ppData = NULL;
      if (taosCloseFile(&pFile) != 0) {
        vError("vgId:%d, failed to close file", vgId);
      }
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    if (taosCloseFile(&pFile) != 0) {
      vError("vgId:%d, failed to close file", vgId);
    }

    pReader->cfgDone = 1;
    goto _exit;
  }

  // META ==============
  if (!pReader->metaDone) {
    // open reader if not
    if (pReader->pMetaReader == NULL) {
      code = metaSnapReaderOpen(pReader->pVnode->pMeta, pReader->sver, pReader->ever, &pReader->pMetaReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = metaSnapRead(pReader->pMetaReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (*ppData) {
      goto _exit;
    } else {
      pReader->metaDone = 1;
      metaSnapReaderClose(&pReader->pMetaReader);
    }
  }

  // TSDB ==============
  if (!pReader->tsdbDone) {
    // open if not
    if (pReader->pTsdbReader == NULL) {
      code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB, pReader->pRanges,
                                &pReader->pTsdbReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbSnapRead(pReader->pTsdbReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->tsdbDone = 1;
      tsdbSnapReaderClose(&pReader->pTsdbReader);
    }
  }

  // TSDB RAW ==============
  if (!pReader->tsdbRAWDone) {
    // open if not
    if (pReader->pTsdbRAWReader == NULL) {
      code = tsdbSnapRAWReaderOpen(pReader->pVnode->pTsdb, pReader->ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbSnapRAWRead(pReader->pTsdbRAWReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->tsdbRAWDone = 1;
      tsdbSnapRAWReaderClose(&pReader->pTsdbRAWReader);
    }
  }

  // TQ ================
#ifdef USE_TQ
  vInfo("vgId:%d tq transform start", vgId);
  if (!pReader->tqHandleDone) {
    if (pReader->pTqSnapReader == NULL) {
      code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_HANDLE,
                              &pReader->pTqSnapReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tqSnapRead(pReader->pTqSnapReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->tqHandleDone = 1;
      tqSnapReaderClose(&pReader->pTqSnapReader);
    }
  }
  if (!pReader->tqOffsetDone) {
    if (pReader->pTqOffsetReader == NULL) {
      code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_OFFSET,
                              &pReader->pTqOffsetReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tqSnapRead(pReader->pTqOffsetReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->tqOffsetDone = 1;
      tqSnapReaderClose(&pReader->pTqOffsetReader);
    }
  }
#endif
  // RSMA ==============
#ifdef USE_RSMA_ORIGIN
  if (VND_IS_RSMA(pReader->pVnode) && !pReader->rsmaDone) {
    // open if not
    if (pReader->pRsmaReader == NULL) {
      code = rsmaSnapReaderOpen(pReader->pVnode->pSma, pReader->sver, pReader->ever, &pReader->pRsmaReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = rsmaSnapRead(pReader->pRsmaReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->rsmaDone = 1;
      rsmaSnapReaderClose(&pReader->pRsmaReader);
    }
  }

  // BSE ==============
  if (!pReader->bseDone) {
    if (pReader->pBseReader == NULL) {
      code = bseSnapReaderOpen(pReader->pVnode->pBse, pReader->sver, pReader->ever, &pReader->pBseReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
    code = bseSnapReaderRead(pReader->pBseReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->bseDone = 1;
      bseSnapReaderClose(&pReader->pBseReader);
    }
  }

#endif
  // All stages exhausted: signal end of data.
  *ppData = NULL;
  *nData = 0;

_exit:
  if (code) {
    vError("vgId:%d, vnode snapshot read failed at %s:%d since %s", vgId, __FILE__, lino, tstrerror(code));
  } else {
    if (*ppData) {
      SSnapDataHdr *pHdr = (SSnapDataHdr *)(*ppData);

      pReader->index++;
      *nData = sizeof(SSnapDataHdr) + pHdr->size;
      pHdr->index = pReader->index;
      vDebug("vgId:%d, vnode snapshot read data, index:%" PRId64 " type:%d blockLen:%d ", vgId, pReader->index,
             pHdr->type, *nData);
    } else {
      vInfo("vgId:%d, vnode snapshot read data end, index:%" PRId64, vgId, pReader->index);
    }
  }
  return code;
}
459

460
// SVSnapWriter ========================================================
461
struct SVSnapWriter {
462
  SVnode *pVnode;
463
  int64_t sver;
464
  int64_t ever;
465
  int64_t commitID;
466
  int64_t index;
467
  // config
468
  SVnodeInfo info;
469
  // meta
470
  SMetaSnapWriter *pMetaSnapWriter;
471
  // tsdb
472
  TFileSetRangeArray *pRanges;
473
  STsdbSnapWriter    *pTsdbSnapWriter;
474
  // tsdb raw
475
  STsdbSnapRAWWriter *pTsdbSnapRAWWriter;
476
  // tq
477
  STqSnapWriter *pTqSnapHandleWriter;
478
  STqSnapWriter *pTqSnapOffsetWriter;
479
  // rsma
480
  TFileSetRangeArray *pRsmaRanges[TSDB_RETENTION_L2];
481
  SRSmaSnapWriter    *pRsmaSnapWriter;
482

483
  // bse
484
  SBseSnapWriter *pBseSnapWriter;
485
};
486

487
/*
 * Map a snapshot data type to the address of the matching range-array slot
 * inside the writer.
 *
 * @param pWriter snapshot writer owning the slots
 * @param tsdbTyp SNAP_DATA_TSDB, SNAP_DATA_RSMA1 or SNAP_DATA_RSMA2
 * @return address of the slot, or NULL for any other type (terrno is not
 *         modified in that case).
 */
TFileSetRangeArray **vnodeSnapWriterGetTsdbRanges(SVSnapWriter *pWriter, int32_t tsdbTyp) {
  if (tsdbTyp == SNAP_DATA_TSDB) {
    return &pWriter->pRanges;
  }
  if (tsdbTyp == SNAP_DATA_RSMA1) {
    return &pWriter->pRsmaRanges[0];
  }
  if (tsdbTyp == SNAP_DATA_RSMA2) {
    return &pWriter->pRsmaRanges[1];
  }
  return NULL;
}
499

500
/*
 * Parse the optional snapshot info attached to `pParam` and store the decoded
 * range arrays in the writer.
 *
 * `pParam->data`, when present, is treated as an SSyncTLV of type
 * TDMT_SYNC_PREP_SNAPSHOT_REPLY whose value is a sequence of nested TLVs:
 *   - SNAP_DATA_TSDB / SNAP_DATA_RSMA1 / SNAP_DATA_RSMA2: range info, decoded
 *     into the matching range slot of the writer;
 *   - SNAP_DATA_RAW: tsdb replication options (only logged here).
 *
 * @return 0 on success (or when no data is attached),
 *         TSDB_CODE_INVALID_DATA_FMT for an unexpected TLV type or a type
 *         with no range slot, or the code returned by the failing decode
 *         step.
 *
 * On failure, range arrays decoded from earlier sub-fields remain attached to
 * the writer; the caller is responsible for destroying them.
 *
 * NOTE(review): sub-field lengths are not validated against the outer
 * length; this presumably relies on the sender being trusted — confirm.
 */
static int32_t vnodeSnapWriterDealWithSnapInfo(SVSnapWriter *pWriter, SSnapshotParam *pParam) {
  SVnode *pVnode = pWriter->pVnode;
  int32_t code = 0;
  int32_t lino = 0;

  if (pParam->data) {
    SSyncTLV *datHead = (void *)pParam->data;
    if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
      TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit);
    }

    STsdbRepOpts         tsdbOpts = {0};
    TFileSetRangeArray **ppRanges = NULL;
    int32_t              offset = 0;

    while (offset + sizeof(SSyncTLV) < datHead->len) {
      SSyncTLV *subField = (void *)(datHead->val + offset);
      offset += sizeof(SSyncTLV) + subField->len;
      void   *buf = subField->val;
      int32_t bufLen = subField->len;

      switch (subField->typ) {
        case SNAP_DATA_TSDB:
        case SNAP_DATA_RSMA1:
        case SNAP_DATA_RSMA2: {
          ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, subField->typ);
          if (ppRanges == NULL) {
            vError("vgId:%d, unexpected subfield type in snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ);
            // The lookup does not set terrno, so use an explicit code; relying
            // on terrno could leave code == 0 and fall through with a NULL slot.
            TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit);
          }

          code = vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges);
          TSDB_CHECK_CODE(code, lino, _exit);
        } break;
        case SNAP_DATA_RAW: {
          code = tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts);
          TSDB_CHECK_CODE(code, lino, _exit);
        } break;
        default:
          vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ);
          TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit);
      }
    }

    vInfo("vgId:%d, vnode snap writer supported tsdb rep of format:%d", TD_VID(pVnode), tsdbOpts.format);
  }

_exit:
  if (code) {
    // Report the line recorded at the failure site, not the line of this log call.
    vError("vgId:%d %s failed at %s:%d since %s", TD_VID(pVnode), __func__, __FILE__, lino, tstrerror(code));
  }
  return code;
}
554

555
extern int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb);
556
extern void    tsdbEnableBgTask(STsdb *pTsdb);
557

558
/*
 * Disable and cancel all tsdb background tasks of the vnode, then perform a
 * synchronous commit.
 *
 * @return 0 on success; otherwise returns early from the first failing step
 *         via TAOS_CHECK_RETURN.
 */
static int32_t vnodeCancelAndDisableAllBgTask(SVnode *pVnode) {
  // Step 1: stop background work on the tsdb.
  TAOS_CHECK_RETURN(tsdbDisableAndCancelAllBgTask(pVnode->pTsdb));

  // Step 2: commit the vnode synchronously.
  TAOS_CHECK_RETURN(vnodeSyncCommit(pVnode));

  return 0;
}
563

564
/*
 * Re-enable tsdb background tasks of the vnode.
 *
 * @return always 0.
 */
static int32_t vnodeEnableBgTask(SVnode *pVnode) {
  STsdb *pTsdb = pVnode->pTsdb;

  tsdbEnableBgTask(pTsdb);
  return 0;
}
568

569
/*
 * Create a snapshot writer for `pVnode` covering versions
 * [pParam->start, pParam->end].
 *
 * Steps: mark the vnode as write-disabled, cancel and disable background
 * tasks (followed by a synchronous commit), allocate the writer, take the
 * next commit id from the vnode, and apply the optional snapshot info carried
 * by `pParam`.
 *
 * @param pVnode   vnode to write into
 * @param pParam   snapshot parameters (version range and optional info)
 * @param ppWriter receives the new writer on success, NULL on failure
 * @return 0 on success; terrno on allocation failure; otherwise the code of
 *         the failing step.
 *
 * On failure the writer and any range arrays already decoded into it are
 * released.
 *
 * NOTE(review): on failure the vnode is left with disableWrite == true, with
 * background tasks disabled, and (if reached) with state.commitID already
 * incremented — confirm the caller recovers from this state.
 */
int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter **ppWriter) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SVSnapWriter *pWriter = NULL;
  int64_t       sver = pParam->start;
  int64_t       ever = pParam->end;

  // disable write, cancel and disable all bg tasks
  (void)taosThreadMutexLock(&pVnode->mutex);
  pVnode->disableWrite = true;
  (void)taosThreadMutexUnlock(&pVnode->mutex);

  code = vnodeCancelAndDisableAllBgTask(pVnode);
  TSDB_CHECK_CODE(code, lino, _exit);

  // alloc
  pWriter = (SVSnapWriter *)taosMemoryCalloc(1, sizeof(*pWriter));
  if (pWriter == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  pWriter->pVnode = pVnode;
  pWriter->sver = sver;
  pWriter->ever = ever;

  // inc commit ID
  pWriter->commitID = ++pVnode->state.commitID;

  // snapshot info
  code = vnodeSnapWriterDealWithSnapInfo(pWriter, pParam);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    vError("vgId:%d, vnode snapshot writer open failed since %s", TD_VID(pVnode), tstrerror(code));
    if (pWriter) {
      // Snapshot info parsing may have filled some range arrays before it
      // failed; destroy them so they are not leaked with the writer.
      int32_t nRsma = (int32_t)(sizeof(pWriter->pRsmaRanges) / sizeof(pWriter->pRsmaRanges[0]));
      tsdbTFileSetRangeArrayDestroy(&pWriter->pRanges);
      for (int32_t i = 0; i < nRsma; ++i) {
        tsdbTFileSetRangeArrayDestroy(&pWriter->pRsmaRanges[i]);
      }
      taosMemoryFreeClear(pWriter);
    }
    *ppWriter = NULL;
  } else {
    vInfo("vgId:%d, vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode),
          sver, ever, pWriter->commitID);
    *ppWriter = pWriter;
  }
  return code;
}
612

613
/*
 * Destroy every range array held by the writer (tsdb, rsma1, rsma2).
 * Types for which the writer exposes no slot are skipped.
 */
static void vnodeSnapWriterDestroyTsdbRanges(SVSnapWriter *pWriter) {
  int32_t kinds[TSDB_RETENTION_MAX] = {SNAP_DATA_TSDB, SNAP_DATA_RSMA1, SNAP_DATA_RSMA2};

  for (int32_t i = 0; i < TSDB_RETENTION_MAX; i++) {
    TFileSetRangeArray **ppSlot = vnodeSnapWriterGetTsdbRanges(pWriter, kinds[i]);
    if (ppSlot != NULL) {
      tsdbTFileSetRangeArrayDestroy(ppSlot);
    }
  }
}
28,722✔
621

622
/*
 * Finish a snapshot write session, either committing or rolling back.
 *
 * Order of operations:
 *   1. destroy the writer's range arrays;
 *   2. "prepare close" on the tsdb / tsdb-raw (and rsma, if compiled in)
 *      writers that were opened;
 *   3. commit: install the received vnode info/state/statistics into the
 *      vnode and commit the info file; rollback: call vnodeRollback();
 *   4. close each opened sub-writer with the `rollback` flag;
 *   5. vnodeBegin(), then re-enable writes on the vnode;
 *   6. re-enable background tasks (always, even on failure).
 *
 * @param pWriter   writer to close
 * @param rollback  non-zero to discard the received snapshot
 * @param pSnapshot not referenced by this function
 * @return 0 on success, otherwise the code of the first failing step.
 *
 * NOTE(review): on failure the writer object is not freed and the vnode's
 * disableWrite flag is left set — confirm this is what callers expect.
 */
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot) {
  int32_t code = 0;
  SVnode *pVnode = pWriter->pVnode;

  vnodeSnapWriterDestroyTsdbRanges(pWriter);

  // prepare
  if (pWriter->pTsdbSnapWriter != NULL) {
    code = tsdbSnapWriterPrepareClose(pWriter->pTsdbSnapWriter, rollback);
    if (code != 0) goto _exit;
  }

  if (pWriter->pTsdbSnapRAWWriter != NULL) {
    code = tsdbSnapRAWWriterPrepareClose(pWriter->pTsdbSnapRAWWriter);
    if (code != 0) goto _exit;
  }
#ifdef USE_RSMA_ORIGIN
  if (pWriter->pRsmaSnapWriter != NULL) {
    code = rsmaSnapWriterPrepareClose(pWriter->pRsmaSnapWriter, rollback);
    if (code != 0) goto _exit;
  }
#endif
  // commit json
  if (rollback) {
    vnodeRollback(pVnode);
  } else {
    // The snapshot's end version becomes both the committed and applied version.
    pWriter->info.state.committed = pWriter->ever;
    pVnode->config = pWriter->info.config;
    pVnode->state = (SVState){.committed = pWriter->info.state.committed,
                              .applied = pWriter->info.state.committed,
                              .commitID = pWriter->commitID,
                              .commitTerm = pWriter->info.state.commitTerm,
                              .applyTerm = pWriter->info.state.commitTerm};
    pVnode->statis = pWriter->info.statis;

    char dir[TSDB_FILENAME_LEN] = {0};
    vnodeGetPrimaryPath(pVnode, false, dir, TSDB_FILENAME_LEN);

    code = vnodeCommitInfo(dir);
    if (code != 0) goto _exit;
  }

  // commit/rollback sub-system
  if (pWriter->pMetaSnapWriter != NULL) {
    code = metaSnapWriterClose(&pWriter->pMetaSnapWriter, rollback);
    if (code != 0) goto _exit;
  }

  if (pWriter->pTsdbSnapWriter != NULL) {
    code = tsdbSnapWriterClose(&pWriter->pTsdbSnapWriter, rollback);
    if (code != 0) goto _exit;
  }

  if (pWriter->pTsdbSnapRAWWriter != NULL) {
    code = tsdbSnapRAWWriterClose(&pWriter->pTsdbSnapRAWWriter, rollback);
    if (code != 0) goto _exit;
  }
#ifdef USE_TQ
  if (pWriter->pTqSnapHandleWriter != NULL) {
    code = tqSnapWriterClose(&pWriter->pTqSnapHandleWriter, rollback);
    if (code != 0) goto _exit;
  }

  if (pWriter->pTqSnapOffsetWriter != NULL) {
    code = tqSnapWriterClose(&pWriter->pTqSnapOffsetWriter, rollback);
    if (code != 0) goto _exit;
  }
#endif
#ifdef USE_RSMA_ORIGIN
  if (pWriter->pRsmaSnapWriter != NULL) {
    code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback);
    if (code != 0) goto _exit;
  }

  if (pWriter->pBseSnapWriter != NULL) {
    bseSnapWriterClose(&pWriter->pBseSnapWriter, rollback);
  }

#endif
  code = vnodeBegin(pVnode);
  if (code != 0) goto _exit;

  // Writes were disabled when the writer was opened; allow them again.
  (void)taosThreadMutexLock(&pVnode->mutex);
  pVnode->disableWrite = false;
  (void)taosThreadMutexUnlock(&pVnode->mutex);

_exit:
  if (code != 0) {
    vError("vgId:%d, vnode snapshot writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code));
  } else {
    vInfo("vgId:%d, vnode snapshot writer closed, rollback:%d", TD_VID(pVnode), rollback);
    taosMemoryFree(pWriter);
  }
  if (vnodeEnableBgTask(pVnode) != 0) {
    tsdbError("vgId:%d, failed to enable bg task", TD_VID(pVnode));
  }
  return code;
}
720

721
/*
 * Handle a SNAP_DATA_CFG block: decode the vnode info it carries, adjust it
 * for the local vnode, and save it under the vnode's primary path.
 *
 * Adjustments applied to the decoded info:
 *   - state.commitID is replaced with the writer's commit id;
 *   - config is replaced with the local vnode's config, except that the
 *     decoded config.vndStats is preserved.
 *
 * @param pWriter snapshot writer
 * @param pData   block buffer beginning with an SSnapDataHdr; the header's
 *                data is passed to vnodeDecodeInfo()
 * @param nData   block length (not referenced by this function)
 * @return 0 on success, otherwise the code of the failing step.
 */
static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
  int32_t       code = 0;
  int32_t       lino;
  SVnode       *pVnode = pWriter->pVnode;
  SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;
  char          dir[TSDB_FILENAME_LEN] = {0};
  SVnodeStats   savedStats;

  if (taosWaitCfgKeyLoaded() != 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  // decode info
  code = vnodeDecodeInfo(pHdr->data, &pWriter->info);
  TSDB_CHECK_CODE(code, lino, _exit);

  // change some value
  pWriter->info.state.commitID = pWriter->commitID;

  // modify info as needed
  vnodeGetPrimaryPath(pVnode, false, dir, TSDB_FILENAME_LEN);

  // Keep the received stats while taking everything else from the local config.
  savedStats = pWriter->info.config.vndStats;
  pWriter->info.config = pVnode->config;
  pWriter->info.config.vndStats = savedStats;

  vDebug("vgId:%d, save config while write snapshot", pWriter->pVnode->config.vgId);
  code = vnodeSaveInfo(dir, &pWriter->info);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  return code;
}
752

753
/*
 * Apply one snapshot block to the vnode.
 *
 * The block must start with an SSnapDataHdr whose `size` plus the header size
 * equals `nData`, and whose `index` is exactly one greater than the index of
 * the previously accepted block. The block is then dispatched on its type to
 * the matching sub-writer, which is opened on first use.
 *
 * @param pWriter snapshot writer
 * @param pData   block buffer (header followed by payload)
 * @param nData   total length of `pData` in bytes
 * @return 0 on success;
 *         TSDB_CODE_INVALID_PARA if the buffer is NULL, shorter than a header,
 *         or its length disagrees with the header;
 *         TSDB_CODE_INVALID_MSG if the block index is out of sequence;
 *         otherwise the code of the failing sub-writer call.
 *
 * Blocks of a type not handled in this build are accepted and ignored.
 */
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;
  SVnode       *pVnode = pWriter->pVnode;

  // Make sure a complete header is present before reading any of its fields.
  if (pData == NULL || nData < sizeof(SSnapDataHdr)) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (!(pHdr->size + sizeof(SSnapDataHdr) == nData)) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (pHdr->index != pWriter->index + 1) {
    vError("vgId:%d, unexpected vnode snapshot msg. index:%" PRId64 ", expected index:%" PRId64, TD_VID(pVnode),
           pHdr->index, pWriter->index + 1);
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_MSG, lino, _exit);
  }

  pWriter->index = pHdr->index;

  vDebug("vgId:%d, vnode snapshot write data, index:%" PRId64 " type:%d blockLen:%d", TD_VID(pVnode), pHdr->index,
         pHdr->type, nData);

  switch (pHdr->type) {
    case SNAP_DATA_CFG: {
      code = vnodeSnapWriteInfo(pWriter, pData, nData);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
    case SNAP_DATA_META: {
      // meta
      if (pWriter->pMetaSnapWriter == NULL) {
        code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
    case SNAP_DATA_TSDB:
    case SNAP_DATA_DEL: {
      // tsdb
      if (pWriter->pTsdbSnapWriter == NULL) {
        code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, pWriter->pRanges,
                                  &pWriter->pTsdbSnapWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pHdr);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
    case SNAP_DATA_RAW: {
      // tsdb raw
      if (pWriter->pTsdbSnapRAWWriter == NULL) {
        code = tsdbSnapRAWWriterOpen(pVnode->pTsdb, pWriter->ever, &pWriter->pTsdbSnapRAWWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      code = tsdbSnapRAWWrite(pWriter->pTsdbSnapRAWWriter, pHdr);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
#ifdef USE_TQ
    case SNAP_DATA_TQ_HANDLE: {
      // tq handle
      if (pWriter->pTqSnapHandleWriter == NULL) {
        code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapHandleWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      code = tqSnapHandleWrite(pWriter->pTqSnapHandleWriter, pData, nData);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
    case SNAP_DATA_TQ_OFFSET: {
      // tq offset
      if (pWriter->pTqSnapOffsetWriter == NULL) {
        code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapOffsetWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      code = tqSnapOffsetWrite(pWriter->pTqSnapOffsetWriter, pData, nData);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
#endif
#ifdef USE_RSMA_ORIGIN
    case SNAP_DATA_RSMA1:
    case SNAP_DATA_RSMA2:
    case SNAP_DATA_QTASK: {
      // rsma1/rsma2/qtask for rsma
      if (pWriter->pRsmaSnapWriter == NULL) {
        code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, (void **)pWriter->pRsmaRanges,
                                  &pWriter->pRsmaSnapWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      code = rsmaSnapWrite(pWriter->pRsmaSnapWriter, pData, nData);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
    case SNAP_DATA_BSE: {
      if (pWriter->pBseSnapWriter == NULL) {
        code = bseSnapWriterOpen(pVnode->pBse, pWriter->sver, pWriter->ever, &pWriter->pBseSnapWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
      code = bseSnapWriterWrite(pWriter->pBseSnapWriter, pData, nData);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
#endif
    default:
      break;
  }
_exit:
  if (code) {
    vError("vgId:%d, vnode snapshot write failed since %s, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode),
           tstrerror(code), pHdr->index, pHdr->type, nData);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc