• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4888

17 Dec 2025 07:43AM UTC coverage: 65.281% (-0.008%) from 65.289%
#4888

push

travis-ci

web-flow
test: update ci env (#33959)

178960 of 274136 relevant lines covered (65.28%)

101703773.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.79
/source/dnode/vnode/src/vnd/vnodeSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "bse.h"
17
#include "tsdb.h"
18
#include "vnd.h"
19

20
/*
 * Decode a serialized file-set partition list from `buf` and convert it into a
 * range-diff array stored through `ppRanges`.
 *
 * Returns 0 on success; otherwise the first non-zero code reported by list
 * creation (taken from terrno), deserialization or the range conversion.
 * The temporary partition list is always destroyed before returning.
 */
static int32_t vnodeExtractSnapInfoDiff(void *buf, int32_t bufLen, TFileSetRangeArray **ppRanges) {
  int32_t            rc = 0;
  STsdbFSetPartList *pPartList = tsdbFSetPartListCreate();

  if (pPartList == NULL) {
    rc = terrno;
  } else {
    rc = tDeserializeTsdbFSetPartList(buf, bufLen, pPartList);
    if (rc == 0) {
      rc = tsdbFSetPartListToRangeDiff(pPartList, ppRanges);
    }
  }

  // the destroy call is made unconditionally, exactly as on every original exit path
  tsdbFSetPartListDestroy(&pPartList);
  return rc;
}
38

39
// SVSnapReader ========================================================
40
struct SVSnapReader {
41
  SVnode *pVnode;
42
  int64_t sver;
43
  int64_t ever;
44
  int64_t index;
45
  // config
46
  int8_t cfgDone;
47
  // meta
48
  int8_t           metaDone;
49
  SMetaSnapReader *pMetaReader;
50
  // tsdb
51
  int8_t              tsdbDone;
52
  TFileSetRangeArray *pRanges;
53
  STsdbSnapReader    *pTsdbReader;
54
  // tsdb raw
55
  int8_t              tsdbRAWDone;
56
  STsdbSnapRAWReader *pTsdbRAWReader;
57

58
  // tq
59
  int8_t         tqHandleDone;
60
  STqSnapReader *pTqSnapReader;
61
  int8_t         tqOffsetDone;
62
  STqSnapReader *pTqOffsetReader;
63
  // rsma
64
  int8_t              rsmaDone;
65
  TFileSetRangeArray *pRsmaRanges[TSDB_RETENTION_L2];
66
  SRSmaSnapReader    *pRsmaReader;
67

68
  // bse
69
  int8_t          bseDone;
70
  SBseSnapReader *pBseReader;
71
};
72

73
/*
 * Map a snapshot data type to the reader slot that holds its file-set ranges.
 *
 * Returns the address of the slot for SNAP_DATA_TSDB / SNAP_DATA_RSMA1 /
 * SNAP_DATA_RSMA2, or NULL for any other type. NULL is also returned, with
 * terrno set to TSDB_CODE_INVALID_PARA, if the RSMA slot array does not have
 * exactly two entries.
 */
static TFileSetRangeArray **vnodeSnapReaderGetTsdbRanges(SVSnapReader *pReader, int32_t tsdbTyp) {
  const size_t nRsmaSlots = sizeof(pReader->pRsmaRanges) / sizeof(pReader->pRsmaRanges[0]);
  if (nRsmaSlots != 2) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  TFileSetRangeArray **ppSlot = NULL;
  if (tsdbTyp == SNAP_DATA_TSDB) {
    ppSlot = &pReader->pRanges;
  } else if (tsdbTyp == SNAP_DATA_RSMA1) {
    ppSlot = &pReader->pRsmaRanges[0];
  } else if (tsdbTyp == SNAP_DATA_RSMA2) {
    ppSlot = &pReader->pRsmaRanges[1];
  }
  return ppSlot;
}
89

90
/*
 * Parse the optional snapshot-info payload carried in `pParam->data` and
 * configure the reader accordingly.
 *
 * The payload is a TLV whose type must be TDMT_SYNC_PREP_SNAPSHOT_REPLY and
 * whose value is a sequence of nested TLV sub-fields:
 *   - SNAP_DATA_TSDB / SNAP_DATA_RSMA1 / SNAP_DATA_RSMA2: a serialized file-set
 *     partition list, converted into the matching range slot of the reader;
 *   - SNAP_DATA_RAW: serialized tsdb replication options.
 * Afterwards one of the two tsdb stages is pre-marked as done: when sver is 0
 * and the negotiated format is TSDB_SNAP_REP_FMT_RAW the normal tsdb stage is
 * skipped, otherwise the raw stage is skipped.
 *
 * Returns 0 on success (including when no payload is present), or
 * TSDB_CODE_INVALID_DATA_FMT / a decoder error code on malformed input.
 */
static int32_t vnodeSnapReaderDealWithSnapInfo(SVSnapReader *pReader, SSnapshotParam *pParam) {
  int32_t code = 0;
  SVnode *pVnode = pReader->pVnode;

  if (pParam->data) {
    // decode
    SSyncTLV *datHead = (void *)pParam->data;
    if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
      code = TSDB_CODE_INVALID_DATA_FMT;
      terrno = code;
      goto _out;
    }

    // a negative length would be promoted to a huge unsigned value in the loop condition below
    if (datHead->len < 0) {
      vError("vgId:%d, invalid length of snap info. len:%d", TD_VID(pVnode), datHead->len);
      code = TSDB_CODE_INVALID_DATA_FMT;
      goto _out;
    }

    STsdbRepOpts         tsdbOpts = {0};
    TFileSetRangeArray **ppRanges = NULL;
    int32_t              offset = 0;

    while (offset + sizeof(SSyncTLV) < datHead->len) {
      SSyncTLV *subField = (void *)(datHead->val + offset);

      // the sub-field payload must end inside the enclosing TLV; the loop condition
      // guarantees the subtraction below cannot underflow
      size_t remain = (size_t)datHead->len - (size_t)offset - sizeof(SSyncTLV);
      if (subField->len < 0 || (size_t)subField->len > remain) {
        vError("vgId:%d, invalid subfield length of snap info. typ:%d len:%d", TD_VID(pVnode), subField->typ,
               subField->len);
        code = TSDB_CODE_INVALID_DATA_FMT;
        goto _out;
      }

      offset += sizeof(SSyncTLV) + subField->len;
      void   *buf = subField->val;
      int32_t bufLen = subField->len;

      switch (subField->typ) {
        case SNAP_DATA_TSDB:
        case SNAP_DATA_RSMA1:
        case SNAP_DATA_RSMA2: {
          ppRanges = vnodeSnapReaderGetTsdbRanges(pReader, subField->typ);
          if (ppRanges == NULL) {
            vError("vgId:%d, unexpected subfield type in snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ);
            code = TSDB_CODE_INVALID_DATA_FMT;
            goto _out;
          }
          code = vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges);
          if (code) {
            // report the code that is actually returned rather than whatever terrno holds
            vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), tstrerror(code));
            goto _out;
          }
        } break;
        case SNAP_DATA_RAW: {
          code = tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts);
          if (code) {
            vError("vgId:%d, failed to deserialize tsdb rep opts since %s", TD_VID(pVnode), tstrerror(code));
            goto _out;
          }
        } break;
        default:
          vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ);
          code = TSDB_CODE_INVALID_DATA_FMT;
          goto _out;
      }
    }

    // toggle snap replication mode
    vInfo("vgId:%d, vnode snap reader supported tsdb rep of format:%d", TD_VID(pVnode), tsdbOpts.format);
    if (pReader->sver == 0 && tsdbOpts.format == TSDB_SNAP_REP_FMT_RAW) {
      pReader->tsdbDone = true;
    } else {
      pReader->tsdbRAWDone = true;
    }

    vInfo("vgId:%d, vnode snap reader enabled replication mode: %s", TD_VID(pVnode),
          (pReader->tsdbDone ? "raw" : "normal"));
  }

_out:
  return code;
}
158

159
/*
 * Create a snapshot reader for `pVnode` covering versions
 * [pParam->start, pParam->end].
 *
 * Steps: allocate the reader, apply the snapshot-info payload, open the tsdb
 * raw reader unless the raw stage was disabled, and verify that pParam->end
 * still equals the vnode's current snapshot lastApplyIndex.
 *
 * On success returns 0 and stores the reader in *ppReader (release it with
 * vnodeSnapReaderClose). On failure returns a non-zero code, stores NULL in
 * *ppReader, and frees everything that was acquired along the way.
 */
int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader **ppReader) {
  int32_t       code = 0;
  int64_t       sver = pParam->start;
  int64_t       ever = pParam->end;
  SVSnapReader *pReader = NULL;

  pReader = (SVSnapReader *)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = terrno;
    goto _exit;
  }
  pReader->pVnode = pVnode;
  pReader->sver = sver;
  pReader->ever = ever;

  // snapshot info
  code = vnodeSnapReaderDealWithSnapInfo(pReader, pParam);
  if (code) goto _exit;

  // open tsdb snapshot raw reader
  if (!pReader->tsdbRAWDone) {
    code = tsdbSnapRAWReaderOpen(pVnode->pTsdb, ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader);
    if (code) goto _exit;
  }

  // check snapshot ever
  SSnapshot snapshot = {0};
  code = vnodeGetSnapshot(pVnode, &snapshot);
  if (code) goto _exit;
  if (ever != snapshot.lastApplyIndex) {
    vError("vgId:%d, abort reader open due to vnode snapshot changed. ever:%" PRId64 ", commit ver:%" PRId64,
           TD_VID(pVnode), ever, snapshot.lastApplyIndex);
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _exit;
  }

_exit:
  if (code) {
    vError("vgId:%d, vnode snapshot reader open failed since %s", TD_VID(pVnode), tstrerror(code));
    if (pReader != NULL) {
      // The caller only receives NULL, so nothing else can release these:
      // range arrays filled while parsing the snap info, the raw reader, and the reader itself.
      if (pReader->pRanges != NULL) {
        tsdbTFileSetRangeArrayDestroy(&pReader->pRanges);
      }
      for (size_t i = 0; i < sizeof(pReader->pRsmaRanges) / sizeof(pReader->pRsmaRanges[0]); ++i) {
        if (pReader->pRsmaRanges[i] != NULL) {
          tsdbTFileSetRangeArrayDestroy(&pReader->pRsmaRanges[i]);
        }
      }
      if (pReader->pTsdbRAWReader != NULL) {
        tsdbSnapRAWReaderClose(&pReader->pTsdbRAWReader);
      }
      taosMemoryFree(pReader);
    }
    *ppReader = NULL;
  } else {
    vInfo("vgId:%d, vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever);
    *ppReader = pReader;
  }
  return code;
}
205

206
/*
 * Destroy the file-set range arrays held by the reader for the tsdb, rsma1
 * and rsma2 data types. Types for which no slot can be resolved are skipped.
 */
static void vnodeSnapReaderDestroyTsdbRanges(SVSnapReader *pReader) {
  int32_t kinds[TSDB_RETENTION_MAX] = {SNAP_DATA_TSDB, SNAP_DATA_RSMA1, SNAP_DATA_RSMA2};
  int32_t idx = 0;

  while (idx < TSDB_RETENTION_MAX) {
    TFileSetRangeArray **ppSlot = vnodeSnapReaderGetTsdbRanges(pReader, kinds[idx]);
    ++idx;
    if (ppSlot != NULL) {
      tsdbTFileSetRangeArrayDestroy(ppSlot);
    }
  }
}
214

215
/*
 * Tear down a snapshot reader: destroy its range arrays, close every stage
 * reader that is still open, then free the reader itself.
 * `pReader` must be non-NULL; it is dereferenced immediately.
 */
void vnodeSnapReaderClose(SVSnapReader *pReader) {
  vInfo("vgId:%d, close vnode snapshot reader", TD_VID(pReader->pVnode));

  vnodeSnapReaderDestroyTsdbRanges(pReader);

#ifdef USE_RSMA_ORIGIN
  if (pReader->pRsmaReader != NULL) {
    rsmaSnapReaderClose(&pReader->pRsmaReader);
  }
#endif

  // stage readers are only non-NULL here if the stream was abandoned mid-stage
  if (pReader->pTsdbReader != NULL) {
    tsdbSnapReaderClose(&pReader->pTsdbReader);
  }

  if (pReader->pTsdbRAWReader != NULL) {
    tsdbSnapRAWReaderClose(&pReader->pTsdbRAWReader);
  }

  if (pReader->pMetaReader != NULL) {
    metaSnapReaderClose(&pReader->pMetaReader);
  }

#ifdef USE_TQ
  if (pReader->pTqSnapReader != NULL) {
    tqSnapReaderClose(&pReader->pTqSnapReader);
  }

  if (pReader->pTqOffsetReader != NULL) {
    tqSnapReaderClose(&pReader->pTqOffsetReader);
  }
#endif

  if (pReader->pBseReader != NULL) {
    bseSnapReaderClose(&pReader->pBseReader);
  }

  taosMemoryFree(pReader);
}
251

252
/*
 * Produce the next block of the vnode snapshot stream.
 *
 * Each call advances through the stages in order: vnode info file (config),
 * meta, tsdb, tsdb raw, then (under USE_TQ) tq handles and tq offsets, then
 * (under USE_RSMA_ORIGIN) rsma and bse. A stage reader is opened lazily,
 * asked for one block, and closed once it returns no data.
 *
 * On success returns 0. If a block was produced, *ppData points to it, *nData
 * is its total size (header plus payload) and the header index is set to the
 * reader's running counter. When every stage is exhausted *ppData is NULL and
 * *nData is 0. On failure returns a non-zero code.
 */
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) {
  int32_t code = 0;
  int32_t lino = 0;
  SVnode *pVnode = pReader->pVnode;
  int32_t vgId = TD_VID(pReader->pVnode);

  // CONFIG ==============
  // FIXME: if commit multiple times and the config changed?
  if (!pReader->cfgDone) {
    char    fName[TSDB_FILENAME_LEN];
    int32_t offset = 0;

    vnodeGetPrimaryPath(pVnode, false, fName, TSDB_FILENAME_LEN);
    offset = strlen(fName);
    snprintf(fName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME);

    TdFilePtr pFile = taosOpenFile(fName, TD_FILE_READ);
    if (NULL == pFile) {
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    int64_t size = 0;
    code = taosFStatFile(pFile, &size, NULL);
    if (code != 0) {
      if (taosCloseFile(&pFile) != 0) {
        vError("vgId:%d, failed to close file", vgId);
      }
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // one extra byte so the file content can be NUL-terminated
    *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size + 1);
    if (*ppData == NULL) {
      // capture the allocation error before the close call gets a chance to overwrite terrno
      code = terrno;
      if (taosCloseFile(&pFile) != 0) {
        vError("vgId:%d, failed to close file", vgId);
      }
      TSDB_CHECK_CODE(code, lino, _exit);
    }
    ((SSnapDataHdr *)(*ppData))->type = SNAP_DATA_CFG;
    ((SSnapDataHdr *)(*ppData))->size = size + 1;
    ((SSnapDataHdr *)(*ppData))->data[size] = '\0';

    if (taosReadFile(pFile, ((SSnapDataHdr *)(*ppData))->data, size) < 0) {
      // capture the read error first, then make sure the caller never sees the freed buffer
      code = terrno;
      taosMemoryFree(*ppData);
      *ppData = NULL;
      if (taosCloseFile(&pFile) != 0) {
        vError("vgId:%d, failed to close file", vgId);
      }
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    if (taosCloseFile(&pFile) != 0) {
      vError("vgId:%d, failed to close file", vgId);
    }

    pReader->cfgDone = 1;
    goto _exit;
  }

  // META ==============
  if (!pReader->metaDone) {
    // open reader if not
    if (pReader->pMetaReader == NULL) {
      code = metaSnapReaderOpen(pReader->pVnode->pMeta, pReader->sver, pReader->ever, &pReader->pMetaReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = metaSnapRead(pReader->pMetaReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (*ppData) {
      goto _exit;
    } else {
      pReader->metaDone = 1;
      metaSnapReaderClose(&pReader->pMetaReader);
    }
  }

  // TSDB ==============
  if (!pReader->tsdbDone) {
    // open if not
    if (pReader->pTsdbReader == NULL) {
      code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB, pReader->pRanges,
                                &pReader->pTsdbReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbSnapRead(pReader->pTsdbReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->tsdbDone = 1;
      tsdbSnapReaderClose(&pReader->pTsdbReader);
    }
  }

  // TSDB RAW ==========
  if (!pReader->tsdbRAWDone) {
    // open if not
    if (pReader->pTsdbRAWReader == NULL) {
      code = tsdbSnapRAWReaderOpen(pReader->pVnode->pTsdb, pReader->ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbSnapRAWRead(pReader->pTsdbRAWReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->tsdbRAWDone = 1;
      tsdbSnapRAWReaderClose(&pReader->pTsdbRAWReader);
    }
  }

  // TQ ================
#ifdef USE_TQ
  vInfo("vgId:%d tq transform start", vgId);
  if (!pReader->tqHandleDone) {
    if (pReader->pTqSnapReader == NULL) {
      code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_HANDLE,
                              &pReader->pTqSnapReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tqSnapRead(pReader->pTqSnapReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->tqHandleDone = 1;
      tqSnapReaderClose(&pReader->pTqSnapReader);
    }
  }
  if (!pReader->tqOffsetDone) {
    if (pReader->pTqOffsetReader == NULL) {
      code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_OFFSET,
                              &pReader->pTqOffsetReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tqSnapRead(pReader->pTqOffsetReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->tqOffsetDone = 1;
      tqSnapReaderClose(&pReader->pTqOffsetReader);
    }
  }
#endif
  // RSMA ==============
#ifdef USE_RSMA_ORIGIN
  if (VND_IS_RSMA(pReader->pVnode) && !pReader->rsmaDone) {
    // open if not
    if (pReader->pRsmaReader == NULL) {
      code = rsmaSnapReaderOpen(pReader->pVnode->pSma, pReader->sver, pReader->ever, &pReader->pRsmaReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = rsmaSnapRead(pReader->pRsmaReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->rsmaDone = 1;
      rsmaSnapReaderClose(&pReader->pRsmaReader);
    }
  }

  // NOTE(review): the BSE stage is compiled only when USE_RSMA_ORIGIN is defined - confirm this is intended
  if (!pReader->bseDone) {
    if (pReader->pBseReader == NULL) {
      code = bseSnapReaderOpen(pReader->pVnode->pBse, pReader->sver, pReader->ever, &pReader->pBseReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
    code = bseSnapReaderRead(pReader->pBseReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->bseDone = 1;
      bseSnapReaderClose(&pReader->pBseReader);
    }
  }

#endif
  // every stage is exhausted: signal end of stream
  *ppData = NULL;
  *nData = 0;

_exit:
  if (code) {
    vError("vgId:%d, vnode snapshot read failed at %s:%d since %s", vgId, __FILE__, lino, tstrerror(code));
  } else {
    if (*ppData) {
      SSnapDataHdr *pHdr = (SSnapDataHdr *)(*ppData);

      // stamp the block with the next sequence number
      pReader->index++;
      *nData = sizeof(SSnapDataHdr) + pHdr->size;
      pHdr->index = pReader->index;
      vDebug("vgId:%d, vnode snapshot read data, index:%" PRId64 " type:%d blockLen:%d ", vgId, pReader->index,
             pHdr->type, *nData);
    } else {
      vInfo("vgId:%d, vnode snapshot read data end, index:%" PRId64, vgId, pReader->index);
    }
  }
  return code;
}
458

459
// SVSnapWriter ========================================================
460
struct SVSnapWriter {
461
  SVnode *pVnode;
462
  int64_t sver;
463
  int64_t ever;
464
  int64_t commitID;
465
  int64_t index;
466
  // config
467
  SVnodeInfo info;
468
  // meta
469
  SMetaSnapWriter *pMetaSnapWriter;
470
  // tsdb
471
  TFileSetRangeArray *pRanges;
472
  STsdbSnapWriter    *pTsdbSnapWriter;
473
  // tsdb raw
474
  STsdbSnapRAWWriter *pTsdbSnapRAWWriter;
475
  // tq
476
  STqSnapWriter *pTqSnapHandleWriter;
477
  STqSnapWriter *pTqSnapOffsetWriter;
478
  // rsma
479
  TFileSetRangeArray *pRsmaRanges[TSDB_RETENTION_L2];
480
  SRSmaSnapWriter    *pRsmaSnapWriter;
481

482
  // bse
483
  SBseSnapWriter *pBseSnapWriter;
484
};
485

486
/*
 * Map a snapshot data type to the writer slot that holds its file-set ranges.
 * Returns the slot address for SNAP_DATA_TSDB / SNAP_DATA_RSMA1 /
 * SNAP_DATA_RSMA2 and NULL for any other type (terrno is not touched).
 */
TFileSetRangeArray **vnodeSnapWriterGetTsdbRanges(SVSnapWriter *pWriter, int32_t tsdbTyp) {
  TFileSetRangeArray **ppSlot = NULL;

  if (tsdbTyp == SNAP_DATA_TSDB) {
    ppSlot = &pWriter->pRanges;
  } else if (tsdbTyp == SNAP_DATA_RSMA1) {
    ppSlot = &pWriter->pRsmaRanges[0];
  } else if (tsdbTyp == SNAP_DATA_RSMA2) {
    ppSlot = &pWriter->pRsmaRanges[1];
  }

  return ppSlot;
}
498

499
/*
 * Parse the optional snapshot-info payload carried in `pParam->data` and
 * store the resulting file-set ranges in the writer.
 *
 * The payload is a TLV whose type must be TDMT_SYNC_PREP_SNAPSHOT_REPLY and
 * whose value is a sequence of nested TLV sub-fields:
 *   - SNAP_DATA_TSDB / SNAP_DATA_RSMA1 / SNAP_DATA_RSMA2: a serialized file-set
 *     partition list, converted into the matching range slot of the writer;
 *   - SNAP_DATA_RAW: serialized tsdb replication options (only logged here).
 *
 * Returns 0 on success (including when no payload is present), or
 * TSDB_CODE_INVALID_DATA_FMT / a decoder error code on malformed input.
 */
static int32_t vnodeSnapWriterDealWithSnapInfo(SVSnapWriter *pWriter, SSnapshotParam *pParam) {
  SVnode *pVnode = pWriter->pVnode;
  int32_t code = 0;
  int32_t lino = 0;

  if (pParam->data) {
    SSyncTLV *datHead = (void *)pParam->data;
    if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
      TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit);
    }

    // a negative length would be promoted to a huge unsigned value in the loop condition below
    if (datHead->len < 0) {
      vError("vgId:%d, invalid length of snap info. len:%d", TD_VID(pVnode), datHead->len);
      TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit);
    }

    STsdbRepOpts         tsdbOpts = {0};
    TFileSetRangeArray **ppRanges = NULL;
    int32_t              offset = 0;

    while (offset + sizeof(SSyncTLV) < datHead->len) {
      SSyncTLV *subField = (void *)(datHead->val + offset);

      // the sub-field payload must end inside the enclosing TLV; the loop condition
      // guarantees the subtraction below cannot underflow
      size_t remain = (size_t)datHead->len - (size_t)offset - sizeof(SSyncTLV);
      if (subField->len < 0 || (size_t)subField->len > remain) {
        vError("vgId:%d, invalid subfield length of snap info. typ:%d len:%d", TD_VID(pVnode), subField->typ,
               subField->len);
        TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit);
      }

      offset += sizeof(SSyncTLV) + subField->len;
      void   *buf = subField->val;
      int32_t bufLen = subField->len;

      switch (subField->typ) {
        case SNAP_DATA_TSDB:
        case SNAP_DATA_RSMA1:
        case SNAP_DATA_RSMA2: {
          ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, subField->typ);
          if (ppRanges == NULL) {
            vError("vgId:%d, unexpected subfield type in snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ);
            // the lookup does not set terrno, so use an explicit code to guarantee the jump to _exit
            TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit);
          }

          code = vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges);
          TSDB_CHECK_CODE(code, lino, _exit);
        } break;
        case SNAP_DATA_RAW: {
          code = tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts);
          TSDB_CHECK_CODE(code, lino, _exit);
        } break;
        default:
          vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ);
          TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit);
      }
    }

    vInfo("vgId:%d, vnode snap writer supported tsdb rep of format:%d", TD_VID(pVnode), tsdbOpts.format);
  }

_exit:
  if (code) {
    // report the line recorded at the failure site, not the line of this log statement
    vError("vgId:%d %s failed at %s:%d since %s", TD_VID(pVnode), __func__, __FILE__, lino, tstrerror(code));
  }
  return code;
}
553

554
extern int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb);
555
extern void    tsdbEnableBgTask(STsdb *pTsdb);
556

557
/*
 * Disable and cancel all tsdb background tasks of the vnode, then run a
 * synchronous vnode commit. Returns 0 when both steps succeed; a failing step
 * returns early through TAOS_CHECK_RETURN.
 */
static int32_t vnodeCancelAndDisableAllBgTask(SVnode *pVnode) {
  int32_t code = 0;

  TAOS_CHECK_RETURN(tsdbDisableAndCancelAllBgTask(pVnode->pTsdb));
  TAOS_CHECK_RETURN(vnodeSyncCommit(pVnode));

  return code;
}
562

563
/*
 * Re-enable tsdb background tasks for the vnode. Always returns 0.
 */
static int32_t vnodeEnableBgTask(SVnode *pVnode) {
  STsdb *pTsdb = pVnode->pTsdb;

  tsdbEnableBgTask(pTsdb);
  return 0;
}
567

568
/*
 * Create a snapshot writer for `pVnode` covering versions
 * [pParam->start, pParam->end].
 *
 * Steps: mark the vnode write-disabled under its mutex, cancel and disable
 * background tasks (which also performs a synchronous commit), allocate the
 * writer, bump the vnode commit ID, and apply the snapshot-info payload.
 *
 * On success returns 0 and stores the writer in *ppWriter. On failure returns
 * a non-zero code, stores NULL in *ppWriter and frees the writer together
 * with any range arrays already attached to it.
 *
 * NOTE(review): on failure the vnode is left write-disabled with background
 * tasks disabled and the commit ID already incremented - confirm the caller
 * recovers from this.
 */
int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter **ppWriter) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SVSnapWriter *pWriter = NULL;
  int64_t       sver = pParam->start;
  int64_t       ever = pParam->end;

  // disable write, cancel and disable all bg tasks
  (void)taosThreadMutexLock(&pVnode->mutex);
  pVnode->disableWrite = true;
  (void)taosThreadMutexUnlock(&pVnode->mutex);

  code = vnodeCancelAndDisableAllBgTask(pVnode);
  TSDB_CHECK_CODE(code, lino, _exit);

  // alloc
  pWriter = (SVSnapWriter *)taosMemoryCalloc(1, sizeof(*pWriter));
  if (pWriter == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  pWriter->pVnode = pVnode;
  pWriter->sver = sver;
  pWriter->ever = ever;

  // inc commit ID
  pWriter->commitID = ++pVnode->state.commitID;

  // snapshot info
  code = vnodeSnapWriterDealWithSnapInfo(pWriter, pParam);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    vError("vgId:%d, vnode snapshot writer open failed since %s", TD_VID(pVnode), tstrerror(code));
    if (pWriter != NULL) {
      // a partially parsed snap info may already have attached range arrays; release them
      // before the writer that owns them disappears
      if (pWriter->pRanges != NULL) {
        tsdbTFileSetRangeArrayDestroy(&pWriter->pRanges);
      }
      for (size_t i = 0; i < sizeof(pWriter->pRsmaRanges) / sizeof(pWriter->pRsmaRanges[0]); ++i) {
        if (pWriter->pRsmaRanges[i] != NULL) {
          tsdbTFileSetRangeArrayDestroy(&pWriter->pRsmaRanges[i]);
        }
      }
      taosMemoryFreeClear(pWriter);
    }
    *ppWriter = NULL;
  } else {
    vInfo("vgId:%d, vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode),
          sver, ever, pWriter->commitID);
    *ppWriter = pWriter;
  }
  return code;
}
611

612
/*
 * Destroy the file-set range arrays held by the writer for the tsdb, rsma1
 * and rsma2 data types. Types for which no slot can be resolved are skipped.
 */
static void vnodeSnapWriterDestroyTsdbRanges(SVSnapWriter *pWriter) {
  int32_t kinds[TSDB_RETENTION_MAX] = {SNAP_DATA_TSDB, SNAP_DATA_RSMA1, SNAP_DATA_RSMA2};
  int32_t idx = 0;

  while (idx < TSDB_RETENTION_MAX) {
    TFileSetRangeArray **ppSlot = vnodeSnapWriterGetTsdbRanges(pWriter, kinds[idx]);
    ++idx;
    if (ppSlot != NULL) {
      tsdbTFileSetRangeArrayDestroy(ppSlot);
    }
  }
}
620

621
/*
 * Finish a snapshot write session, either committing it or rolling it back.
 *
 * Order of work: destroy range arrays; run the "prepare close" step of the
 * tsdb / tsdb raw (/ rsma) writers; then either install the received vnode
 * info as the live config/state and commit the info file (rollback == 0) or
 * roll the vnode back; close each open sub-system writer; restart the vnode
 * with vnodeBegin; finally re-enable writes under the vnode mutex.
 *
 * Returns 0 on success, in which case the writer is freed. On the first
 * failing step the remaining steps are skipped, the code is returned and the
 * writer is NOT freed. Background tasks are re-enabled on every path.
 * `pSnapshot` is not used by this function.
 */
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot) {
  int32_t code = 0;
  SVnode *pVnode = pWriter->pVnode;

  vnodeSnapWriterDestroyTsdbRanges(pWriter);

  // prepare
  if (pWriter->pTsdbSnapWriter != NULL) {
    code = tsdbSnapWriterPrepareClose(pWriter->pTsdbSnapWriter, rollback);
    if (code != 0) goto _exit;
  }

  if (pWriter->pTsdbSnapRAWWriter != NULL) {
    code = tsdbSnapRAWWriterPrepareClose(pWriter->pTsdbSnapRAWWriter);
    if (code != 0) goto _exit;
  }
#ifdef USE_RSMA_ORIGIN
  if (pWriter->pRsmaSnapWriter != NULL) {
    code = rsmaSnapWriterPrepareClose(pWriter->pRsmaSnapWriter, rollback);
    if (code != 0) goto _exit;
  }
#endif

  // commit json
  if (rollback) {
    vnodeRollback(pWriter->pVnode);
  } else {
    // the received info becomes the live vnode config/state, committed at `ever`
    pWriter->info.state.committed = pWriter->ever;
    pVnode->config = pWriter->info.config;
    pVnode->state = (SVState){.committed = pWriter->info.state.committed,
                              .applied = pWriter->info.state.committed,
                              .commitID = pWriter->commitID,
                              .commitTerm = pWriter->info.state.commitTerm,
                              .applyTerm = pWriter->info.state.commitTerm};
    pVnode->statis = pWriter->info.statis;

    char dir[TSDB_FILENAME_LEN] = {0};
    vnodeGetPrimaryPath(pVnode, false, dir, TSDB_FILENAME_LEN);

    code = vnodeCommitInfo(dir);
    if (code != 0) goto _exit;
  }

  // commit/rollback sub-system
  if (pWriter->pMetaSnapWriter != NULL) {
    code = metaSnapWriterClose(&pWriter->pMetaSnapWriter, rollback);
    if (code != 0) goto _exit;
  }

  if (pWriter->pTsdbSnapWriter != NULL) {
    code = tsdbSnapWriterClose(&pWriter->pTsdbSnapWriter, rollback);
    if (code != 0) goto _exit;
  }

  if (pWriter->pTsdbSnapRAWWriter != NULL) {
    code = tsdbSnapRAWWriterClose(&pWriter->pTsdbSnapRAWWriter, rollback);
    if (code != 0) goto _exit;
  }
#ifdef USE_TQ
  if (pWriter->pTqSnapHandleWriter != NULL) {
    code = tqSnapWriterClose(&pWriter->pTqSnapHandleWriter, rollback);
    if (code != 0) goto _exit;
  }

  if (pWriter->pTqSnapOffsetWriter != NULL) {
    code = tqSnapWriterClose(&pWriter->pTqSnapOffsetWriter, rollback);
    if (code != 0) goto _exit;
  }
#endif
#ifdef USE_RSMA_ORIGIN
  if (pWriter->pRsmaSnapWriter != NULL) {
    code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback);
    if (code != 0) goto _exit;
  }

  if (pWriter->pBseSnapWriter != NULL) {
    bseSnapWriterClose(&pWriter->pBseSnapWriter, rollback);
  }

#endif
  code = vnodeBegin(pVnode);
  if (code != 0) goto _exit;

  // writes were disabled when the writer was opened; allow them again
  (void)taosThreadMutexLock(&pVnode->mutex);
  pVnode->disableWrite = false;
  (void)taosThreadMutexUnlock(&pVnode->mutex);

_exit:
  if (code != 0) {
    vError("vgId:%d, vnode snapshot writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code));
  } else {
    vInfo("vgId:%d, vnode snapshot writer closed, rollback:%d", TD_VID(pVnode), rollback);
    taosMemoryFree(pWriter);
  }
  if (vnodeEnableBgTask(pVnode) != 0) {
    tsdbError("vgId:%d, failed to enable bg task", TD_VID(pVnode));
  }
  return code;
}
719

720
/*
 * Handle a SNAP_DATA_CFG block: decode the vnode info carried in the block
 * payload into the writer, override its commit ID with the writer's, replace
 * its config with the local vnode's config while keeping the received
 * vndStats, and save the result under the vnode's primary path.
 *
 * Returns 0 on success or the code of the failing decode/save step.
 * `nData` is not used by this function.
 */
static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
  SVnode       *pVnode = pWriter->pVnode;
  SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;
  int32_t       code = 0;

  // decode info
  code = vnodeDecodeInfo(pHdr->data, &pWriter->info);
  if (code) {
    return code;
  }

  // change some value
  pWriter->info.state.commitID = pWriter->commitID;

  // modify info as needed
  char dir[TSDB_FILENAME_LEN] = {0};
  vnodeGetPrimaryPath(pVnode, false, dir, TSDB_FILENAME_LEN);

  // keep the received statistics across the wholesale config replacement
  SVnodeStats receivedStats = pWriter->info.config.vndStats;
  pWriter->info.config = pVnode->config;
  pWriter->info.config.vndStats = receivedStats;

  vDebug("vgId:%d, save config while write snapshot", pWriter->pVnode->config.vgId);
  code = vnodeSaveInfo(dir, &pWriter->info);
  return code;
}
747

748
/*
 * Apply one snapshot block to the vnode.
 *
 * `pData` must start with an SSnapDataHdr and `nData` must equal the header
 * size plus the payload size recorded in the header; otherwise
 * TSDB_CODE_INVALID_PARA is returned. Blocks must arrive with consecutive
 * header indexes (previous index + 1), otherwise TSDB_CODE_INVALID_MSG is
 * returned. The block is then dispatched by header type to the matching
 * sub-system writer, which is opened on first use.
 *
 * Blocks of a type not handled here are accepted without any action.
 * Returns 0 on success or the code of the failing step.
 */
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;
  SVnode       *pVnode = pWriter->pVnode;

  // the header must be fully present before any of its fields can be read
  if (pData == NULL || nData < sizeof(SSnapDataHdr)) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (!(pHdr->size + sizeof(SSnapDataHdr) == nData)) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (pHdr->index != pWriter->index + 1) {
    vError("vgId:%d, unexpected vnode snapshot msg. index:%" PRId64 ", expected index:%" PRId64, TD_VID(pVnode),
           pHdr->index, pWriter->index + 1);
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_MSG, lino, _exit);
  }

  pWriter->index = pHdr->index;

  vDebug("vgId:%d, vnode snapshot write data, index:%" PRId64 " type:%d blockLen:%d", TD_VID(pVnode), pHdr->index,
         pHdr->type, nData);

  switch (pHdr->type) {
    case SNAP_DATA_CFG: {
      code = vnodeSnapWriteInfo(pWriter, pData, nData);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
    case SNAP_DATA_META: {
      // meta
      if (pWriter->pMetaSnapWriter == NULL) {
        code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
    case SNAP_DATA_TSDB:
    case SNAP_DATA_DEL: {
      // tsdb
      if (pWriter->pTsdbSnapWriter == NULL) {
        code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, pWriter->pRanges,
                                  &pWriter->pTsdbSnapWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pHdr);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
    case SNAP_DATA_RAW: {
      // tsdb
      if (pWriter->pTsdbSnapRAWWriter == NULL) {
        code = tsdbSnapRAWWriterOpen(pVnode->pTsdb, pWriter->ever, &pWriter->pTsdbSnapRAWWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      code = tsdbSnapRAWWrite(pWriter->pTsdbSnapRAWWriter, pHdr);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
#ifdef USE_TQ
    case SNAP_DATA_TQ_HANDLE: {
      // tq handle
      if (pWriter->pTqSnapHandleWriter == NULL) {
        code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapHandleWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      code = tqSnapHandleWrite(pWriter->pTqSnapHandleWriter, pData, nData);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
    case SNAP_DATA_TQ_OFFSET: {
      // tq offset
      if (pWriter->pTqSnapOffsetWriter == NULL) {
        code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapOffsetWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      code = tqSnapOffsetWrite(pWriter->pTqSnapOffsetWriter, pData, nData);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
#endif
#ifdef USE_RSMA_ORIGIN
    case SNAP_DATA_RSMA1:
    case SNAP_DATA_RSMA2:
    case SNAP_DATA_QTASK: {
      // rsma1/rsma2/qtask for rsma
      if (pWriter->pRsmaSnapWriter == NULL) {
        code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, (void **)pWriter->pRsmaRanges,
                                  &pWriter->pRsmaSnapWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      code = rsmaSnapWrite(pWriter->pRsmaSnapWriter, pData, nData);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
    case SNAP_DATA_BSE: {
      if (pWriter->pBseSnapWriter == NULL) {
        code = bseSnapWriterOpen(pVnode->pBse, pWriter->sver, pWriter->ever, &pWriter->pBseSnapWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
      code = bseSnapWriterWrite(pWriter->pBseSnapWriter, pData, nData);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
#endif
    default:
      // NOTE(review): unknown block types are silently accepted - confirm this is intended
      break;
  }
_exit:
  if (code) {
    vError("vgId:%d, vnode snapshot write failed since %s, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode),
           tstrerror(code), pHdr->index, pHdr->type, nData);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc