• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3533

20 Nov 2024 07:11AM UTC coverage: 58.848% (-1.9%) from 60.78%
#3533

push

travis-ci

web-flow
Merge pull request #28823 from taosdata/fix/3.0/TD-32587

fix:[TD-32587]fix stmt segmentation fault

115578 of 252434 branches covered (45.79%)

Branch coverage included in aggregate %.

1 of 4 new or added lines in 1 file covered. (25.0%)

8038 existing lines in 233 files now uncovered.

194926 of 275199 relevant lines covered (70.83%)

1494459.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.02
/source/dnode/vnode/src/vnd/vnodeSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdb.h"
17
#include "vnd.h"
18

19
/*
 * Deserialize a file-set partition list from `buf` and convert it into a
 * range diff stored through `ppRanges`.
 *
 * Returns 0 on success; otherwise terrno (list allocation failed) or the
 * error code reported by the deserialize / convert step. The temporary
 * partition list is always handed to tsdbFSetPartListDestroy() before
 * returning, including when its creation failed.
 */
static int32_t vnodeExtractSnapInfoDiff(void *buf, int32_t bufLen, TFileSetRangeArray **ppRanges) {
  int32_t            code;
  STsdbFSetPartList *pParts = tsdbFSetPartListCreate();

  if (pParts == NULL) {
    code = terrno;
  } else {
    code = tDeserializeTsdbFSetPartList(buf, bufLen, pParts);
    if (code == 0) {
      code = tsdbFSetPartListToRangeDiff(pParts, ppRanges);
    }
  }

  tsdbFSetPartListDestroy(&pParts);
  return code;
}
37

38
// SVSnapReader ========================================================
39
struct SVSnapReader {
40
  SVnode *pVnode;
41
  int64_t sver;
42
  int64_t ever;
43
  int64_t index;
44
  // config
45
  int8_t cfgDone;
46
  // meta
47
  int8_t           metaDone;
48
  SMetaSnapReader *pMetaReader;
49
  // tsdb
50
  int8_t              tsdbDone;
51
  TFileSetRangeArray *pRanges;
52
  STsdbSnapReader    *pTsdbReader;
53
  // tsdb raw
54
  int8_t              tsdbRAWDone;
55
  STsdbSnapRAWReader *pTsdbRAWReader;
56

57
  // tq
58
  int8_t         tqHandleDone;
59
  STqSnapReader *pTqSnapReader;
60
  int8_t         tqOffsetDone;
61
  STqSnapReader *pTqOffsetReader;
62
  int8_t         tqCheckInfoDone;
63
  STqSnapReader *pTqCheckInfoReader;
64
  // stream
65
  int8_t              streamTaskDone;
66
  SStreamTaskReader  *pStreamTaskReader;
67
  int8_t              streamStateDone;
68
  SStreamStateReader *pStreamStateReader;
69
  // rsma
70
  int8_t              rsmaDone;
71
  TFileSetRangeArray *pRsmaRanges[TSDB_RETENTION_L2];
72
  SRSmaSnapReader    *pRsmaReader;
73
};
74

75
/*
 * Map a snapshot data type to the reader field that stores its range array.
 *
 * Returns the address of the matching slot, or NULL for any type other than
 * SNAP_DATA_TSDB / SNAP_DATA_RSMA1 / SNAP_DATA_RSMA2. NULL is also returned,
 * with terrno set to TSDB_CODE_INVALID_PARA, if pRsmaRanges does not have
 * exactly the two slots this mapping relies on.
 */
static TFileSetRangeArray **vnodeSnapReaderGetTsdbRanges(SVSnapReader *pReader, int32_t tsdbTyp) {
  // the RSMA1/RSMA2 mapping below hard-codes indexes 0 and 1
  if (sizeof(pReader->pRsmaRanges) / sizeof(pReader->pRsmaRanges[0]) != 2) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  TFileSetRangeArray **ppSlot = NULL;
  if (tsdbTyp == SNAP_DATA_TSDB) {
    ppSlot = &pReader->pRanges;
  } else if (tsdbTyp == SNAP_DATA_RSMA1) {
    ppSlot = &pReader->pRsmaRanges[0];
  } else if (tsdbTyp == SNAP_DATA_RSMA2) {
    ppSlot = &pReader->pRsmaRanges[1];
  }
  return ppSlot;
}
91

92
/*
 * Decode the optional snapshot-info payload attached to `pParam` and
 * configure the reader accordingly.
 *
 * The payload is a TLV container of type TDMT_SYNC_PREP_SNAPSHOT_REPLY whose
 * value is a sequence of nested TLV sub-fields:
 *   - SNAP_DATA_TSDB / SNAP_DATA_RSMA1 / SNAP_DATA_RSMA2: a file-set
 *     partition list, converted into a range diff stored on the reader;
 *   - SNAP_DATA_RAW: tsdb replication options.
 *
 * After parsing, exactly one of the two tsdb stages is disabled: if
 * sver == 0 and the options request TSDB_SNAP_REP_FMT_RAW, the regular tsdb
 * stage is marked done (raw replication is used); otherwise the raw stage is
 * marked done.
 *
 * Returns 0 on success (including when there is no payload),
 * TSDB_CODE_INVALID_DATA_FMT for a malformed payload, or the error code of
 * the failing decode step.
 */
static int32_t vnodeSnapReaderDealWithSnapInfo(SVSnapReader *pReader, SSnapshotParam *pParam) {
  int32_t code = 0;
  SVnode *pVnode = pReader->pVnode;

  if (pParam->data) {
    // decode
    SSyncTLV *datHead = (void *)pParam->data;
    if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
      code = TSDB_CODE_INVALID_DATA_FMT;
      terrno = code;
      goto _out;
    }

    STsdbRepOpts         tsdbOpts = {0};
    TFileSetRangeArray **ppRanges = NULL;
    // All length arithmetic is done in signed 64-bit so that a negative or
    // oversized length cannot wrap around the loop bound.
    const int64_t totalLen = datHead->len;
    const int64_t tlvHdrLen = (int64_t)sizeof(SSyncTLV);
    int64_t       offset = 0;

    while (offset + tlvHdrLen < totalLen) {
      SSyncTLV     *subField = (void *)(datHead->val + offset);
      const int64_t subLen = subField->len;

      // the sub-field payload must lie entirely inside the container
      if (subLen < 0 || subLen > totalLen - offset - tlvHdrLen) {
        vError("vgId:%d, invalid subfield length in snapshot param. subtyp:%d, len:%" PRId64, TD_VID(pVnode),
               subField->typ, subLen);
        code = TSDB_CODE_INVALID_DATA_FMT;
        goto _out;
      }
      offset += tlvHdrLen + subLen;

      void   *buf = subField->val;
      int32_t bufLen = subField->len;

      switch (subField->typ) {
        case SNAP_DATA_TSDB:
        case SNAP_DATA_RSMA1:
        case SNAP_DATA_RSMA2: {
          ppRanges = vnodeSnapReaderGetTsdbRanges(pReader, subField->typ);
          if (ppRanges == NULL) {
            vError("vgId:%d, unexpected subfield type in snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ);
            code = TSDB_CODE_INVALID_DATA_FMT;
            goto _out;
          }
          code = vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges);
          if (code) {
            vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr());
            goto _out;
          }
        } break;
        case SNAP_DATA_RAW: {
          code = tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts);
          if (code) {
            vError("vgId:%d, failed to deserialize tsdb rep opts since %s", TD_VID(pVnode), terrstr());
            goto _out;
          }
        } break;
        default:
          vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ);
          code = TSDB_CODE_INVALID_DATA_FMT;
          goto _out;
      }
    }

    // toggle snap replication mode
    vInfo("vgId:%d, vnode snap reader supported tsdb rep of format:%d", TD_VID(pVnode), tsdbOpts.format);
    if (pReader->sver == 0 && tsdbOpts.format == TSDB_SNAP_REP_FMT_RAW) {
      pReader->tsdbDone = true;  // skip the regular tsdb stage, use raw
    } else {
      pReader->tsdbRAWDone = true;  // skip the raw stage, use regular tsdb
    }

    vInfo("vgId:%d, vnode snap reader enabled replication mode: %s", TD_VID(pVnode),
          (pReader->tsdbDone ? "raw" : "normal"));
  }

_out:
  return code;
}
160

161
/*
 * Create a snapshot reader for `pVnode` covering versions
 * [pParam->start, pParam->end].
 *
 * Steps: allocate the reader, parse the optional snapshot info in `pParam`,
 * eagerly open the raw tsdb reader when raw replication is enabled, and
 * verify that the vnode snapshot's lastApplyIndex still equals the requested
 * end version.
 *
 * On success returns 0 and stores the reader in *ppReader; the caller
 * releases it with vnodeSnapReaderClose(). On failure returns a non-zero
 * code, stores NULL in *ppReader, and releases everything that was acquired
 * here (the caller receives no handle, so nothing else could free it).
 */
int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader **ppReader) {
  int32_t       code = 0;
  int64_t       sver = pParam->start;
  int64_t       ever = pParam->end;
  SVSnapReader *pReader = NULL;

  pReader = (SVSnapReader *)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = terrno;
    goto _exit;
  }
  pReader->pVnode = pVnode;
  pReader->sver = sver;
  pReader->ever = ever;

  // snapshot info
  code = vnodeSnapReaderDealWithSnapInfo(pReader, pParam);
  if (code) goto _exit;

  // open tsdb snapshot raw reader
  if (!pReader->tsdbRAWDone) {
    code = tsdbSnapRAWReaderOpen(pVnode->pTsdb, ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader);
    if (code) goto _exit;
  }

  // check snapshot ever
  SSnapshot snapshot = {0};
  code = vnodeGetSnapshot(pVnode, &snapshot);
  if (code) goto _exit;
  if (ever != snapshot.lastApplyIndex) {
    vError("vgId:%d, abort reader open due to vnode snapshot changed. ever:%" PRId64 ", commit ver:%" PRId64,
           TD_VID(pVnode), ever, snapshot.lastApplyIndex);
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _exit;
  }

_exit:
  if (code) {
    vError("vgId:%d, vnode snapshot reader open failed since %s", TD_VID(pVnode), tstrerror(code));
    if (pReader != NULL) {
      // Undo partial construction: the raw reader and the range arrays
      // parsed from the snapshot info are owned by the reader.
      if (pReader->pTsdbRAWReader) {
        tsdbSnapRAWReaderClose(&pReader->pTsdbRAWReader);
      }
      tsdbTFileSetRangeArrayDestroy(&pReader->pRanges);
      for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
        tsdbTFileSetRangeArrayDestroy(&pReader->pRsmaRanges[i]);
      }
      taosMemoryFree(pReader);
    }
    *ppReader = NULL;
  } else {
    vInfo("vgId:%d, vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever);
    *ppReader = pReader;
  }
  return code;
}
207

208
/*
 * Destroy every range array held by the reader: the tsdb one and both rsma
 * ones. Slots are looked up through vnodeSnapReaderGetTsdbRanges(); a type
 * for which no slot is returned is skipped.
 */
static void vnodeSnapReaderDestroyTsdbRanges(SVSnapReader *pReader) {
  int32_t tsdbTyps[TSDB_RETENTION_MAX] = {SNAP_DATA_TSDB, SNAP_DATA_RSMA1, SNAP_DATA_RSMA2};

  int32_t idx = 0;
  while (idx < TSDB_RETENTION_MAX) {
    TFileSetRangeArray **ppSlot = vnodeSnapReaderGetTsdbRanges(pReader, tsdbTyps[idx]);
    ++idx;
    if (ppSlot != NULL) {
      tsdbTFileSetRangeArrayDestroy(ppSlot);
    }
  }
}
216

217
/*
 * Release a snapshot reader: destroy its range arrays, close every
 * sub-reader that is still open, then free the reader itself.
 *
 * `pReader` must be non-NULL (it is dereferenced immediately).
 *
 * NOTE(review): pStreamTaskReader / pStreamStateReader are not closed here;
 * they are only closed inside vnodeSnapRead() once their stage completes.
 * Confirm whether a reader closed mid-stream can still hold them.
 */
void vnodeSnapReaderClose(SVSnapReader *pReader) {
  vInfo("vgId:%d, close vnode snapshot reader", TD_VID(pReader->pVnode));

  // range arrays parsed from the snapshot info
  vnodeSnapReaderDestroyTsdbRanges(pReader);

  // rsma / tsdb / tsdb raw / meta
  if (pReader->pRsmaReader != NULL) {
    rsmaSnapReaderClose(&pReader->pRsmaReader);
  }
  if (pReader->pTsdbReader != NULL) {
    tsdbSnapReaderClose(&pReader->pTsdbReader);
  }
  if (pReader->pTsdbRAWReader != NULL) {
    tsdbSnapRAWReaderClose(&pReader->pTsdbRAWReader);
  }
  if (pReader->pMetaReader != NULL) {
    metaSnapReaderClose(&pReader->pMetaReader);
  }

  // tq: handle, offset, check-info
  if (pReader->pTqSnapReader != NULL) {
    tqSnapReaderClose(&pReader->pTqSnapReader);
  }
  if (pReader->pTqOffsetReader != NULL) {
    tqSnapReaderClose(&pReader->pTqOffsetReader);
  }
  if (pReader->pTqCheckInfoReader != NULL) {
    tqSnapReaderClose(&pReader->pTqCheckInfoReader);
  }

  taosMemoryFree(pReader);
}
251

252
/*
 * Produce the next block of the vnode snapshot.
 *
 * Stages are visited in a fixed order - config, meta, tsdb, tsdb raw,
 * tq (handle, check-info, offset), stream (task, state), rsma - and each
 * call returns at most one block from the first stage that is not yet done.
 * A stage's sub-reader is opened on first use and closed when it yields no
 * more data.
 *
 * On success returns 0. If a block was produced, *ppData points to it, its
 * header carries the incremented sequence index, and *nData is the header
 * size plus the payload size. If every stage is done, *ppData is NULL and
 * *nData is 0.
 *
 * On failure returns a non-zero code and logs the failing line.
 */
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) {
  int32_t code = 0;
  int32_t lino = 0;
  SVnode *pVnode = pReader->pVnode;
  int32_t vgId = TD_VID(pReader->pVnode);

  // CONFIG ==============
  // FIXME: if commit multiple times and the config changed?
  if (!pReader->cfgDone) {
    char    fName[TSDB_FILENAME_LEN];
    int32_t offset = 0;

    // <primary dir><TD_DIRSEP><VND_INFO_FNAME>
    vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, fName, TSDB_FILENAME_LEN);
    offset = strlen(fName);
    snprintf(fName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME);

    TdFilePtr pFile = taosOpenFile(fName, TD_FILE_READ);
    if (NULL == pFile) {
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    int64_t size;
    code = taosFStatFile(pFile, &size, NULL);
    if (code != 0) {
      if (taosCloseFile(&pFile) != 0) {
        vError("vgId:%d, failed to close file", vgId);
      }
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // header + file content + trailing '\0'
    *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size + 1);
    if (*ppData == NULL) {
      code = terrno;  // sample before closing the file so the close cannot overwrite it
      if (taosCloseFile(&pFile) != 0) {
        vError("vgId:%d, failed to close file", vgId);
      }
      TSDB_CHECK_CODE(code, lino, _exit);
    }
    ((SSnapDataHdr *)(*ppData))->type = SNAP_DATA_CFG;
    ((SSnapDataHdr *)(*ppData))->size = size + 1;
    ((SSnapDataHdr *)(*ppData))->data[size] = '\0';

    if (taosReadFile(pFile, ((SSnapDataHdr *)(*ppData))->data, size) < 0) {
      code = terrno;  // sample before cleanup so later calls cannot overwrite it
      taosMemoryFree(*ppData);
      *ppData = NULL;  // never hand a freed buffer back to the caller
      if (taosCloseFile(&pFile) != 0) {
        vError("vgId:%d, failed to close file", vgId);
      }
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    if (taosCloseFile(&pFile) != 0) {
      vError("vgId:%d, failed to close file", vgId);
    }

    pReader->cfgDone = 1;
    goto _exit;
  }

  // META ==============
  if (!pReader->metaDone) {
    // open reader if not
    if (pReader->pMetaReader == NULL) {
      code = metaSnapReaderOpen(pReader->pVnode->pMeta, pReader->sver, pReader->ever, &pReader->pMetaReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = metaSnapRead(pReader->pMetaReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (*ppData) {
      goto _exit;
    } else {
      pReader->metaDone = 1;
      metaSnapReaderClose(&pReader->pMetaReader);
    }
  }

  // TSDB ==============
  if (!pReader->tsdbDone) {
    // open if not
    if (pReader->pTsdbReader == NULL) {
      code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB, pReader->pRanges,
                                &pReader->pTsdbReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbSnapRead(pReader->pTsdbReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->tsdbDone = 1;
      tsdbSnapReaderClose(&pReader->pTsdbReader);
    }
  }

  // TSDB RAW ==========
  if (!pReader->tsdbRAWDone) {
    // open if not
    if (pReader->pTsdbRAWReader == NULL) {
      code = tsdbSnapRAWReaderOpen(pReader->pVnode->pTsdb, pReader->ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbSnapRAWRead(pReader->pTsdbRAWReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->tsdbRAWDone = 1;
      tsdbSnapRAWReaderClose(&pReader->pTsdbRAWReader);
    }
  }

  // TQ ================
  vInfo("vgId:%d tq transform start", vgId);
  if (!pReader->tqHandleDone) {
    if (pReader->pTqSnapReader == NULL) {
      code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_HANDLE,
                              &pReader->pTqSnapReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tqSnapRead(pReader->pTqSnapReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->tqHandleDone = 1;
      tqSnapReaderClose(&pReader->pTqSnapReader);
    }
  }
  if (!pReader->tqCheckInfoDone) {
    if (pReader->pTqCheckInfoReader == NULL) {
      code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_CHECKINFO,
                              &pReader->pTqCheckInfoReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tqSnapRead(pReader->pTqCheckInfoReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->tqCheckInfoDone = 1;
      tqSnapReaderClose(&pReader->pTqCheckInfoReader);
    }
  }
  if (!pReader->tqOffsetDone) {
    if (pReader->pTqOffsetReader == NULL) {
      code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_OFFSET,
                              &pReader->pTqOffsetReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tqSnapRead(pReader->pTqOffsetReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->tqOffsetDone = 1;
      tqSnapReaderClose(&pReader->pTqOffsetReader);
    }
  }

  // STREAM ============
  vInfo("vgId:%d stream task start to take snapshot", vgId);
  if (!pReader->streamTaskDone) {
    if (pReader->pStreamTaskReader == NULL) {
      // NOTE(review): both version arguments are pReader->sver, whereas the
      // other stages pass (sver, ever) - confirm this is intended.
      code = streamTaskSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamTaskReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = streamTaskSnapRead(pReader->pStreamTaskReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      // NOTE(review): this message is logged when a block WAS read - confirm the wording.
      vInfo("vgId:%d no streamTask snapshot", vgId);
      goto _exit;
    } else {
      pReader->streamTaskDone = 1;
      code = streamTaskSnapReaderClose(pReader->pStreamTaskReader);
      TSDB_CHECK_CODE(code, lino, _exit);
      pReader->pStreamTaskReader = NULL;
    }
  }
  if (!pReader->streamStateDone) {
    if (pReader->pStreamStateReader == NULL) {
      // NOTE(review): (sver, sver) again, see the stream task stage above.
      code =
          streamStateSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamStateReader);
      if (code) {
        // a failed open finishes the stage so it is not retried on the next call
        pReader->streamStateDone = 1;
        pReader->pStreamStateReader = NULL;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
    code = streamStateSnapRead(pReader->pStreamStateReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->streamStateDone = 1;
      code = streamStateSnapReaderClose(pReader->pStreamStateReader);
      TSDB_CHECK_CODE(code, lino, _exit);
      pReader->pStreamStateReader = NULL;
    }
  }

  // RSMA ==============
  if (VND_IS_RSMA(pReader->pVnode) && !pReader->rsmaDone) {
    // open if not
    if (pReader->pRsmaReader == NULL) {
      code = rsmaSnapReaderOpen(pReader->pVnode->pSma, pReader->sver, pReader->ever, &pReader->pRsmaReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = rsmaSnapRead(pReader->pRsmaReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->rsmaDone = 1;
      rsmaSnapReaderClose(&pReader->pRsmaReader);
    }
  }

  // every stage is exhausted: signal end of snapshot
  *ppData = NULL;
  *nData = 0;

_exit:
  if (code) {
    vError("vgId:%d, vnode snapshot read failed at %s:%d since %s", vgId, __FILE__, lino, tstrerror(code));
  } else {
    if (*ppData) {
      SSnapDataHdr *pHdr = (SSnapDataHdr *)(*ppData);

      // stamp the block with the next sequence number
      pReader->index++;
      *nData = sizeof(SSnapDataHdr) + pHdr->size;
      pHdr->index = pReader->index;
      vDebug("vgId:%d, vnode snapshot read data, index:%" PRId64 " type:%d blockLen:%d ", vgId, pReader->index,
             pHdr->type, *nData);
    } else {
      vInfo("vgId:%d, vnode snapshot read data end, index:%" PRId64, vgId, pReader->index);
    }
  }
  return code;
}
497

498
// SVSnapWriter ========================================================
499
struct SVSnapWriter {
500
  SVnode *pVnode;
501
  int64_t sver;
502
  int64_t ever;
503
  int64_t commitID;
504
  int64_t index;
505
  // config
506
  SVnodeInfo info;
507
  // meta
508
  SMetaSnapWriter *pMetaSnapWriter;
509
  // tsdb
510
  TFileSetRangeArray *pRanges;
511
  STsdbSnapWriter    *pTsdbSnapWriter;
512
  // tsdb raw
513
  STsdbSnapRAWWriter *pTsdbSnapRAWWriter;
514
  // tq
515
  STqSnapWriter *pTqSnapHandleWriter;
516
  STqSnapWriter *pTqSnapOffsetWriter;
517
  STqSnapWriter *pTqSnapCheckInfoWriter;
518
  // stream
519
  SStreamTaskWriter  *pStreamTaskWriter;
520
  SStreamStateWriter *pStreamStateWriter;
521
  // rsma
522
  TFileSetRangeArray *pRsmaRanges[TSDB_RETENTION_L2];
523
  SRSmaSnapWriter    *pRsmaSnapWriter;
524
};
525

526
/*
 * Map a snapshot data type to the writer field that stores its range array.
 *
 * Returns the address of the matching slot, or NULL for any type other than
 * SNAP_DATA_TSDB / SNAP_DATA_RSMA1 / SNAP_DATA_RSMA2. terrno is not touched.
 */
TFileSetRangeArray **vnodeSnapWriterGetTsdbRanges(SVSnapWriter *pWriter, int32_t tsdbTyp) {
  TFileSetRangeArray **ppSlot = NULL;

  if (tsdbTyp == SNAP_DATA_TSDB) {
    ppSlot = &pWriter->pRanges;
  } else if (tsdbTyp == SNAP_DATA_RSMA1) {
    ppSlot = &pWriter->pRsmaRanges[0];
  } else if (tsdbTyp == SNAP_DATA_RSMA2) {
    ppSlot = &pWriter->pRsmaRanges[1];
  }
  return ppSlot;
}
538

539
/*
 * Decode the optional snapshot-info payload attached to `pParam` and store
 * the parsed range diffs on the writer.
 *
 * The payload is a TLV container of type TDMT_SYNC_PREP_SNAPSHOT_REPLY whose
 * value is a sequence of nested TLV sub-fields:
 *   - SNAP_DATA_TSDB / SNAP_DATA_RSMA1 / SNAP_DATA_RSMA2: a file-set
 *     partition list, converted into a range diff stored on the writer;
 *   - SNAP_DATA_RAW: tsdb replication options (decoded and logged only).
 *
 * Returns 0 on success (including when there is no payload),
 * TSDB_CODE_INVALID_DATA_FMT for a malformed payload, or the error code of
 * the failing decode step. Failures are logged with the line that detected
 * them.
 */
static int32_t vnodeSnapWriterDealWithSnapInfo(SVSnapWriter *pWriter, SSnapshotParam *pParam) {
  SVnode *pVnode = pWriter->pVnode;
  int32_t code = 0;
  int32_t lino = 0;

  if (pParam->data) {
    SSyncTLV *datHead = (void *)pParam->data;
    if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
      TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit);
    }

    STsdbRepOpts         tsdbOpts = {0};
    TFileSetRangeArray **ppRanges = NULL;
    // All length arithmetic is done in signed 64-bit so that a negative or
    // oversized length cannot wrap around the loop bound.
    const int64_t totalLen = datHead->len;
    const int64_t tlvHdrLen = (int64_t)sizeof(SSyncTLV);
    int64_t       offset = 0;

    while (offset + tlvHdrLen < totalLen) {
      SSyncTLV     *subField = (void *)(datHead->val + offset);
      const int64_t subLen = subField->len;

      // the sub-field payload must lie entirely inside the container
      if (subLen < 0 || subLen > totalLen - offset - tlvHdrLen) {
        vError("vgId:%d, invalid subfield length in snapshot param. subtyp:%d, len:%" PRId64, TD_VID(pVnode),
               subField->typ, subLen);
        TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit);
      }
      offset += tlvHdrLen + subLen;

      void   *buf = subField->val;
      int32_t bufLen = subField->len;

      switch (subField->typ) {
        case SNAP_DATA_TSDB:
        case SNAP_DATA_RSMA1:
        case SNAP_DATA_RSMA2: {
          ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, subField->typ);
          if (ppRanges == NULL) {
            vError("vgId:%d, unexpected subfield type in snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ);
            // The getter does not set terrno, so use an explicit code;
            // otherwise a zero terrno would let a NULL slot reach the call below.
            TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit);
          }

          code = vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges);
          TSDB_CHECK_CODE(code, lino, _exit);
        } break;
        case SNAP_DATA_RAW: {
          code = tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts);
          TSDB_CHECK_CODE(code, lino, _exit);
        } break;
        default:
          vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ);
          TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit);
      }
    }

    vInfo("vgId:%d, vnode snap writer supported tsdb rep of format:%d", TD_VID(pVnode), tsdbOpts.format);
  }

_exit:
  if (code) {
    // report the line recorded at the failure site, not the line of this log call
    vError("vgId:%d %s failed at %s:%d since %s", TD_VID(pVnode), __func__, __FILE__, lino, tstrerror(code));
  }
  return code;
}
593

594
extern int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb);
595
extern void    tsdbEnableBgTask(STsdb *pTsdb);
596

597
/*
 * Quiesce the vnode's background activity before a snapshot is written.
 * The three steps run in order and the first failure is returned through
 * TAOS_CHECK_RETURN; 0 is returned when all of them succeed.
 */
static int32_t vnodeCancelAndDisableAllBgTask(SVnode *pVnode) {
  // 1. disable and cancel tsdb background tasks
  TAOS_CHECK_RETURN(tsdbDisableAndCancelAllBgTask(pVnode->pTsdb));

  // 2. synchronous commit
  TAOS_CHECK_RETURN(vnodeSyncCommit(pVnode));

  // 3. tear down the commit channel
  TAOS_CHECK_RETURN(vnodeAChannelDestroy(&pVnode->commitChannel, true));

  return 0;
}
603

604
/*
 * Counterpart of vnodeCancelAndDisableAllBgTask(): re-enable tsdb background
 * tasks and re-create the commit channel. Returns 0 on success, or the
 * channel-init failure through TAOS_CHECK_RETURN.
 */
static int32_t vnodeEnableBgTask(SVnode *pVnode) {
  tsdbEnableBgTask(pVnode->pTsdb);

  TAOS_CHECK_RETURN(vnodeAChannelInit(1, &pVnode->commitChannel));

  return 0;
}
609

610
/*
 * Create a snapshot writer for `pVnode` covering versions
 * [pParam->start, pParam->end].
 *
 * Steps: mark the vnode write-disabled (under pVnode->mutex), cancel and
 * disable background tasks, allocate the writer, allocate a new commit ID by
 * incrementing pVnode->state.commitID, and parse the optional snapshot info
 * in `pParam`.
 *
 * On success returns 0 and stores the writer in *ppWriter. On failure
 * returns a non-zero code, stores NULL in *ppWriter, and frees the writer
 * together with any range arrays already parsed into it.
 *
 * NOTE(review): on failure disableWrite stays true, background tasks stay
 * disabled and the commit ID stays incremented - confirm that the caller
 * recovers from this state.
 */
int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter **ppWriter) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SVSnapWriter *pWriter = NULL;
  int64_t       sver = pParam->start;
  int64_t       ever = pParam->end;

  // disable write, cancel and disable all bg tasks
  (void)taosThreadMutexLock(&pVnode->mutex);
  pVnode->disableWrite = true;
  (void)taosThreadMutexUnlock(&pVnode->mutex);

  code = vnodeCancelAndDisableAllBgTask(pVnode);
  TSDB_CHECK_CODE(code, lino, _exit);

  // alloc
  pWriter = (SVSnapWriter *)taosMemoryCalloc(1, sizeof(*pWriter));
  if (pWriter == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  pWriter->pVnode = pVnode;
  pWriter->sver = sver;
  pWriter->ever = ever;

  // inc commit ID
  pWriter->commitID = ++pVnode->state.commitID;

  // snapshot info
  code = vnodeSnapWriterDealWithSnapInfo(pWriter, pParam);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    vError("vgId:%d, vnode snapshot writer open failed since %s", TD_VID(pVnode), tstrerror(code));
    if (pWriter) {
      // Snap-info parsing may have attached range arrays before it failed;
      // release them so they do not leak with the writer.
      tsdbTFileSetRangeArrayDestroy(&pWriter->pRanges);
      for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
        tsdbTFileSetRangeArrayDestroy(&pWriter->pRsmaRanges[i]);
      }
      taosMemoryFreeClear(pWriter);
    }
    *ppWriter = NULL;
  } else {
    vInfo("vgId:%d, vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode),
          sver, ever, pWriter->commitID);
    *ppWriter = pWriter;
  }
  return code;
}
653

654
/*
 * Destroy every range array held by the writer: the tsdb one and both rsma
 * ones. Slots are looked up through vnodeSnapWriterGetTsdbRanges(); a type
 * for which no slot is returned is skipped.
 */
static void vnodeSnapWriterDestroyTsdbRanges(SVSnapWriter *pWriter) {
  int32_t tsdbTyps[TSDB_RETENTION_MAX] = {SNAP_DATA_TSDB, SNAP_DATA_RSMA1, SNAP_DATA_RSMA2};

  int32_t idx = 0;
  while (idx < TSDB_RETENTION_MAX) {
    TFileSetRangeArray **ppSlot = vnodeSnapWriterGetTsdbRanges(pWriter, tsdbTyps[idx]);
    ++idx;
    if (ppSlot != NULL) {
      tsdbTFileSetRangeArrayDestroy(ppSlot);
    }
  }
}
662

663
/*
 * Finish a snapshot write session, either committing (`rollback` == 0) or
 * rolling back the received data.
 *
 * Sequence:
 *   1. destroy the writer's range arrays;
 *   2. "prepare close" on the tsdb, tsdb raw and rsma writers that exist;
 *   3. commit: install the received config/state/statistics on the vnode
 *      and commit the info file; rollback: vnodeRollback();
 *   4. close every sub-writer that exists, passing `rollback` through;
 *   5. vnodeBegin(), then clear disableWrite under pVnode->mutex.
 *
 * The first failing step aborts the sequence. The writer is freed only on
 * success. Background tasks are re-enabled on both the success and the
 * failure path. `pSnapshot` is not used by this function.
 *
 * Returns 0 on success or the error code of the failing step.
 */
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot) {
  int32_t code = 0;
  SVnode *pVnode = pWriter->pVnode;

  vnodeSnapWriterDestroyTsdbRanges(pWriter);

  // ---- prepare ----
  if (pWriter->pTsdbSnapWriter != NULL) {
    code = tsdbSnapWriterPrepareClose(pWriter->pTsdbSnapWriter, rollback);
    if (code) goto _exit;
  }

  if (pWriter->pTsdbSnapRAWWriter != NULL) {
    code = tsdbSnapRAWWriterPrepareClose(pWriter->pTsdbSnapRAWWriter);
    if (code) goto _exit;
  }

  if (pWriter->pRsmaSnapWriter != NULL) {
    code = rsmaSnapWriterPrepareClose(pWriter->pRsmaSnapWriter, rollback);
    if (code) goto _exit;
  }

  // ---- commit json ----
  if (rollback) {
    vnodeRollback(pWriter->pVnode);
  } else {
    // the snapshot's end version becomes both the committed and applied version
    pWriter->info.state.committed = pWriter->ever;
    pVnode->config = pWriter->info.config;
    pVnode->state = (SVState){.committed = pWriter->info.state.committed,
                              .applied = pWriter->info.state.committed,
                              .commitID = pWriter->commitID,
                              .commitTerm = pWriter->info.state.commitTerm,
                              .applyTerm = pWriter->info.state.commitTerm};
    pVnode->statis = pWriter->info.statis;

    char dir[TSDB_FILENAME_LEN] = {0};
    vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);

    code = vnodeCommitInfo(dir);
    if (code) goto _exit;
  }

  // ---- commit/rollback sub-system ----
  if (pWriter->pMetaSnapWriter != NULL) {
    code = metaSnapWriterClose(&pWriter->pMetaSnapWriter, rollback);
    if (code) goto _exit;
  }

  if (pWriter->pTsdbSnapWriter != NULL) {
    code = tsdbSnapWriterClose(&pWriter->pTsdbSnapWriter, rollback);
    if (code) goto _exit;
  }

  if (pWriter->pTsdbSnapRAWWriter != NULL) {
    code = tsdbSnapRAWWriterClose(&pWriter->pTsdbSnapRAWWriter, rollback);
    if (code) goto _exit;
  }

  if (pWriter->pTqSnapHandleWriter != NULL) {
    code = tqSnapWriterClose(&pWriter->pTqSnapHandleWriter, rollback);
    if (code) goto _exit;
  }

  if (pWriter->pTqSnapCheckInfoWriter != NULL) {
    code = tqSnapWriterClose(&pWriter->pTqSnapCheckInfoWriter, rollback);
    if (code) goto _exit;
  }

  if (pWriter->pTqSnapOffsetWriter != NULL) {
    code = tqSnapWriterClose(&pWriter->pTqSnapOffsetWriter, rollback);
    if (code) goto _exit;
  }

  if (pWriter->pStreamTaskWriter != NULL) {
    // last argument is 1 only when there is no stream state writer
    code = streamTaskSnapWriterClose(pWriter->pStreamTaskWriter, rollback, pWriter->pStreamStateWriter == NULL ? 1 : 0);

    if (code) goto _exit;
  }

  if (pWriter->pStreamStateWriter != NULL) {
    code = streamStateSnapWriterClose(pWriter->pStreamStateWriter, rollback);
    if (code) goto _exit;

    // NOTE(review): the state writer is used again after its close call - confirm it is still valid here.
    code = streamStateRebuildFromSnap(pWriter->pStreamStateWriter, 0);
    pWriter->pStreamStateWriter = NULL;
    if (code) goto _exit;
  }

  if (pWriter->pRsmaSnapWriter != NULL) {
    code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback);
    if (code) goto _exit;
  }

  code = vnodeBegin(pVnode);
  if (code) goto _exit;

  // writes are allowed again
  (void)taosThreadMutexLock(&pVnode->mutex);
  pVnode->disableWrite = false;
  (void)taosThreadMutexUnlock(&pVnode->mutex);

_exit:
  if (code) {
    vError("vgId:%d, vnode snapshot writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code));
  } else {
    vInfo("vgId:%d, vnode snapshot writer closed, rollback:%d", TD_VID(pVnode), rollback);
    taosMemoryFree(pWriter);
  }
  if (vnodeEnableBgTask(pVnode) != 0) {
    tsdbError("vgId:%d, failed to enable bg task", TD_VID(pVnode));
  }
  return code;
}
775

776
// Handle a vnode-info snapshot block: decode the info carried in the block
// payload into pWriter->info, adjust it for this vnode, and hand it to
// vnodeSaveInfo() together with the directory reported by vnodeGetPrimaryDir().
//
// pWriter: snapshot writer that owns the target vnode and the decoded info
// pData:   whole snapshot block; it is interpreted as an SSnapDataHdr
// nData:   block length (not used by this function)
//
// Returns 0 on success, otherwise the code of the decode or save step that failed.
static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
  int32_t     code = 0;
  int32_t     lino;
  SVnode     *pVnode = pWriter->pVnode;
  char        primaryDir[TSDB_FILENAME_LEN] = {0};
  SVnodeStats snapStats;

  // the encoded vnode info is the payload of the snapshot data header
  code = vnodeDecodeInfo(((SSnapDataHdr *)pData)->data, &pWriter->info);
  TSDB_CHECK_CODE(code, lino, _exit);

  // the commit ID is taken from the writer rather than from the decoded info
  pWriter->info.state.commitID = pWriter->commitID;

  vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, primaryDir, TSDB_FILENAME_LEN);

  // use this vnode's config, keeping only the vndStats that were just decoded
  snapStats = pWriter->info.config.vndStats;
  pWriter->info.config = pVnode->config;
  pWriter->info.config.vndStats = snapStats;

  vDebug("vgId:%d, save config while write snapshot", pVnode->config.vgId);
  code = vnodeSaveInfo(primaryDir, &pWriter->info);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  return code;
}
803

804
// Apply one snapshot block to the vnode through the snapshot writer.
//
// The block is interpreted as an SSnapDataHdr followed by its payload. After
// the length and index checks pass, the block is dispatched on pHdr->type to
// the matching sub-writer, which is opened on first use and stored in pWriter.
//
// pWriter: snapshot writer state (target vnode, version range, sub-writers)
// pData:   the block, starting with its SSnapDataHdr
// nData:   total length of the block in bytes, header included
//
// Returns 0 on success. Returns TSDB_CODE_INVALID_PARA when the block is NULL,
// too short to contain a header, or its length disagrees with pHdr->size.
// Returns TSDB_CODE_INVALID_MSG when pHdr->index is not pWriter->index + 1.
// Otherwise returns the error code of the sub-writer call that failed.
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
  int32_t       code = 0;
  int32_t       lino;
  SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;
  SVnode       *pVnode = pWriter->pVnode;

  // No header field may be read until the block is known to hold a full header.
  if (pData == NULL || nData < sizeof(SSnapDataHdr)) {
    return TSDB_CODE_INVALID_PARA;
  }

  // The size recorded in the header must account for the whole block.
  if (!(pHdr->size + sizeof(SSnapDataHdr) == nData)) {
    return TSDB_CODE_INVALID_PARA;
  }

  // Blocks are only accepted with consecutive index values.
  if (pHdr->index != pWriter->index + 1) {
    vError("vgId:%d, unexpected vnode snapshot msg. index:%" PRId64 ", expected index:%" PRId64, TD_VID(pVnode),
           pHdr->index, pWriter->index + 1);
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_MSG, lino, _exit);
  }

  // The index advances before dispatch, so it moves on even if the write below fails.
  pWriter->index = pHdr->index;

  vDebug("vgId:%d, vnode snapshot write data, index:%" PRId64 " type:%d blockLen:%d", TD_VID(pVnode), pHdr->index,
         pHdr->type, nData);

  switch (pHdr->type) {
    case SNAP_DATA_CFG: {
      code = vnodeSnapWriteInfo(pWriter, pData, nData);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
    case SNAP_DATA_META: {
      // meta
      if (pWriter->pMetaSnapWriter == NULL) {
        code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
    case SNAP_DATA_TSDB:
    case SNAP_DATA_DEL: {
      // tsdb
      if (pWriter->pTsdbSnapWriter == NULL) {
        code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, pWriter->pRanges,
                                  &pWriter->pTsdbSnapWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pHdr);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
    case SNAP_DATA_RAW: {
      // tsdb (raw)
      if (pWriter->pTsdbSnapRAWWriter == NULL) {
        code = tsdbSnapRAWWriterOpen(pVnode->pTsdb, pWriter->ever, &pWriter->pTsdbSnapRAWWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      code = tsdbSnapRAWWrite(pWriter->pTsdbSnapRAWWriter, pHdr);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
    case SNAP_DATA_TQ_HANDLE: {
      // tq handle
      if (pWriter->pTqSnapHandleWriter == NULL) {
        code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapHandleWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      code = tqSnapHandleWrite(pWriter->pTqSnapHandleWriter, pData, nData);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
    case SNAP_DATA_TQ_CHECKINFO: {
      // tq checkinfo
      if (pWriter->pTqSnapCheckInfoWriter == NULL) {
        code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapCheckInfoWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      code = tqSnapCheckInfoWrite(pWriter->pTqSnapCheckInfoWriter, pData, nData);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
    case SNAP_DATA_TQ_OFFSET: {
      // tq offset
      if (pWriter->pTqSnapOffsetWriter == NULL) {
        code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapOffsetWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      code = tqSnapOffsetWrite(pWriter->pTqSnapOffsetWriter, pData, nData);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
    case SNAP_DATA_STREAM_TASK:
    case SNAP_DATA_STREAM_TASK_CHECKPOINT: {
      // stream task and stream task checkpoint share one writer
      if (pWriter->pStreamTaskWriter == NULL) {
        code = streamTaskSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pStreamTaskWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
      code = streamTaskSnapWrite(pWriter->pStreamTaskWriter, pData, nData);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
    case SNAP_DATA_STREAM_STATE_BACKEND: {
      // stream state backend
      if (pWriter->pStreamStateWriter == NULL) {
        code = streamStateSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pStreamStateWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
      code = streamStateSnapWrite(pWriter->pStreamStateWriter, pData, nData);
      TSDB_CHECK_CODE(code, lino, _exit);

    } break;
    case SNAP_DATA_RSMA1:
    case SNAP_DATA_RSMA2:
    case SNAP_DATA_QTASK: {
      // rsma1/rsma2/qtask for rsma
      if (pWriter->pRsmaSnapWriter == NULL) {
        code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, (void **)pWriter->pRsmaRanges,
                                  &pWriter->pRsmaSnapWriter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      code = rsmaSnapWrite(pWriter->pRsmaSnapWriter, pData, nData);
      TSDB_CHECK_CODE(code, lino, _exit);
    } break;
    default:
      // any other type is skipped and the call still reports success
      break;
  }
_exit:
  if (code) {
    vError("vgId:%d, vnode snapshot write failed since %s, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode),
           tstrerror(code), pHdr->index, pHdr->type, nData);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc