• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3663

19 Mar 2025 09:21AM UTC coverage: 61.664% (-0.6%) from 62.28%
#3663

push

travis-ci

web-flow
docs: add defination of tmq_config_res_t & fix spell error (#30271)

153169 of 318241 branches covered (48.13%)

Branch coverage included in aggregate %.

239405 of 318390 relevant lines covered (75.19%)

5762846.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.4
/source/dnode/vnode/src/vnd/vnodeSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdb.h"
17
#include "vnd.h"
18

19
static int32_t vnodeExtractSnapInfoDiff(void *buf, int32_t bufLen, TFileSetRangeArray **ppRanges) {
113✔
20
  int32_t            code = 0;
113✔
21
  STsdbFSetPartList *pList = tsdbFSetPartListCreate();
113✔
22
  if (pList == NULL) {
113!
23
    code = terrno;
×
24
    goto _out;
×
25
  }
26

27
  code = tDeserializeTsdbFSetPartList(buf, bufLen, pList);
113✔
28
  if (code) goto _out;
113!
29

30
  code = tsdbFSetPartListToRangeDiff(pList, ppRanges);
113✔
31
  if (code) goto _out;
113!
32

33
_out:
113✔
34
  tsdbFSetPartListDestroy(&pList);
113✔
35
  return code;
113✔
36
}
37

38
// SVSnapReader ========================================================
39
struct SVSnapReader {
40
  SVnode *pVnode;
41
  int64_t sver;
42
  int64_t ever;
43
  int64_t index;
44
  // config
45
  int8_t cfgDone;
46
  // meta
47
  int8_t           metaDone;
48
  SMetaSnapReader *pMetaReader;
49
  // tsdb
50
  int8_t              tsdbDone;
51
  TFileSetRangeArray *pRanges;
52
  STsdbSnapReader    *pTsdbReader;
53
  // tsdb raw
54
  int8_t              tsdbRAWDone;
55
  STsdbSnapRAWReader *pTsdbRAWReader;
56

57
  // tq
58
  int8_t         tqHandleDone;
59
  STqSnapReader *pTqSnapReader;
60
  int8_t         tqOffsetDone;
61
  STqSnapReader *pTqOffsetReader;
62
  int8_t         tqCheckInfoDone;
63
  STqSnapReader *pTqCheckInfoReader;
64
  // stream
65
  int8_t              streamTaskDone;
66
  SStreamTaskReader  *pStreamTaskReader;
67
  int8_t              streamStateDone;
68
  SStreamStateReader *pStreamStateReader;
69
  // rsma
70
  int8_t              rsmaDone;
71
  TFileSetRangeArray *pRsmaRanges[TSDB_RETENTION_L2];
72
  SRSmaSnapReader    *pRsmaReader;
73
};
74

75
static TFileSetRangeArray **vnodeSnapReaderGetTsdbRanges(SVSnapReader *pReader, int32_t tsdbTyp) {
228✔
76
  if (!(sizeof(pReader->pRsmaRanges) / sizeof(pReader->pRsmaRanges[0]) == 2)) {
77
    terrno = TSDB_CODE_INVALID_PARA;
78
    return NULL;
79
  }
80
  switch (tsdbTyp) {
228!
81
    case SNAP_DATA_TSDB:
114✔
82
      return &pReader->pRanges;
114✔
83
    case SNAP_DATA_RSMA1:
57✔
84
      return &pReader->pRsmaRanges[0];
57✔
85
    case SNAP_DATA_RSMA2:
57✔
86
      return &pReader->pRsmaRanges[1];
57✔
87
    default:
×
88
      return NULL;
×
89
  }
90
}
91

92
static int32_t vnodeSnapReaderDealWithSnapInfo(SVSnapReader *pReader, SSnapshotParam *pParam) {
57✔
93
  int32_t code = 0;
57✔
94
  SVnode *pVnode = pReader->pVnode;
57✔
95

96
  if (pParam->data) {
57!
97
    // decode
98
    SSyncTLV *datHead = (void *)pParam->data;
57✔
99
    if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
57!
100
      code = TSDB_CODE_INVALID_DATA_FMT;
×
101
      terrno = code;
×
102
      goto _out;
×
103
    }
104

105
    STsdbRepOpts         tsdbOpts = {0};
57✔
106
    TFileSetRangeArray **ppRanges = NULL;
57✔
107
    int32_t              offset = 0;
57✔
108

109
    while (offset + sizeof(SSyncTLV) < datHead->len) {
171✔
110
      SSyncTLV *subField = (void *)(datHead->val + offset);
114✔
111
      offset += sizeof(SSyncTLV) + subField->len;
114✔
112
      void   *buf = subField->val;
114✔
113
      int32_t bufLen = subField->len;
114✔
114

115
      switch (subField->typ) {
114!
116
        case SNAP_DATA_TSDB:
57✔
117
        case SNAP_DATA_RSMA1:
118
        case SNAP_DATA_RSMA2: {
119
          ppRanges = vnodeSnapReaderGetTsdbRanges(pReader, subField->typ);
57✔
120
          if (ppRanges == NULL) {
57!
121
            vError("vgId:%d, unexpected subfield type in snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ);
×
122
            code = TSDB_CODE_INVALID_DATA_FMT;
×
123
            goto _out;
×
124
          }
125
          code = vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges);
57✔
126
          if (code) {
57!
127
            vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr());
×
128
            goto _out;
×
129
          }
130
        } break;
57✔
131
        case SNAP_DATA_RAW: {
57✔
132
          code = tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts);
57✔
133
          if (code) {
57!
134
            vError("vgId:%d, failed to deserialize tsdb rep opts since %s", TD_VID(pVnode), terrstr());
×
135
            goto _out;
×
136
          }
137
        } break;
57✔
138
        default:
×
139
          vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ);
×
140
          code = TSDB_CODE_INVALID_DATA_FMT;
×
141
          goto _out;
×
142
      }
143
    }
144

145
    // toggle snap replication mode
146
    vInfo("vgId:%d, vnode snap reader supported tsdb rep of format:%d", TD_VID(pVnode), tsdbOpts.format);
57!
147
    if (pReader->sver == 0 && tsdbOpts.format == TSDB_SNAP_REP_FMT_RAW) {
57!
148
      pReader->tsdbDone = true;
57✔
149
    } else {
150
      pReader->tsdbRAWDone = true;
×
151
    }
152

153
    vInfo("vgId:%d, vnode snap writer enabled replication mode: %s", TD_VID(pVnode),
57!
154
          (pReader->tsdbDone ? "raw" : "normal"));
155
  }
156

157
_out:
×
158
  return code;
57✔
159
}
160

161
int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader **ppReader) {
57✔
162
  int32_t       code = 0;
57✔
163
  int64_t       sver = pParam->start;
57✔
164
  int64_t       ever = pParam->end;
57✔
165
  SVSnapReader *pReader = NULL;
57✔
166

167
  pReader = (SVSnapReader *)taosMemoryCalloc(1, sizeof(*pReader));
57!
168
  if (pReader == NULL) {
57!
169
    code = terrno;
×
170
    goto _exit;
×
171
  }
172
  pReader->pVnode = pVnode;
57✔
173
  pReader->sver = sver;
57✔
174
  pReader->ever = ever;
57✔
175

176
  // snapshot info
177
  code = vnodeSnapReaderDealWithSnapInfo(pReader, pParam);
57✔
178
  if (code) goto _exit;
57!
179

180
  // open tsdb snapshot raw reader
181
  if (!pReader->tsdbRAWDone) {
57!
182
    code = tsdbSnapRAWReaderOpen(pVnode->pTsdb, ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader);
57✔
183
    if (code) goto _exit;
57!
184
  }
185

186
  // check snapshot ever
187
  SSnapshot snapshot = {0};
57✔
188
  code = vnodeGetSnapshot(pVnode, &snapshot);
57✔
189
  if (code) goto _exit;
57!
190
  if (ever != snapshot.lastApplyIndex) {
57!
191
    vError("vgId:%d, abort reader open due to vnode snapshot changed. ever:%" PRId64 ", commit ver:%" PRId64,
×
192
           TD_VID(pVnode), ever, snapshot.lastApplyIndex);
193
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
194
    goto _exit;
×
195
  }
196

197
_exit:
57✔
198
  if (code) {
57!
199
    vError("vgId:%d, vnode snapshot reader open failed since %s", TD_VID(pVnode), tstrerror(code));
×
200
    *ppReader = NULL;
×
201
  } else {
202
    vInfo("vgId:%d, vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever);
57!
203
    *ppReader = pReader;
57✔
204
  }
205
  return code;
57✔
206
}
207

208
static void vnodeSnapReaderDestroyTsdbRanges(SVSnapReader *pReader) {
57✔
209
  int32_t tsdbTyps[TSDB_RETENTION_MAX] = {SNAP_DATA_TSDB, SNAP_DATA_RSMA1, SNAP_DATA_RSMA2};
57✔
210
  for (int32_t j = 0; j < TSDB_RETENTION_MAX; ++j) {
228✔
211
    TFileSetRangeArray **ppRanges = vnodeSnapReaderGetTsdbRanges(pReader, tsdbTyps[j]);
171✔
212
    if (ppRanges == NULL) continue;
171!
213
    tsdbTFileSetRangeArrayDestroy(ppRanges);
171✔
214
  }
215
}
57✔
216

217
void vnodeSnapReaderClose(SVSnapReader *pReader) {
57✔
218
  vInfo("vgId:%d, close vnode snapshot reader", TD_VID(pReader->pVnode));
57!
219
  vnodeSnapReaderDestroyTsdbRanges(pReader);
57✔
220
#ifdef USE_RSMA
221
  if (pReader->pRsmaReader) {
57!
222
    rsmaSnapReaderClose(&pReader->pRsmaReader);
×
223
  }
224
#endif
225

226
  if (pReader->pTsdbReader) {
57!
227
    tsdbSnapReaderClose(&pReader->pTsdbReader);
×
228
  }
229

230
  if (pReader->pTsdbRAWReader) {
57!
231
    tsdbSnapRAWReaderClose(&pReader->pTsdbRAWReader);
×
232
  }
233

234
  if (pReader->pMetaReader) {
57!
235
    metaSnapReaderClose(&pReader->pMetaReader);
×
236
  }
237
#ifdef USE_TQ
238
  if (pReader->pTqSnapReader) {
57!
239
    tqSnapReaderClose(&pReader->pTqSnapReader);
×
240
  }
241

242
  if (pReader->pTqOffsetReader) {
57!
243
    tqSnapReaderClose(&pReader->pTqOffsetReader);
×
244
  }
245

246
  if (pReader->pTqCheckInfoReader) {
57!
247
    tqSnapReaderClose(&pReader->pTqCheckInfoReader);
×
248
  }
249
#endif
250
  taosMemoryFree(pReader);
57!
251
}
57✔
252

253
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) {
2,189✔
254
  int32_t code = 0;
2,189✔
255
  int32_t lino;
256
  SVnode *pVnode = pReader->pVnode;
2,189✔
257
  int32_t vgId = TD_VID(pReader->pVnode);
2,189✔
258

259
  // CONFIG ==============
260
  // FIXME: if commit multiple times and the config changed?
261
  if (!pReader->cfgDone) {
2,189✔
262
    char    fName[TSDB_FILENAME_LEN];
263
    int32_t offset = 0;
57✔
264

265
    vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, fName, TSDB_FILENAME_LEN);
57✔
266
    offset = strlen(fName);
57✔
267
    snprintf(fName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME);
57✔
268

269
    TdFilePtr pFile = taosOpenFile(fName, TD_FILE_READ);
57✔
270
    if (NULL == pFile) {
57!
271
      code = terrno;
×
272
      TSDB_CHECK_CODE(code, lino, _exit);
×
273
    }
274

275
    int64_t size;
276
    code = taosFStatFile(pFile, &size, NULL);
57✔
277
    if (code != 0) {
57!
278
      if (taosCloseFile(&pFile) != 0) {
×
279
        vError("vgId:%d, failed to close file", vgId);
×
280
      }
281
      TSDB_CHECK_CODE(code, lino, _exit);
×
282
    }
283

284
    *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size + 1);
57!
285
    if (*ppData == NULL) {
57!
286
      if (taosCloseFile(&pFile) != 0) {
×
287
        vError("vgId:%d, failed to close file", vgId);
×
288
      }
289
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
290
    }
291
    ((SSnapDataHdr *)(*ppData))->type = SNAP_DATA_CFG;
57✔
292
    ((SSnapDataHdr *)(*ppData))->size = size + 1;
57✔
293
    ((SSnapDataHdr *)(*ppData))->data[size] = '\0';
57✔
294

295
    if (taosReadFile(pFile, ((SSnapDataHdr *)(*ppData))->data, size) < 0) {
57!
296
      taosMemoryFree(*ppData);
×
297
      if (taosCloseFile(&pFile) != 0) {
×
298
        vError("vgId:%d, failed to close file", vgId);
×
299
      }
300
      TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
301
    }
302

303
    if (taosCloseFile(&pFile) != 0) {
57!
304
      vError("vgId:%d, failed to close file", vgId);
×
305
    }
306

307
    pReader->cfgDone = 1;
57✔
308
    goto _exit;
57✔
309
  }
310

311
  // META ==============
312
  if (!pReader->metaDone) {
2,132✔
313
    // open reader if not
314
    if (pReader->pMetaReader == NULL) {
1,763✔
315
      code = metaSnapReaderOpen(pReader->pVnode->pMeta, pReader->sver, pReader->ever, &pReader->pMetaReader);
57✔
316
      TSDB_CHECK_CODE(code, lino, _exit);
57!
317
    }
318

319
    code = metaSnapRead(pReader->pMetaReader, ppData);
1,763✔
320
    TSDB_CHECK_CODE(code, lino, _exit);
1,763!
321

322
    if (*ppData) {
1,763✔
323
      goto _exit;
1,706✔
324
    } else {
325
      pReader->metaDone = 1;
57✔
326
      metaSnapReaderClose(&pReader->pMetaReader);
57✔
327
    }
328
  }
329

330
  // TSDB ==============
331
  if (!pReader->tsdbDone) {
426!
332
    // open if not
333
    if (pReader->pTsdbReader == NULL) {
×
334
      code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB, pReader->pRanges,
×
335
                                &pReader->pTsdbReader);
336
      TSDB_CHECK_CODE(code, lino, _exit);
×
337
    }
338

339
    code = tsdbSnapRead(pReader->pTsdbReader, ppData);
×
340
    TSDB_CHECK_CODE(code, lino, _exit);
×
341
    if (*ppData) {
×
342
      goto _exit;
×
343
    } else {
344
      pReader->tsdbDone = 1;
×
345
      tsdbSnapReaderClose(&pReader->pTsdbReader);
×
346
    }
347
  }
348

349
  if (!pReader->tsdbRAWDone) {
426✔
350
    // open if not
351
    if (pReader->pTsdbRAWReader == NULL) {
324!
352
      code = tsdbSnapRAWReaderOpen(pReader->pVnode->pTsdb, pReader->ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader);
×
353
      TSDB_CHECK_CODE(code, lino, _exit);
×
354
    }
355

356
    code = tsdbSnapRAWRead(pReader->pTsdbRAWReader, ppData);
324✔
357
    TSDB_CHECK_CODE(code, lino, _exit);
324!
358
    if (*ppData) {
324✔
359
      goto _exit;
267✔
360
    } else {
361
      pReader->tsdbRAWDone = 1;
57✔
362
      tsdbSnapRAWReaderClose(&pReader->pTsdbRAWReader);
57✔
363
    }
364
  }
365

366
  // TQ ================
367
#ifdef USE_TQ
368
  vInfo("vgId:%d tq transform start", vgId);
159!
369
  if (!pReader->tqHandleDone) {
159✔
370
    if (pReader->pTqSnapReader == NULL) {
104✔
371
      code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_HANDLE,
57✔
372
                              &pReader->pTqSnapReader);
373
      TSDB_CHECK_CODE(code, lino, _exit);
57!
374
    }
375

376
    code = tqSnapRead(pReader->pTqSnapReader, ppData);
104✔
377
    TSDB_CHECK_CODE(code, lino, _exit);
104!
378
    if (*ppData) {
104✔
379
      goto _exit;
47✔
380
    } else {
381
      pReader->tqHandleDone = 1;
57✔
382
      tqSnapReaderClose(&pReader->pTqSnapReader);
57✔
383
    }
384
  }
385
  if (!pReader->tqCheckInfoDone) {
112✔
386
    if (pReader->pTqCheckInfoReader == NULL) {
61✔
387
      code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_CHECKINFO,
57✔
388
                              &pReader->pTqCheckInfoReader);
389
      TSDB_CHECK_CODE(code, lino, _exit);
57!
390
    }
391

392
    code = tqSnapRead(pReader->pTqCheckInfoReader, ppData);
61✔
393
    TSDB_CHECK_CODE(code, lino, _exit);
61!
394
    if (*ppData) {
61✔
395
      goto _exit;
4✔
396
    } else {
397
      pReader->tqCheckInfoDone = 1;
57✔
398
      tqSnapReaderClose(&pReader->pTqCheckInfoReader);
57✔
399
    }
400
  }
401
  if (!pReader->tqOffsetDone) {
108✔
402
    if (pReader->pTqOffsetReader == NULL) {
104✔
403
      code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_OFFSET,
57✔
404
                              &pReader->pTqOffsetReader);
405
      TSDB_CHECK_CODE(code, lino, _exit);
57!
406
    }
407

408
    code = tqSnapRead(pReader->pTqOffsetReader, ppData);
104✔
409
    TSDB_CHECK_CODE(code, lino, _exit);
104!
410
    if (*ppData) {
104✔
411
      goto _exit;
47✔
412
    } else {
413
      pReader->tqOffsetDone = 1;
57✔
414
      tqSnapReaderClose(&pReader->pTqOffsetReader);
57✔
415
    }
416
  }
417
#endif
418
  // STREAM ============
419
#ifdef USE_STREAM
420
  vInfo("vgId:%d stream task start to take snapshot", vgId);
61!
421
  if (!pReader->streamTaskDone) {
61!
422
    if (pReader->pStreamTaskReader == NULL) {
61✔
423
      code = streamTaskSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamTaskReader);
57✔
424
      TSDB_CHECK_CODE(code, lino, _exit);
57!
425
    }
426

427
    code = streamTaskSnapRead(pReader->pStreamTaskReader, ppData);
61✔
428
    TSDB_CHECK_CODE(code, lino, _exit);
61!
429
    if (*ppData) {
61✔
430
      vInfo("vgId:%d no streamTask snapshot", vgId);
4!
431
      goto _exit;
4✔
432
    } else {
433
      pReader->streamTaskDone = 1;
57✔
434
      code = streamTaskSnapReaderClose(pReader->pStreamTaskReader);
57✔
435
      TSDB_CHECK_CODE(code, lino, _exit);
57!
436
      pReader->pStreamTaskReader = NULL;
57✔
437
    }
438
  }
439
  if (!pReader->streamStateDone) {
57!
440
    if (pReader->pStreamStateReader == NULL) {
57!
441
      code =
442
          streamStateSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamStateReader);
57✔
443
      if (code) {
57!
444
        pReader->streamStateDone = 1;
×
445
        pReader->pStreamStateReader = NULL;
×
446
        TSDB_CHECK_CODE(code, lino, _exit);
×
447
      }
448
    }
449
    code = streamStateSnapRead(pReader->pStreamStateReader, ppData);
57✔
450
    TSDB_CHECK_CODE(code, lino, _exit);
57!
451
    if (*ppData) {
57!
452
      goto _exit;
×
453
    } else {
454
      pReader->streamStateDone = 1;
57✔
455
      code = streamStateSnapReaderClose(pReader->pStreamStateReader);
57✔
456
      TSDB_CHECK_CODE(code, lino, _exit);
57!
457
      pReader->pStreamStateReader = NULL;
57✔
458
    }
459
  }
460
#endif
461
  // RSMA ==============
462
#ifdef USE_RSMA
463
  if (VND_IS_RSMA(pReader->pVnode) && !pReader->rsmaDone) {
57!
464
    // open if not
465
    if (pReader->pRsmaReader == NULL) {
×
466
      code = rsmaSnapReaderOpen(pReader->pVnode->pSma, pReader->sver, pReader->ever, &pReader->pRsmaReader);
×
467
      TSDB_CHECK_CODE(code, lino, _exit);
×
468
    }
469

470
    code = rsmaSnapRead(pReader->pRsmaReader, ppData);
×
471
    TSDB_CHECK_CODE(code, lino, _exit);
×
472
    if (*ppData) {
×
473
      goto _exit;
×
474
    } else {
475
      pReader->rsmaDone = 1;
×
476
      rsmaSnapReaderClose(&pReader->pRsmaReader);
×
477
    }
478
  }
479
#endif
480
  *ppData = NULL;
57✔
481
  *nData = 0;
57✔
482

483
_exit:
2,189✔
484
  if (code) {
2,189!
485
    vError("vgId:%d, vnode snapshot read failed at %s:%d since %s", vgId, __FILE__, lino, tstrerror(code));
×
486
  } else {
487
    if (*ppData) {
2,189✔
488
      SSnapDataHdr *pHdr = (SSnapDataHdr *)(*ppData);
2,132✔
489

490
      pReader->index++;
2,132✔
491
      *nData = sizeof(SSnapDataHdr) + pHdr->size;
2,132✔
492
      pHdr->index = pReader->index;
2,132✔
493
      vDebug("vgId:%d, vnode snapshot read data, index:%" PRId64 " type:%d blockLen:%d ", vgId, pReader->index,
2,132!
494
             pHdr->type, *nData);
495
    } else {
496
      vInfo("vgId:%d, vnode snapshot read data end, index:%" PRId64, vgId, pReader->index);
57!
497
    }
498
  }
499
  return code;
2,189✔
500
}
501

502
// SVSnapWriter ========================================================
503
struct SVSnapWriter {
504
  SVnode *pVnode;
505
  int64_t sver;
506
  int64_t ever;
507
  int64_t commitID;
508
  int64_t index;
509
  // config
510
  SVnodeInfo info;
511
  // meta
512
  SMetaSnapWriter *pMetaSnapWriter;
513
  // tsdb
514
  TFileSetRangeArray *pRanges;
515
  STsdbSnapWriter    *pTsdbSnapWriter;
516
  // tsdb raw
517
  STsdbSnapRAWWriter *pTsdbSnapRAWWriter;
518
  // tq
519
  STqSnapWriter *pTqSnapHandleWriter;
520
  STqSnapWriter *pTqSnapOffsetWriter;
521
  STqSnapWriter *pTqSnapCheckInfoWriter;
522
  // stream
523
  SStreamTaskWriter  *pStreamTaskWriter;
524
  SStreamStateWriter *pStreamStateWriter;
525
  // rsma
526
  TFileSetRangeArray *pRsmaRanges[TSDB_RETENTION_L2];
527
  SRSmaSnapWriter    *pRsmaSnapWriter;
528
};
529

530
TFileSetRangeArray **vnodeSnapWriterGetTsdbRanges(SVSnapWriter *pWriter, int32_t tsdbTyp) {
224✔
531
  switch (tsdbTyp) {
224!
532
    case SNAP_DATA_TSDB:
112✔
533
      return &pWriter->pRanges;
112✔
534
    case SNAP_DATA_RSMA1:
56✔
535
      return &pWriter->pRsmaRanges[0];
56✔
536
    case SNAP_DATA_RSMA2:
56✔
537
      return &pWriter->pRsmaRanges[1];
56✔
538
    default:
×
539
      return NULL;
×
540
  }
541
}
542

543
static int32_t vnodeSnapWriterDealWithSnapInfo(SVSnapWriter *pWriter, SSnapshotParam *pParam) {
56✔
544
  SVnode *pVnode = pWriter->pVnode;
56✔
545
  int32_t code = 0;
56✔
546
  int32_t lino;
547

548
  if (pParam->data) {
56!
549
    SSyncTLV *datHead = (void *)pParam->data;
56✔
550
    if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
56!
551
      TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit);
×
552
    }
553

554
    STsdbRepOpts         tsdbOpts = {0};
56✔
555
    TFileSetRangeArray **ppRanges = NULL;
56✔
556
    int32_t              offset = 0;
56✔
557

558
    while (offset + sizeof(SSyncTLV) < datHead->len) {
168✔
559
      SSyncTLV *subField = (void *)(datHead->val + offset);
112✔
560
      offset += sizeof(SSyncTLV) + subField->len;
112✔
561
      void   *buf = subField->val;
112✔
562
      int32_t bufLen = subField->len;
112✔
563

564
      switch (subField->typ) {
112!
565
        case SNAP_DATA_TSDB:
56✔
566
        case SNAP_DATA_RSMA1:
567
        case SNAP_DATA_RSMA2: {
568
          ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, subField->typ);
56✔
569
          if (ppRanges == NULL) {
56!
570
            vError("vgId:%d, unexpected subfield type in snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ);
×
571
            TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
572
          }
573

574
          code = vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges);
56✔
575
          TSDB_CHECK_CODE(code, lino, _exit);
56!
576
        } break;
56✔
577
        case SNAP_DATA_RAW: {
56✔
578
          code = tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts);
56✔
579
          TSDB_CHECK_CODE(code, lino, _exit);
56!
580
        } break;
56✔
581
        default:
×
582
          vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ);
×
583
          TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit);
×
584
          goto _exit;
585
      }
586
    }
587

588
    vInfo("vgId:%d, vnode snap writer supported tsdb rep of format:%d", TD_VID(pVnode), tsdbOpts.format);
56!
589
  }
590

591
_exit:
×
592
  if (code) {
56!
593
    vError("vgId:%d %s failed at %s:%d since %s", TD_VID(pVnode), __func__, __FILE__, __LINE__, tstrerror(code));
×
594
  }
595
  return code;
56✔
596
}
597

598
extern int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb);
599
extern void    tsdbEnableBgTask(STsdb *pTsdb);
600

601
static int32_t vnodeCancelAndDisableAllBgTask(SVnode *pVnode) {
56✔
602
  TAOS_CHECK_RETURN(tsdbDisableAndCancelAllBgTask(pVnode->pTsdb));
56!
603
  TAOS_CHECK_RETURN(vnodeSyncCommit(pVnode));
56!
604
  return 0;
56✔
605
}
606

607
static int32_t vnodeEnableBgTask(SVnode *pVnode) {
56✔
608
  tsdbEnableBgTask(pVnode->pTsdb);
56✔
609
  return 0;
56✔
610
}
611

612
int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter **ppWriter) {
56✔
613
  int32_t       code = 0;
56✔
614
  int32_t       lino;
615
  SVSnapWriter *pWriter = NULL;
56✔
616
  int64_t       sver = pParam->start;
56✔
617
  int64_t       ever = pParam->end;
56✔
618

619
  // disable write, cancel and disable all bg tasks
620
  (void)taosThreadMutexLock(&pVnode->mutex);
56✔
621
  pVnode->disableWrite = true;
56✔
622
  (void)taosThreadMutexUnlock(&pVnode->mutex);
56✔
623

624
  code = vnodeCancelAndDisableAllBgTask(pVnode);
56✔
625
  TSDB_CHECK_CODE(code, lino, _exit);
56!
626

627
  // alloc
628
  pWriter = (SVSnapWriter *)taosMemoryCalloc(1, sizeof(*pWriter));
56!
629
  if (pWriter == NULL) {
56!
630
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
631
  }
632
  pWriter->pVnode = pVnode;
56✔
633
  pWriter->sver = sver;
56✔
634
  pWriter->ever = ever;
56✔
635

636
  // inc commit ID
637
  pWriter->commitID = ++pVnode->state.commitID;
56✔
638

639
  // snapshot info
640
  code = vnodeSnapWriterDealWithSnapInfo(pWriter, pParam);
56✔
641
  TSDB_CHECK_CODE(code, lino, _exit);
56!
642

643
_exit:
56✔
644
  if (code) {
56!
645
    vError("vgId:%d, vnode snapshot writer open failed since %s", TD_VID(pVnode), tstrerror(code));
×
646
    if (pWriter) taosMemoryFreeClear(pWriter);
×
647
    *ppWriter = NULL;
×
648
  } else {
649
    vInfo("vgId:%d, vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode),
56!
650
          sver, ever, pWriter->commitID);
651
    *ppWriter = pWriter;
56✔
652
  }
653
  return code;
56✔
654
}
655

656
static void vnodeSnapWriterDestroyTsdbRanges(SVSnapWriter *pWriter) {
56✔
657
  int32_t tsdbTyps[TSDB_RETENTION_MAX] = {SNAP_DATA_TSDB, SNAP_DATA_RSMA1, SNAP_DATA_RSMA2};
56✔
658
  for (int32_t j = 0; j < TSDB_RETENTION_MAX; ++j) {
224✔
659
    TFileSetRangeArray **ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, tsdbTyps[j]);
168✔
660
    if (ppRanges == NULL) continue;
168!
661
    tsdbTFileSetRangeArrayDestroy(ppRanges);
168✔
662
  }
663
}
56✔
664

665
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot) {
56✔
666
  int32_t code = 0;
56✔
667
  SVnode *pVnode = pWriter->pVnode;
56✔
668

669
  vnodeSnapWriterDestroyTsdbRanges(pWriter);
56✔
670

671
  // prepare
672
  if (pWriter->pTsdbSnapWriter) {
56!
673
    code = tsdbSnapWriterPrepareClose(pWriter->pTsdbSnapWriter, rollback);
×
674
    if (code) goto _exit;
×
675
  }
676

677
  if (pWriter->pTsdbSnapRAWWriter) {
56✔
678
    code = tsdbSnapRAWWriterPrepareClose(pWriter->pTsdbSnapRAWWriter);
53✔
679
    if (code) goto _exit;
53!
680
  }
681
#ifdef USE_RSMA
682
  if (pWriter->pRsmaSnapWriter) {
56!
683
    code = rsmaSnapWriterPrepareClose(pWriter->pRsmaSnapWriter, rollback);
×
684
    if (code) goto _exit;
×
685
  }
686
#endif
687
  // commit json
688
  if (!rollback) {
56!
689
    pWriter->info.state.committed = pWriter->ever;
56✔
690
    pVnode->config = pWriter->info.config;
56✔
691
    pVnode->state = (SVState){.committed = pWriter->info.state.committed,
56✔
692
                              .applied = pWriter->info.state.committed,
56✔
693
                              .commitID = pWriter->commitID,
56✔
694
                              .commitTerm = pWriter->info.state.commitTerm,
56✔
695
                              .applyTerm = pWriter->info.state.commitTerm};
56✔
696
    pVnode->statis = pWriter->info.statis;
56✔
697
    char dir[TSDB_FILENAME_LEN] = {0};
56✔
698
    vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
56✔
699

700
    code = vnodeCommitInfo(dir);
56✔
701
    if (code) goto _exit;
56!
702

703
  } else {
704
    vnodeRollback(pWriter->pVnode);
×
705
  }
706

707
  // commit/rollback sub-system
708
  if (pWriter->pMetaSnapWriter) {
56✔
709
    code = metaSnapWriterClose(&pWriter->pMetaSnapWriter, rollback);
51✔
710
    if (code) goto _exit;
51!
711
  }
712

713
  if (pWriter->pTsdbSnapWriter) {
56!
714
    code = tsdbSnapWriterClose(&pWriter->pTsdbSnapWriter, rollback);
×
715
    if (code) goto _exit;
×
716
  }
717

718
  if (pWriter->pTsdbSnapRAWWriter) {
56✔
719
    code = tsdbSnapRAWWriterClose(&pWriter->pTsdbSnapRAWWriter, rollback);
53✔
720
    if (code) goto _exit;
53!
721
  }
722
#ifdef USE_TQ
723
  if (pWriter->pTqSnapHandleWriter) {
56✔
724
    code = tqSnapWriterClose(&pWriter->pTqSnapHandleWriter, rollback);
47✔
725
    if (code) goto _exit;
47!
726
  }
727

728
  if (pWriter->pTqSnapCheckInfoWriter) {
56✔
729
    code = tqSnapWriterClose(&pWriter->pTqSnapCheckInfoWriter, rollback);
4✔
730
    if (code) goto _exit;
4!
731
  }
732

733
  if (pWriter->pTqSnapOffsetWriter) {
56✔
734
    code = tqSnapWriterClose(&pWriter->pTqSnapOffsetWriter, rollback);
47✔
735
    if (code) goto _exit;
47!
736
  }
737
#endif
738
#ifdef USE_STREAM
739
  if (pWriter->pStreamTaskWriter) {
56✔
740
    code = streamTaskSnapWriterClose(pWriter->pStreamTaskWriter, rollback, pWriter->pStreamStateWriter == NULL ? 1 : 0);
2✔
741

742
    if (code) goto _exit;
2!
743
  }
744

745
  if (pWriter->pStreamStateWriter) {
56!
746
    code = streamStateSnapWriterClose(pWriter->pStreamStateWriter, rollback);
×
747
    if (code) goto _exit;
×
748

749
    code = streamStateRebuildFromSnap(pWriter->pStreamStateWriter, 0);
×
750
    pWriter->pStreamStateWriter = NULL;
×
751
    if (code) goto _exit;
×
752
  }
753
#endif
754
#ifdef USE_RSMA
755
  if (pWriter->pRsmaSnapWriter) {
56!
756
    code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback);
×
757
    if (code) goto _exit;
×
758
  }
759
#endif
760
  code = vnodeBegin(pVnode);
56✔
761
  if (code) goto _exit;
56!
762

763
  (void)taosThreadMutexLock(&pVnode->mutex);
56✔
764
  pVnode->disableWrite = false;
56✔
765
  (void)taosThreadMutexUnlock(&pVnode->mutex);
56✔
766

767
_exit:
56✔
768
  if (code) {
56!
769
    vError("vgId:%d, vnode snapshot writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code));
×
770
  } else {
771
    vInfo("vgId:%d, vnode snapshot writer closed, rollback:%d", TD_VID(pVnode), rollback);
56!
772
    taosMemoryFree(pWriter);
56!
773
  }
774
  if (vnodeEnableBgTask(pVnode) != 0) {
56!
775
    tsdbError("vgId:%d, failed to enable bg task", TD_VID(pVnode));
×
776
  }
777
  return code;
56✔
778
}
779

780
static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
56✔
781
  int32_t       code = 0;
56✔
782
  int32_t       lino;
783
  SVnode       *pVnode = pWriter->pVnode;
56✔
784
  SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;
56✔
785

786
  // decode info
787
  code = vnodeDecodeInfo(pHdr->data, &pWriter->info);
56✔
788
  TSDB_CHECK_CODE(code, lino, _exit);
56!
789

790
  // change some value
791
  pWriter->info.state.commitID = pWriter->commitID;
56✔
792

793
  // modify info as needed
794
  char dir[TSDB_FILENAME_LEN] = {0};
56✔
795
  vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
56✔
796

797
  SVnodeStats vndStats = pWriter->info.config.vndStats;
56✔
798
  pWriter->info.config = pVnode->config;
56✔
799
  pWriter->info.config.vndStats = vndStats;
56✔
800
  vDebug("vgId:%d, save config while write snapshot", pWriter->pVnode->config.vgId);
56!
801
  code = vnodeSaveInfo(dir, &pWriter->info);
56✔
802
  TSDB_CHECK_CODE(code, lino, _exit);
56!
803

804
_exit:
56✔
805
  return code;
56✔
806
}
807

808
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
2,104✔
809
  int32_t       code = 0;
2,104✔
810
  int32_t       lino;
811
  SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;
2,104✔
812
  SVnode       *pVnode = pWriter->pVnode;
2,104✔
813

814
  if (!(pHdr->size + sizeof(SSnapDataHdr) == nData)) {
2,104!
815
    return TSDB_CODE_INVALID_PARA;
×
816
  }
817

818
  if (pHdr->index != pWriter->index + 1) {
2,104!
819
    vError("vgId:%d, unexpected vnode snapshot msg. index:%" PRId64 ", expected index:%" PRId64, TD_VID(pVnode),
×
820
           pHdr->index, pWriter->index + 1);
821
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_MSG, lino, _exit);
×
822
  }
823

824
  pWriter->index = pHdr->index;
2,104✔
825

826
  vDebug("vgId:%d, vnode snapshot write data, index:%" PRId64 " type:%d blockLen:%d", TD_VID(pVnode), pHdr->index,
2,104!
827
         pHdr->type, nData);
828

829
  switch (pHdr->type) {
2,104!
830
    case SNAP_DATA_CFG: {
56✔
831
      code = vnodeSnapWriteInfo(pWriter, pData, nData);
56✔
832
      TSDB_CHECK_CODE(code, lino, _exit);
56!
833
    } break;
56✔
834
    case SNAP_DATA_META: {
1,702✔
835
      // meta
836
      if (pWriter->pMetaSnapWriter == NULL) {
1,702✔
837
        code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter);
51✔
838
        TSDB_CHECK_CODE(code, lino, _exit);
51!
839
      }
840

841
      code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData);
1,702✔
842
      TSDB_CHECK_CODE(code, lino, _exit);
1,702!
843
    } break;
1,702✔
844
    case SNAP_DATA_TSDB:
×
845
    case SNAP_DATA_DEL: {
846
      // tsdb
847
      if (pWriter->pTsdbSnapWriter == NULL) {
×
848
        code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, pWriter->pRanges,
×
849
                                  &pWriter->pTsdbSnapWriter);
850
        TSDB_CHECK_CODE(code, lino, _exit);
×
851
      }
852

853
      code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pHdr);
×
854
      TSDB_CHECK_CODE(code, lino, _exit);
×
855
    } break;
×
856
    case SNAP_DATA_RAW: {
244✔
857
      // tsdb
858
      if (pWriter->pTsdbSnapRAWWriter == NULL) {
244✔
859
        code = tsdbSnapRAWWriterOpen(pVnode->pTsdb, pWriter->ever, &pWriter->pTsdbSnapRAWWriter);
53✔
860
        TSDB_CHECK_CODE(code, lino, _exit);
53!
861
      }
862

863
      code = tsdbSnapRAWWrite(pWriter->pTsdbSnapRAWWriter, pHdr);
244✔
864
      TSDB_CHECK_CODE(code, lino, _exit);
244!
865
    } break;
244✔
866
#ifdef USE_TQ
867
    case SNAP_DATA_TQ_HANDLE: {
47✔
868
      // tq handle
869
      if (pWriter->pTqSnapHandleWriter == NULL) {
47!
870
        code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapHandleWriter);
47✔
871
        TSDB_CHECK_CODE(code, lino, _exit);
47!
872
      }
873

874
      code = tqSnapHandleWrite(pWriter->pTqSnapHandleWriter, pData, nData);
47✔
875
      TSDB_CHECK_CODE(code, lino, _exit);
47!
876
    } break;
47✔
877
    case SNAP_DATA_TQ_CHECKINFO: {
4✔
878
      // tq checkinfo
879
      if (pWriter->pTqSnapCheckInfoWriter == NULL) {
4!
880
        code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapCheckInfoWriter);
4✔
881
        TSDB_CHECK_CODE(code, lino, _exit);
4!
882
      }
883

884
      code = tqSnapCheckInfoWrite(pWriter->pTqSnapCheckInfoWriter, pData, nData);
4✔
885
      TSDB_CHECK_CODE(code, lino, _exit);
4!
886
    } break;
4✔
887
    case SNAP_DATA_TQ_OFFSET: {
47✔
888
      // tq offset
889
      if (pWriter->pTqSnapOffsetWriter == NULL) {
47!
890
        code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapOffsetWriter);
47✔
891
        TSDB_CHECK_CODE(code, lino, _exit);
47!
892
      }
893

894
      code = tqSnapOffsetWrite(pWriter->pTqSnapOffsetWriter, pData, nData);
47✔
895
      TSDB_CHECK_CODE(code, lino, _exit);
47!
896
    } break;
47✔
897
#endif
898
#ifdef USE_STREAM
899
    case SNAP_DATA_STREAM_TASK:
4✔
900
    case SNAP_DATA_STREAM_TASK_CHECKPOINT: {
901
      if (pWriter->pStreamTaskWriter == NULL) {
4✔
902
        code = streamTaskSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pStreamTaskWriter);
2✔
903
        TSDB_CHECK_CODE(code, lino, _exit);
2!
904
      }
905
      code = streamTaskSnapWrite(pWriter->pStreamTaskWriter, pData, nData);
4✔
906
      TSDB_CHECK_CODE(code, lino, _exit);
4!
907
    } break;
4✔
908
    case SNAP_DATA_STREAM_STATE_BACKEND: {
×
909
      if (pWriter->pStreamStateWriter == NULL) {
×
910
        code = streamStateSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pStreamStateWriter);
×
911
        TSDB_CHECK_CODE(code, lino, _exit);
×
912
      }
913
      code = streamStateSnapWrite(pWriter->pStreamStateWriter, pData, nData);
×
914
      TSDB_CHECK_CODE(code, lino, _exit);
×
915

916
    } break;
×
917
#endif
918
#ifdef USE_RSMA
919
    case SNAP_DATA_RSMA1:
×
920
    case SNAP_DATA_RSMA2:
921
    case SNAP_DATA_QTASK: {
922
      // rsma1/rsma2/qtask for rsma
923
      if (pWriter->pRsmaSnapWriter == NULL) {
×
924
        code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, (void **)pWriter->pRsmaRanges,
×
925
                                  &pWriter->pRsmaSnapWriter);
926
        TSDB_CHECK_CODE(code, lino, _exit);
×
927
      }
928

929
      code = rsmaSnapWrite(pWriter->pRsmaSnapWriter, pData, nData);
×
930
      TSDB_CHECK_CODE(code, lino, _exit);
×
931
    } break;
×
932
#endif
933
    default:
×
934
      break;
×
935
  }
936
_exit:
2,104✔
937
  if (code) {
2,104!
938
    vError("vgId:%d, vnode snapshot write failed since %s, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode),
×
939
           tstrerror(code), pHdr->index, pHdr->type, nData);
940
  }
941
  return code;
2,104✔
942
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc