• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3798

31 Mar 2025 10:39AM UTC coverage: 9.424% (-20.9%) from 30.372%
#3798

push

travis-ci

happyguoxy
test:add test cases

21549 of 307601 branches covered (7.01%)

Branch coverage included in aggregate %.

36084 of 303967 relevant lines covered (11.87%)

58620.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/sync/src/syncSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "syncSnapshot.h"
18
#include "syncIndexMgr.h"
19
#include "syncPipeline.h"
20
#include "syncRaftCfg.h"
21
#include "syncRaftLog.h"
22
#include "syncRaftStore.h"
23
#include "syncReplication.h"
24
#include "syncUtil.h"
25
#include "tglobal.h"
26

27
static SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths);
28

29
/*
 * Release every entry in the buffer window [start, end) and rewind the window.
 *
 * Each slot in the window is passed to entryDeleteCb (if one is installed) and
 * then cleared. Afterwards start and end both sit just past
 * SYNC_SNAPSHOT_SEQ_BEGIN and cursor sits one position before start.
 * This function does not take pBuf->mutex itself.
 */
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
  int64_t seq = pBuf->start;
  while (seq < pBuf->end) {
    int64_t slot = seq % pBuf->size;
    if (pBuf->entryDeleteCb != NULL) {
      pBuf->entryDeleteCb(pBuf->entries[slot]);
    }
    pBuf->entries[slot] = NULL;
    ++seq;
  }

  // rewind the window to its initial, empty position
  pBuf->start = SYNC_SNAPSHOT_SEQ_BEGIN + 1;
  pBuf->end = pBuf->start;
  pBuf->cursor = pBuf->start - 1;
}
×
40

41
/*
 * Tear down a snapshot buffer and clear the caller's pointer.
 *
 * Accepts a NULL handle or a handle to a NULL buffer, in which case nothing
 * happens. Otherwise the entries are released through syncSnapBufferReset,
 * the mutex is destroyed, the buffer memory is freed and *ppBuf is set to NULL.
 */
static void syncSnapBufferDestroy(SSyncSnapBuffer **ppBuf) {
  if (ppBuf == NULL) return;

  SSyncSnapBuffer *pBuf = *ppBuf;
  if (pBuf == NULL) return;

  syncSnapBufferReset(pBuf);
  (void)taosThreadMutexDestroy(&pBuf->mutex);

  taosMemoryFree(pBuf);
  *ppBuf = NULL;
}
52

53
/*
 * Allocate and initialise an empty snapshot buffer.
 *
 * @param ppBuf  receives the new buffer on success; set to NULL on any failure.
 * @return 0 on success; terrno when the allocation fails;
 *         TSDB_CODE_SYN_INTERNAL_ERROR when the entries array does not hold
 *         TSDB_SYNC_SNAP_BUFFER_SIZE slots; the mutex-init error otherwise.
 *
 * The buffer is released on every failure path so nothing leaks when the
 * size check or the mutex initialisation fails.
 */
static int32_t syncSnapBufferCreate(SSyncSnapBuffer **ppBuf) {
  *ppBuf = NULL;

  SSyncSnapBuffer *pBuf = taosMemoryCalloc(1, sizeof(SSyncSnapBuffer));
  if (pBuf == NULL) {
    TAOS_RETURN(terrno);
  }

  pBuf->size = sizeof(pBuf->entries) / sizeof(void *);
  if (pBuf->size != TSDB_SYNC_SNAP_BUFFER_SIZE) {
    // previously returned here without freeing pBuf
    taosMemoryFree(pBuf);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int32_t code = taosThreadMutexInit(&pBuf->mutex, NULL);
  if (code != 0) {
    taosMemoryFree(pBuf);
    TAOS_RETURN(code);
  }

  *ppBuf = pBuf;
  TAOS_RETURN(0);
}
65

66
/*
 * Create a snapshot sender for one replica of a sync node.
 *
 * @param pSyncNode     owning sync node; its FSM must provide the
 *                      FpSnapshotStartRead / FpSnapshotStopRead /
 *                      FpSnapshotDoRead callbacks.
 * @param replicaIndex  index of the destination replica.
 * @param ppSender      receives the new sender on success, NULL on failure.
 * @return 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR when a read callback is
 *         missing, terrno on allocation failure, or the error reported by
 *         FpGetSnapshotInfo / syncSnapBufferCreate.
 *
 * On failure everything owned by the half-built sender is released,
 * including any snapshot.data filled in by FpGetSnapshotInfo.
 */
int32_t snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex, SSyncSnapshotSender **ppSender) {
  int32_t          code = 0;
  SSyncSnapBuffer *pSndBuf = NULL;

  *ppSender = NULL;
  bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) &&
                   (pSyncNode->pFsm->FpSnapshotDoRead != NULL);
  if (!condition) {
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  SSyncSnapshotSender *pSender = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender));
  if (pSender == NULL) {
    TAOS_RETURN(terrno);
  }

  pSender->start = false;
  pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->pReader = NULL;
  pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pSender->pSyncNode = pSyncNode;
  pSender->replicaIndex = replicaIndex;
  pSender->term = raftStoreGetTerm(pSyncNode);
  pSender->startTime = -1;
  pSender->finish = false;

  code = pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
  if (code != 0) {
    goto _err;
  }

  code = syncSnapBufferCreate(&pSndBuf);
  if (pSndBuf == NULL) {
    goto _err;
  }
  pSndBuf->entryDeleteCb = syncSnapBlockDestroy;
  pSender->pSndBuf = pSndBuf;

  syncSnapBufferReset(pSender->pSndBuf);
  *ppSender = pSender;
  TAOS_RETURN(code);

_err:
  // the sender owns snapshot.data, so it must not outlive the sender
  if (pSender->snapshot.data) {
    taosMemoryFree(pSender->snapshot.data);
    pSender->snapshot.data = NULL;
  }
  taosMemoryFreeClear(pSender);
  TAOS_RETURN(code);
}
109

110
void syncSnapBlockDestroy(void *ptr) {
×
111
  SyncSnapBlock *pBlk = ptr;
×
112
  if (pBlk->pBlock != NULL) {
×
113
    taosMemoryFree(pBlk->pBlock);
×
114
    pBlk->pBlock = NULL;
×
115
    pBlk->blockLen = 0;
×
116
  }
117
  taosMemoryFree(pBlk);
×
118
}
×
119

120
/*
 * Release the two info payloads held by a sender (snapshotParam.data and
 * snapshot.data) and reset the pointers. Always returns 0.
 */
static int32_t snapshotSenderClearInfoData(SSyncSnapshotSender *pSender) {
  void *paramData = pSender->snapshotParam.data;
  if (paramData != NULL) {
    taosMemoryFree(paramData);
    pSender->snapshotParam.data = NULL;
  }

  void *snapData = pSender->snapshot.data;
  if (snapData != NULL) {
    taosMemoryFree(snapData);
    pSender->snapshot.data = NULL;
  }

  return 0;
}
132

133
/*
 * Destroy a snapshot sender: stop its reader (if any), destroy its send
 * buffer, release its info payloads and free the sender itself.
 * A NULL sender is ignored.
 */
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender == NULL) {
    return;
  }

  // stop the FSM reader if one is still open
  if (pSender->pReader != NULL) {
    pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    pSender->pReader = NULL;
  }

  // release the send buffer
  if (pSender->pSndBuf != NULL) {
    syncSnapBufferDestroy(&pSender->pSndBuf);
  }

  // release info payloads, then the sender itself
  (void)snapshotSenderClearInfoData(pSender);
  taosMemoryFree(pSender);
}
152

153
/* Atomically read the sender's start flag. pSender must not be NULL. */
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  int8_t started = atomic_load_8(&pSender->start);
  return started;
}
×
154

155
/*
 * Start a snapshot sender and send the preparation message.
 *
 * The start flag is flipped atomically; if the sender was already started the
 * call returns 0 without doing anything. Otherwise the sender state is reset,
 * snapshot info of type TDMT_SYNC_PREP_SNAPSHOT is fetched from the FSM, and
 * a message with seq SYNC_SNAPSHOT_SEQ_PREP is sent to the destination replica.
 *
 * @return 0 on success or when already started; the FSM / send error
 *         otherwise; TSDB_CODE_INVALID_DATA_FMT when the FSM returns info
 *         data whose TLV type is not TDMT_SYNC_PREP_SNAPSHOT.
 *
 * Note: the start flag is not reverted when a later step fails.
 */
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
  int32_t code = 0;

  int8_t started = atomic_val_compare_exchange_8(&pSender->start, false, true);
  if (started) return 0;

  pSender->seq = SYNC_SNAPSHOT_SEQ_PREP;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->pReader = NULL;
  pSender->snapshotParam.start = SYNC_INDEX_INVALID;
  pSender->snapshotParam.end = SYNC_INDEX_INVALID;
  pSender->snapshot.data = NULL;
  pSender->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pSender->snapshot.lastApplyTerm = SYNC_TERM_INVALID;
  pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  (void)memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig));
  pSender->sendingMS = 0;
  pSender->term = raftStoreGetTerm(pSender->pSyncNode);
  pSender->startTime = taosGetMonoTimestampMs();
  pSender->lastSendTime = taosGetTimestampMs();
  pSender->finish = false;

  // Get snapshot info
  SSyncNode *pSyncNode = pSender->pSyncNode;
  SSnapshot  snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT};
  if ((code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo)) != 0) {
    sSError(pSender, "snapshot get info failure since %s", tstrerror(code));
    goto _out;
  }

  // The info payload is optional; when present it is a TLV whose header
  // gives the payload length.
  void   *pData = snapInfo.data;
  int32_t type = (pData) ? snapInfo.type : 0;
  int32_t dataLen = 0;
  if (pData) {
    SSyncTLV *datHead = pData;
    if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT) {
      sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
      code = TSDB_CODE_INVALID_DATA_FMT;
      goto _out;
    }
    dataLen = sizeof(SSyncTLV) + datHead->len;
  }

  if ((code = syncSnapSendMsg(pSender, pSender->seq, pData, dataLen, type)) != 0) {
    goto _out;
  }

  SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&destId));

_out:
  // snapInfo.data is a local copy and is released on every path
  if (snapInfo.data) {
    taosMemoryFree(snapInfo.data);
    snapInfo.data = NULL;
  }
  TAOS_RETURN(code);
}
213

214
/*
 * Stop a running snapshot sender.
 *
 * The start flag is cleared atomically; if the sender was not started the
 * call returns right after the debug log. Otherwise, under the send-buffer
 * mutex, the finish flag is recorded, the reader is closed, the buffer is
 * reset and the info payloads are released.
 *
 * @param finish  stored into pSender->finish.
 */
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
  sSDebug(pSender, "snapshot sender stop, finish:%d reader:%p", finish, pSender->pReader);

  // only the caller that flips start from true to false does the teardown
  int8_t wasStarted = atomic_val_compare_exchange_8(&pSender->start, true, false);
  if (!wasStarted) {
    return;
  }

  (void)taosThreadMutexLock(&pSender->pSndBuf->mutex);

  pSender->finish = finish;

  if (pSender->pReader != NULL) {
    pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    pSender->pReader = NULL;
  }

  syncSnapBufferReset(pSender->pSndBuf);
  (void)snapshotSenderClearInfoData(pSender);

  SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);

  (void)taosThreadMutexUnlock(&pSender->pSndBuf->mutex);
}
239

240
/*
 * Build one SyncSnapshotSend message from the sender's current state and send
 * it to the sender's destination replica.
 *
 * @param pSender   sender supplying ids, term, snapshot indexes and start time.
 * @param seq       sequence number stored in the message.
 * @param pBlock    optional payload; copied only when non-NULL and blockLen > 0.
 * @param blockLen  payload length passed to the message builder.
 * @param typ       stored as the message's payloadType.
 * @return 0 on success, otherwise the error from building or sending.
 */
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) {
  SRpcMsg    rpcMsg = {0};
  SSyncNode *pNode = pSender->pSyncNode;

  int32_t code = syncBuildSnapshotSend(&rpcMsg, blockLen, pNode->vgId);
  if (code != 0) {
    sSError(pSender, "failed to build snap replication msg since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  SyncSnapshotSend *pMsg = rpcMsg.pCont;

  // routing and session signature
  pMsg->srcId = pNode->myRaftId;
  pMsg->destId = pNode->replicasId[pSender->replicaIndex];
  pMsg->term = pSender->term;

  // snapshot description
  pMsg->beginIndex = pSender->snapshotParam.start;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
  pMsg->startTime = pSender->startTime;
  pMsg->seq = seq;

  bool hasPayload = (pBlock != NULL) && (blockLen > 0);
  if (hasPayload) {
    (void)memcpy(pMsg->data, pBlock, blockLen);
  }
  pMsg->payloadType = typ;

  code = syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg);
  if (code != 0) {
    sSError(pSender, "failed to send snap replication msg since %s. seq:%d", tstrerror(code), seq);
  }

  TAOS_RETURN(code);
}
275

276
// when sender receive ack, call this function to send msg from seq
277
// seq = ack + 1, already updated
278
/*
 * Advance the sender by one sequence number and send that message.
 *
 * While seq is below SYNC_SNAPSHOT_SEQ_END it is incremented; for sequence
 * numbers above SYNC_SNAPSHOT_SEQ_BEGIN a block is read from the FSM reader.
 * An empty read moves seq to SYNC_SNAPSHOT_SEQ_END and returns without
 * sending. A block that was sent is stored in the send buffer, which takes
 * ownership of it; in every other case the block is destroyed before return.
 *
 * @return 0 on success, terrno on allocation failure, the FSM read or send
 *         error, or TSDB_CODE_SYN_INTERNAL_ERROR on an out-of-range seq.
 */
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
  SyncSnapBlock *pNewBlk = NULL;
  int32_t        code = 0;

  if (pSender->seq < SYNC_SNAPSHOT_SEQ_END) {
    pSender->seq++;

    if (pSender->seq > SYNC_SNAPSHOT_SEQ_BEGIN) {
      pNewBlk = taosMemoryCalloc(1, sizeof(SyncSnapBlock));
      if (pNewBlk == NULL) {
        code = terrno;
        goto _OUT;
      }
      pNewBlk->seq = pSender->seq;

      // pull the next chunk of snapshot data from the FSM reader
      code = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, &pNewBlk->pBlock,
                                                        &pNewBlk->blockLen);
      if (code != 0) {
        sSError(pSender, "snapshot sender read failed since %s", tstrerror(code));
        goto _OUT;
      }

      if (pNewBlk->blockLen <= 0) {
        // nothing left to read: jump to the end sequence, nothing is sent here
        pSender->seq = SYNC_SNAPSHOT_SEQ_END;
        sSInfo(pSender, "snapshot sender read to the end");
        goto _OUT;
      }

      sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pNewBlk->blockLen, pNewBlk->seq);
    }
  }

  if (pSender->seq < SYNC_SNAPSHOT_SEQ_BEGIN || pSender->seq > SYNC_SNAPSHOT_SEQ_END) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _OUT;
  }

  // send the message, with the block as payload when there is one
  void   *payload = (pNewBlk != NULL) ? pNewBlk->pBlock : NULL;
  int32_t payloadLen = (pNewBlk != NULL) ? pNewBlk->blockLen : 0;
  code = syncSnapSendMsg(pSender, pSender->seq, payload, payloadLen, 0);
  if (code != 0) {
    goto _OUT;
  }

  // remember the block in the send buffer
  int64_t sentAtMs = taosGetTimestampMs();
  if (pNewBlk != NULL) {
    if (pNewBlk->seq <= SYNC_SNAPSHOT_SEQ_BEGIN || pNewBlk->seq >= SYNC_SNAPSHOT_SEQ_END) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _OUT;
    }
    pNewBlk->sendTimeMs = sentAtMs;
    pSender->pSndBuf->entries[pSender->seq % pSender->pSndBuf->size] = pNewBlk;
    pNewBlk = NULL;  // ownership moved to the buffer
    pSender->pSndBuf->end = TMAX(pSender->seq + 1, pSender->pSndBuf->end);
  }
  pSender->lastSendTime = sentAtMs;

_OUT:;
  if (pNewBlk != NULL) {
    syncSnapBlockDestroy(pNewBlk);
    pNewBlk = NULL;
  }
  TAOS_RETURN(code);
}
347

348
// send snapshot data from cache
349
/*
 * Re-send buffered snapshot blocks that have not been acknowledged.
 *
 * Runs under the send-buffer mutex. Every block after the cursor that is not
 * acked and whose last send is at least SYNC_SNAP_RESEND_MS old is sent
 * again. When the buffer window is empty, the next block is sent (seq not yet
 * at the end) and/or the end message is sent (seq at the end).
 *
 * @return 0 on success; TSDB_CODE_SYN_INTERNAL_ERROR when the sender has no
 *         reader, is finished, is not started, or a buffered slot is empty;
 *         otherwise the error from sending.
 */
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
  int32_t          code = 0;
  SSyncSnapBuffer *pBuf = pSender->pSndBuf;

  (void)taosThreadMutexLock(&pBuf->mutex);

  if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  for (int32_t seq = pBuf->cursor + 1; seq < pBuf->end; ++seq) {
    SyncSnapBlock *pEntry = pBuf->entries[seq % pBuf->size];
    if (pEntry == NULL) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    int64_t nowMs = taosGetTimestampMs();
    if (!pEntry->acked && nowMs >= pEntry->sendTimeMs + SYNC_SNAP_RESEND_MS) {
      code = syncSnapSendMsg(pSender, pEntry->seq, pEntry->pBlock, pEntry->blockLen, 0);
      if (code != 0) {
        goto _out;
      }
      pEntry->sendTimeMs = nowMs;
    }
  }

  // window is empty and there is still data to read: send the next block
  if (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pBuf->end <= pBuf->start) {
    code = snapshotSend(pSender);
    if (code != 0) {
      goto _out;
    }
  }

  // window is empty and reading has finished: send the end message
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pBuf->end <= pBuf->start) {
    code = syncSnapSendMsg(pSender, pSender->seq, NULL, 0, 0);
    if (code != 0) {
      goto _out;
    }
  }

_out:;
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_RETURN(code);
}
389

390
/*
 * Start snapshot replication towards one destination.
 *
 * Looks up the sender for pDestId; a sender that is already started is left
 * alone. Otherwise the call sleeps 1 ms and then starts the sender.
 *
 * @return 0 on success or when already started,
 *         TSDB_CODE_SYN_INTERNAL_ERROR when no sender exists for pDestId,
 *         or the error from snapshotSenderStart.
 */
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
  if (pSender == NULL) {
    sNError(pSyncNode, "snapshot sender start error since get failed");
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  if (snapshotSenderIsStart(pSender)) {
    sSDebug(pSender, "snapshot sender already start, ignore");
    return 0;
  }

  taosMsleep(1);

  int32_t code = snapshotSenderStart(pSender);
  if (code == 0) {
    return 0;
  }

  sSError(pSender, "snapshot sender start error since %s", tstrerror(code));
  TAOS_RETURN(code);
}
412

413
// receiver
414
/*
 * Create a snapshot receiver for a sync node.
 *
 * @param pSyncNode   owning sync node; its FSM must provide the
 *                    FpSnapshotStartWrite / FpSnapshotStopWrite /
 *                    FpSnapshotDoWrite callbacks.
 * @param fromId      raft id stored as the receiver's fromId.
 * @param ppReceiver  receives the new receiver on success, NULL on failure.
 * @return 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR when a write callback is
 *         missing, terrno on allocation failure, or the error from the mutex
 *         initialisation / buffer creation.
 */
int32_t snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId, SSyncSnapshotReceiver **ppReceiver) {
  int32_t code = 0;
  *ppReceiver = NULL;

  bool hasWriteCbs = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) &&
                     (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) && (pSyncNode->pFsm->FpSnapshotDoWrite != NULL);
  if (!hasWriteCbs) {
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  SSyncSnapshotReceiver *pRcv = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver));
  if (pRcv == NULL) {
    TAOS_RETURN(terrno);
  }

  code = taosThreadMutexInit(&pRcv->writerMutex, NULL);
  if (code != 0) {
    taosMemoryFree(pRcv);
    pRcv = NULL;
    TAOS_RETURN(code);
  }

  // initial receiver state
  pRcv->start = false;
  pRcv->startTime = 0;
  pRcv->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pRcv->pWriter = NULL;
  pRcv->pSyncNode = pSyncNode;
  pRcv->fromId = fromId;
  pRcv->term = raftStoreGetTerm(pSyncNode);
  pRcv->snapshot.data = NULL;
  pRcv->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pRcv->snapshot.lastApplyTerm = 0;
  pRcv->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  SSyncSnapBuffer *pBuf = NULL;
  code = syncSnapBufferCreate(&pBuf);
  if (pBuf == NULL) {
    // undo the mutex and the allocation before reporting the failure
    int32_t ret = taosThreadMutexDestroy(&pRcv->writerMutex);
    if (ret != 0) {
      sError("failed to destroy mutex since %s", tstrerror(ret));
    }
    taosMemoryFree(pRcv);
    pRcv = NULL;
    TAOS_RETURN(code);
  }

  // buffered entries are released with rpcFreeCont
  pBuf->entryDeleteCb = rpcFreeCont;
  pRcv->pRcvBuf = pBuf;
  syncSnapBufferReset(pRcv->pRcvBuf);

  *ppReceiver = pRcv;
  TAOS_RETURN(code);
}
464

465
/*
 * Release the two info payloads held by a receiver (snapshotParam.data and
 * snapshot.data) and reset the pointers. Always returns 0.
 */
static int32_t snapshotReceiverClearInfoData(SSyncSnapshotReceiver *pReceiver) {
  void *paramData = pReceiver->snapshotParam.data;
  if (paramData != NULL) {
    taosMemoryFree(paramData);
    pReceiver->snapshotParam.data = NULL;
  }

  void *snapData = pReceiver->snapshot.data;
  if (snapData != NULL) {
    taosMemoryFree(snapData);
    pReceiver->snapshot.data = NULL;
  }

  return 0;
}
477

478
/*
 * Destroy a snapshot receiver. A NULL receiver is ignored.
 *
 * An open writer is stopped (with the apply flag false) under writerMutex,
 * then the mutex, the receive buffer, the info payloads and the receiver
 * itself are released.
 */
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }

  (void)taosThreadMutexLock(&pReceiver->writerMutex);
  if (pReceiver->pWriter != NULL) {
    // stop the writer without applying what was written so far
    int32_t stopCode = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm,
                                                                       pReceiver->pWriter, false, &pReceiver->snapshot);
    if (stopCode != 0) {
      sError("vgId:%d, snapshot receiver stop failed while destroy since %s", pReceiver->pSyncNode->vgId,
             tstrerror(stopCode));
    }
    pReceiver->pWriter = NULL;
  }
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);

  (void)taosThreadMutexDestroy(&pReceiver->writerMutex);

  if (pReceiver->pRcvBuf != NULL) {
    syncSnapBufferDestroy(&pReceiver->pRcvBuf);
  }

  (void)snapshotReceiverClearInfoData(pReceiver);

  taosMemoryFree(pReceiver);
}
506

507
/* Atomically read the receiver's start flag; a NULL receiver is not started. */
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return false;
  }
  return atomic_load_8(&pReceiver->start);
}
510

511
/*
 * Compare the receiver's (term, startTime) signature with a message's.
 *
 * @return -1 when the receiver's signature is older than the message's,
 *          1 when it is newer, 0 when both fields are equal.
 *         term is compared first, startTime only breaks ties.
 */
static int32_t snapshotReceiverSignatureCmp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  if (pReceiver->term != pMsg->term) {
    return (pReceiver->term < pMsg->term) ? -1 : 1;
  }
  if (pReceiver->startTime != pMsg->startTime) {
    return (pReceiver->startTime < pMsg->startTime) ? -1 : 1;
  }
  return 0;
}
518

519
/*
 * Open the FSM snapshot writer for a receiver, using the begin message.
 *
 * The receiver's ack is reset to SYNC_SNAPSHOT_SEQ_BEGIN and its snapshot
 * description and range are copied from pBeginMsg before the writer starts.
 *
 * @return 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR when a writer is already
 *         open, or the error from FpSnapshotStartWrite.
 */
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
  if (pReceiver->pWriter != NULL) {
    sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null", pReceiver->pSyncNode->vgId);
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;

  // snapshot description carried by the begin message
  pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
  pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
  pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;

  // index range handed to the writer
  pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
  pReceiver->snapshotParam.end = pBeginMsg->lastIndex;

  int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &pReceiver->snapshotParam,
                                                                  &pReceiver->pWriter);
  if (code != 0) {
    sRError(pReceiver, "snapshot receiver start write failed since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  sRInfo(pReceiver, "snapshot receiver start write");
  return 0;
}
547

548
/*
 * Mark a receiver as started and adopt the signature of a prepare message.
 *
 * Does nothing (apart from a log line) when the receiver is already started.
 * On a successful start the receiver takes term, source id and start time
 * from pPreMsg, sets ack to SYNC_SNAPSHOT_SEQ_PREP and records the local
 * snapshot begin index as the start of its range (end is set to -1).
 */
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
  if (snapshotReceiverIsStart(pReceiver)) {
    sRInfo(pReceiver, "snapshot receiver has started");
    return;
  }

  // another caller may have won the race between the check and the exchange
  int8_t alreadyStarted = atomic_val_compare_exchange_8(&pReceiver->start, false, true);
  if (alreadyStarted) {
    return;
  }

  pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP;
  pReceiver->term = pPreMsg->term;
  pReceiver->fromId = pPreMsg->srcId;
  pReceiver->startTime = pPreMsg->startTime;

  pReceiver->snapshotParam.start = syncNodeGetSnapBeginIndex(pReceiver->pSyncNode);
  pReceiver->snapshotParam.end = -1;

  sRInfo(pReceiver, "snapshot receiver start, from dnode:%d.", DID(&pReceiver->fromId));
}
567

568
/*
 * Stop a running receiver without applying the snapshot.
 *
 * The start flag is cleared atomically; if the receiver was not started the
 * call returns right after the debug log. Otherwise an open writer is stopped
 * (apply flag false) under writerMutex, and then the receive buffer is reset
 * and the info payloads are released under the buffer mutex.
 */
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
  sRDebug(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);

  // only the caller that flips start from true to false does the teardown
  int8_t wasStarted = atomic_val_compare_exchange_8(&pReceiver->start, true, false);
  if (!wasStarted) {
    return;
  }

  (void)taosThreadMutexLock(&pReceiver->writerMutex);
  if (pReceiver->pWriter == NULL) {
    sRInfo(pReceiver, "snapshot receiver stop, writer is null");
  } else {
    int32_t stopCode = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm,
                                                                       pReceiver->pWriter, false, &pReceiver->snapshot);
    if (stopCode != 0) {
      sRError(pReceiver, "snapshot receiver stop write failed since %s", tstrerror(stopCode));
    }
    pReceiver->pWriter = NULL;
  }
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);

  (void)taosThreadMutexLock(&pReceiver->pRcvBuf->mutex);
  syncSnapBufferReset(pReceiver->pRcvBuf);
  (void)snapshotReceiverClearInfoData(pReceiver);
  (void)taosThreadMutexUnlock(&pReceiver->pRcvBuf->mutex);
}
597

598
/*
 * Finish receiving a snapshot: write the last payload, apply the snapshot and
 * restore the log store from it.
 *
 * Steps: write pMsg->data (when dataLen > 0) under writerMutex; raise the
 * node's commitIndex and term to the snapshot's values when those are larger;
 * stop the writer with the apply flag set; set ack to SYNC_SNAPSHOT_SEQ_END;
 * refresh fsmState from the FSM; restore the log store at pMsg->lastIndex.
 *
 * @return 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR when no writer is open,
 *         or the error of the first failing step.
 *
 * writerMutex is released on every path, including the one where stopping
 * the writer fails.
 */
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  int32_t code = 0;

  if (pReceiver->pWriter == NULL) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sRError(pReceiver, "snapshot receiver finish error since writer is null");
    TAOS_RETURN(code);
  }

  // write data
  sRInfo(pReceiver, "snapshot receiver write about to finish, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
  if (pMsg->dataLen > 0) {
    (void)taosThreadMutexLock(&pReceiver->writerMutex);
    code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
                                                         pMsg->dataLen);
    (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
    if (code != 0) {
      sRError(pReceiver, "failed to finish snapshot receiver write since %s", tstrerror(code));
      TAOS_RETURN(code);
    }
  }

  // update commit index
  if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
    pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
  }

  // maybe update term
  if (pReceiver->snapshot.lastApplyTerm > raftStoreGetTerm(pReceiver->pSyncNode)) {
    raftStoreSetTerm(pReceiver->pSyncNode, pReceiver->snapshot.lastApplyTerm);
  }

  (void)taosThreadMutexLock(&pReceiver->writerMutex);
  if (pReceiver->pWriter != NULL) {
    // stop writer, apply data
    code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
                                                           &pReceiver->snapshot);
    if (code != 0) {
      sRError(pReceiver, "snapshot receiver apply failed since %s", tstrerror(code));
      // release the lock before bailing out; returning with it held would
      // block every later stop/destroy of this receiver
      (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
      TAOS_RETURN(code);
    }
    pReceiver->pWriter = NULL;
    sRInfo(pReceiver, "snapshot receiver write stopped");
  }
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);

  // update progress
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;

  // get fsmState
  SSnapshot snapshot = {0};
  code = pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
  if (code != 0) {
    sRError(pReceiver, "snapshot receiver get snapshot info failed since %s", tstrerror(code));
    TAOS_RETURN(code);
  }
  pReceiver->pSyncNode->fsmState = snapshot.state;

  // reset wal
  code =
      pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
  if (code != 0) {
    sRError(pReceiver, "failed to snapshot receiver log restore since %s", tstrerror(code));
    TAOS_RETURN(code);
  }
  sRInfo(pReceiver, "wal log restored from snapshot");

  return 0;
}
666

667
/*
 * Handle one in-order data message on the receiver.
 *
 * The message must carry seq == ack + 1 and a writer must be open. A
 * non-empty payload is handed to FpSnapshotDoWrite; on success ack advances
 * to the message's seq.
 *
 * @return 0 on success, TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG on an unexpected
 *         seq, TSDB_CODE_SYN_INTERNAL_ERROR when no writer is open, or the
 *         error from the FSM write.
 */
static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  if (pMsg->seq != pReceiver->ack + 1) {
    sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq);
    TAOS_RETURN(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG);
  }

  if (pReceiver->pWriter == NULL) {
    sRError(pReceiver, "snapshot receiver failed to write data since writer is null");
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  sRDebug(pReceiver, "snapshot receiver continue to write, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);

  if (pMsg->dataLen > 0) {
    // hand the payload to the FSM writer
    int32_t writeCode = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
                                                                      pMsg->data, pMsg->dataLen);
    if (writeCode != 0) {
      sRError(pReceiver, "snapshot receiver continue write failed since %s", tstrerror(writeCode));
      TAOS_RETURN(writeCode);
    }
  }

  // the message is consumed: advance the acknowledged sequence
  pReceiver->ack = pMsg->seq;

  sRDebug(pReceiver, "snapshot receiver continue to write finish");
  return 0;
}
697

698
/*
 * Compute the index at which a snapshot received by this node begins.
 *
 * For an mnode this is SYNC_INDEX_BEGIN. Otherwise it is one past the larger
 * of the node's commitIndex and the WAL's committed version.
 */
SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
  SyncIndex beginIdx = SYNC_INDEX_INVALID;

  if (!syncNodeIsMnode(ths)) {
    SSyncLogStoreData *pStoreData = ths->pLogStore->data;
    SWal              *pWal = pStoreData->pWal;
    int64_t            committedVer = walGetCommittedVer(pWal);

    beginIdx = TMAX(ths->commitIndex, committedVer) + 1;
    sNInfo(ths, "snapshot begin index is %" PRId64, beginIdx);
  } else {
    beginIdx = SYNC_INDEX_BEGIN;
    sNInfo(ths, "snapshot begin index is %" PRId64 " since its mnode", beginIdx);
  }

  return beginIdx;
}
716

717
static int32_t syncSnapReceiverExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotReceiver *pReceiver,
×
718
                                             SyncSnapshotSend *pMsg, SSnapshot *pInfo) {
719
  if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT) return TSDB_CODE_SYN_INTERNAL_ERROR;
×
720
  int32_t code = 0, lino = 0;
×
721

722
  // copy snap info from leader
723
  void *data = taosMemoryCalloc(1, pMsg->dataLen);
×
724
  if (data == NULL) {
×
725
    TAOS_CHECK_EXIT(terrno);
×
726
  }
727
  pInfo->data = data;
×
728
  data = NULL;
×
729
  (void)memcpy(pInfo->data, pMsg->data, pMsg->dataLen);
×
730

731
  // exchange snap info
732
  if ((code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pInfo)) != 0) {
×
733
    sRError(pReceiver, "failed to get snapshot info. type: %d", pMsg->payloadType);
×
734
    goto _exit;
×
735
  }
736
  SSyncTLV *datHead = pInfo->data;
×
737
  if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
×
738
    sRError(pReceiver, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
×
739
    code = TSDB_CODE_INVALID_DATA_FMT;
×
740
    goto _exit;
×
741
  }
742
  int32_t dataLen = sizeof(SSyncTLV) + datHead->len;
×
743

744
  // save exchanged snap info
745
  SSnapshotParam *pParam = &pReceiver->snapshotParam;
×
746
  data = taosMemoryRealloc(pParam->data, dataLen);
×
747
  if (data == NULL) {
×
748
    code = terrno;
×
749
    sError("vgId:%d, failed to realloc memory for snapshot prep due to %s. dataLen:%d", pSyncNode->vgId,
×
750
           tstrerror(code), dataLen);
751
    goto _exit;
×
752
  }
753
  pParam->data = data;
×
754
  data = NULL;
×
755
  (void)memcpy(pParam->data, pInfo->data, dataLen);
×
756

757
_exit:
×
758
  TAOS_RETURN(code);
×
759
}
760

761
/*
 * Handle a snapshot preparation message on the receiving node.
 *
 * Depending on how the message's (term, startTime) signature compares with
 * the running receiver's:
 *   - newer, or receiver not started: (re)start the receiver, then reply;
 *   - equal (duplicate): reply only;
 *   - older (stale): reply with TSDB_CODE_SYN_MISMATCHED_SIGNATURE.
 *
 * For accepted messages carrying a TDMT_SYNC_PREP_SNAPSHOT payload, snapshot
 * info is exchanged with the FSM and the result is returned in the reply.
 * A stale message skips the exchange so that its error code reaches the
 * sender and its payload does not replace the receiver's saved info.
 *
 * @return the error from the info exchange (no reply is sent in that case),
 *         otherwise the result of sending the reply.
 */
static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                code = 0;
  int32_t                rspCode = 0;     // code reported to the sender in the reply
  bool                   restart = true;  // whether the receiver must be (re)started
  SSnapshot              snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT_REPLY};
  int32_t                dataLen = 0;

  if (snapshotReceiverIsStart(pReceiver)) {
    // already start
    int32_t order = snapshotReceiverSignatureCmp(pReceiver, pMsg);
    if (order < 0) {
      sRInfo(pReceiver,
             "received a new snapshot preparation. restart receiver."
             " msg signature:(%" PRId64 ", %" PRId64 ")",
             pMsg->term, pMsg->startTime);
    } else if (order == 0) {
      sRInfo(pReceiver,
             "received a duplicate snapshot preparation. send reply."
             " msg signature:(%" PRId64 ", %" PRId64 ")",
             pMsg->term, pMsg->startTime);
      restart = false;
    } else {
      // ignore
      sRError(pReceiver,
              "received a stale snapshot preparation. ignore."
              " msg signature:(%" PRId64 ", %" PRId64 ")",
              pMsg->term, pMsg->startTime);
      rspCode = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
      restart = false;
    }
  } else {
    // start new
    sRInfo(pReceiver, "snapshot receiver not start yet so start new one");
  }

  if (restart) {
    if (snapshotReceiverIsStart(pReceiver)) {
      sRInfo(pReceiver, "snapshot receiver already start and force stop pre one");
      snapshotReceiverStop(pReceiver);
    }

    snapshotReceiverStart(pReceiver, pMsg);
  }

  // exchange snap info only for messages that were accepted
  if (rspCode == 0 && pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT) {
    if ((code = syncSnapReceiverExchgSnapInfo(pSyncNode, pReceiver, pMsg, &snapInfo)) != 0) {
      goto _out;
    }
    SSyncTLV *datHead = snapInfo.data;
    dataLen = sizeof(SSyncTLV) + datHead->len;
  }

  // send response
  int32_t type = (snapInfo.data) ? snapInfo.type : 0;
  if ((code = syncSnapSendRsp(pReceiver, pMsg, snapInfo.data, dataLen, type, rspCode)) != 0) {
    goto _out;
  }

_out:
  if (snapInfo.data) {
    taosMemoryFree(snapInfo.data);
    snapInfo.data = NULL;
  }
  TAOS_RETURN(code);
}
829

830
/*
 * Receiver-side handler for the BEGIN message of a snapshot replication.
 *
 * Steps, in order:
 *   1. the receiver must already be started;
 *   2. the message signature must match the receiver (snapshotReceiverSignatureCmp);
 *   3. the snapshot writer is started;
 *   4. the receiver's recorded begin index must still equal syncNodeGetSnapBeginIndex().
 * Whatever the outcome, a reply carrying the resulting code is sent to the sender.
 *
 * Returns the error from sending the reply if that fails; otherwise the handling
 * result (0 on success, an error code on any failed step).
 */
static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                code = TSDB_CODE_SYN_INTERNAL_ERROR;

  if (!snapshotReceiverIsStart(pReceiver)) {
    sRError(pReceiver, "failed to begin snapshot receiver since not started");
    goto _SEND_REPLY;
  }

  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    sRError(pReceiver, "failed to begin snapshot receiver since %s", tstrerror(code));
    goto _SEND_REPLY;
  }

  // start writer
  if ((code = snapshotReceiverStartWriter(pReceiver, pMsg)) != 0) {
    sRError(pReceiver, "failed to start snapshot writer since %s", tstrerror(code));
    goto _SEND_REPLY;
  }

  SyncIndex beginIndex = syncNodeGetSnapBeginIndex(pSyncNode);
  if (pReceiver->snapshotParam.start != beginIndex) {
    sRError(pReceiver, "snapshot begin index is changed unexpectedly. sver:%" PRId64 ", beginIndex:%" PRId64,
            pReceiver->snapshotParam.start, beginIndex);
    // code is 0 at this point (the writer started fine); restore a failure code so
    // that the reply and the return value do not report this error path as success.
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _SEND_REPLY;
  }

  code = 0;
_SEND_REPLY:

  // send response
  TAOS_CHECK_RETURN(syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code));

  TAOS_RETURN(code);
}
867

868
/*
 * Build a SyncSnapshotRsp answering pMsg and send it to the node pMsg came from.
 *
 * pReceiver  receiver whose sync node and snapshotParam.start are used for the reply
 * pMsg       the snapshot message being answered; its term, lastIndex, lastTerm,
 *            startTime and seq (as ack) are echoed back
 * pBlock     optional payload, copied into the reply when non-NULL and blockLen > 0
 * blockLen   payload length passed to syncBuildSnapshotSendRsp
 * type       value stored in the reply's payloadType
 * rspCode    value stored in the reply's code field
 *
 * Returns 0 on success, or the error from building / sending the reply.
 */
int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, void *pBlock, int32_t blockLen,
                        int32_t type, int32_t rspCode) {
  SSyncNode *pNode = pReceiver->pSyncNode;
  SRpcMsg    rsp = {0};

  int32_t ret = syncBuildSnapshotSendRsp(&rsp, blockLen, pNode->vgId);
  if (ret != 0) {
    sRError(pReceiver, "failed to build snapshot receiver resp since %s", tstrerror(ret));
    TAOS_RETURN(ret);
  }

  // fill in the reply header
  SyncSnapshotRsp *pRsp = rsp.pCont;
  pRsp->srcId = pNode->myRaftId;
  pRsp->destId = pMsg->srcId;
  pRsp->term = pMsg->term;
  pRsp->lastIndex = pMsg->lastIndex;
  pRsp->lastTerm = pMsg->lastTerm;
  pRsp->startTime = pMsg->startTime;
  pRsp->ack = pMsg->seq;
  pRsp->code = rspCode;
  pRsp->snapBeginIndex = pReceiver->snapshotParam.start;
  pRsp->payloadType = type;

  // attach the optional payload
  if (blockLen > 0 && pBlock != NULL) {
    (void)memcpy(pRsp->data, pBlock, blockLen);
  }

  ret = syncNodeSendMsgById(&pRsp->destId, pNode, &rsp);
  if (ret != 0) {
    sRError(pReceiver, "failed to send snapshot receiver resp since %s", tstrerror(ret));
    TAOS_RETURN(ret);
  }

  return 0;
}
902

903
/*
 * Put a received snapshot data message into the receiver's buffer and consume every
 * message that has become contiguous.
 *
 * The whole operation runs under pRcvBuf->mutex; every exit path after the lock is
 * taken goes through _out so the mutex is always released.
 *
 * - A message whose seq is at or beyond start + size is rejected with
 *   TSDB_CODE_SYN_BUFFER_FULL.
 * - A message with seq > cursor is stored in its slot (replacing any entry already
 *   there) and ppMsg[0] is set to NULL, i.e. the buffer takes over the message.
 * - A message with seq < start is only answered again via syncSnapSendRsp.
 * - The cursor is then advanced over consecutive filled slots, and every entry in
 *   [start, cursor] is handed to snapshotReceiverGotData, answered, and deleted.
 *
 * Returns 0 on success or the first error encountered.
 */
static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend **ppMsg) {
  int32_t           code = 0;
  SSyncSnapBuffer  *pRcvBuf = pReceiver->pRcvBuf;
  SyncSnapshotSend *pMsg = ppMsg[0];

  (void)taosThreadMutexLock(&pRcvBuf->mutex);

  if (pMsg->seq - pRcvBuf->start >= pRcvBuf->size) {
    code = TSDB_CODE_SYN_BUFFER_FULL;
    goto _out;
  }

  // buffer invariant violated: leave through _out so the mutex is unlocked
  // (a direct return here would leave the mutex held)
  if (!(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  if (pMsg->seq > pRcvBuf->cursor) {
    if (pRcvBuf->entries[pMsg->seq % pRcvBuf->size]) {
      pRcvBuf->entryDeleteCb(pRcvBuf->entries[pMsg->seq % pRcvBuf->size]);
    }
    pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg;
    ppMsg[0] = NULL;  // the buffer now holds the message
    pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
  } else if (pMsg->seq < pRcvBuf->start) {
    // already consumed: just answer it again
    code = syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code);
    goto _out;
  }

  // advance the cursor over the contiguous run of filled slots
  for (int64_t seq = pRcvBuf->cursor + 1; seq < pRcvBuf->end; ++seq) {
    if (pRcvBuf->entries[seq % pRcvBuf->size]) {
      pRcvBuf->cursor = seq;
    } else {
      break;
    }
  }

  // consume, answer and release everything up to the cursor
  for (int64_t seq = pRcvBuf->start; seq <= pRcvBuf->cursor; ++seq) {
    if ((code = snapshotReceiverGotData(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size])) != 0) {
      if (code >= SYNC_SNAPSHOT_SEQ_INVALID) {
        code = TSDB_CODE_SYN_INTERNAL_ERROR;
      }
    }
    pRcvBuf->start = seq + 1;
    if (syncSnapSendRsp(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size], NULL, 0, 0, code) != 0) {
      sError("failed to send snap rsp");
    }
    pRcvBuf->entryDeleteCb(pRcvBuf->entries[seq % pRcvBuf->size]);
    pRcvBuf->entries[seq % pRcvBuf->size] = NULL;
    if (code) goto _out;
  }

_out:
  (void)taosThreadMutexUnlock(&pRcvBuf->mutex);
  TAOS_RETURN(code);
}
956

957
/*
 * Receiver-side handler for a snapshot data message (transfer in progress).
 *
 * If the message signature does not match the receiver, a reply carrying
 * TSDB_CODE_SYN_MISMATCHED_SIGNATURE is sent and the result of that send is returned.
 * Otherwise the message is passed to syncSnapBufferRecv, which may take over
 * ppMsg[0] (setting it to NULL).
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR when ppMsg[0] is NULL.
 */
static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend **ppMsg) {
  SyncSnapshotSend *pMsg = ppMsg[0];
  if (!pMsg) return TSDB_CODE_SYN_INTERNAL_ERROR;

  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
    int32_t code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    sRError(pReceiver, "failed to receive snapshot data since %s.", tstrerror(code));
    return syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code);
  }

  return syncSnapBufferRecv(pReceiver, ppMsg);
}
974

975
/*
 * Receiver-side handler for the END message of a snapshot replication.
 *
 * Checks the message signature, finishes the receiver (snapshotReceiverFinish) and,
 * when that succeeds, stops it. A reply is then built and sent whose code field
 * carries the outcome of that handling.
 *
 * The handling outcome and the build/send status are tracked in separate variables so
 * that a successful build of the reply cannot erase an earlier failure.
 *
 * Returns the handling error if there was one; otherwise the error from building or
 * sending the reply; 0 when everything succeeded.
 */
static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                code = 0;     // outcome of the END handling, reported in the reply
  int32_t                sendCode = 0; // status of building / sending the reply

  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
    sRError(pReceiver, "snapshot end failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
            pReceiver->startTime, pMsg->startTime);
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    goto _SEND_REPLY;
  }

  code = snapshotReceiverFinish(pReceiver, pMsg);
  if (code == 0) {
    snapshotReceiverStop(pReceiver);
  }

_SEND_REPLY:;

  // build msg
  SRpcMsg rpcMsg = {0};
  if ((sendCode = syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId)) != 0) {
    sRError(pReceiver, "snapshot receiver build rsp failed since %s", tstrerror(sendCode));
    if (code == 0) code = sendCode;
    TAOS_RETURN(code);
  }

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
  pRspMsg->term = raftStoreGetTerm(pSyncNode);
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pMsg->startTime;
  pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
  pRspMsg->code = code;
  pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;

  // send msg
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver end", &rpcMsg.info.traceId);
  if ((sendCode = syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg)) != 0) {
    sRError(pReceiver, "snapshot receiver send rsp failed since %s", tstrerror(sendCode));
    if (code == 0) code = sendCode;
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
1023

1024
/*
 * Entry point for an incoming snapshot-send message on the receiving node.
 *
 * The message is dropped early when its source is not in the raft group, when its term
 * is smaller than the local term (a mismatched-signature reply is sent in that case),
 * when this node is neither follower nor learner, or when its seq is outside
 * [SYNC_SNAPSHOT_SEQ_PREP, SYNC_SNAPSHOT_SEQ_END]. Otherwise it is dispatched by seq to
 * the prepare / begin / data / end handler. After a successful end, the log buffer is
 * re-initialized. The election timer is reset after every dispatched message.
 *
 * Returns 0 on success or an error code.
 */
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
  SyncSnapshotSend     **ppSnap = (SyncSnapshotSend **)&pRpcMsg->pCont;
  SyncSnapshotSend      *pSnap = ppSnap[0];
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                code = 0;

  /* the sending replica may already have been dropped from the group */
  if (!syncNodeInRaftGroup(pSyncNode, &pSnap->srcId)) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pSnap, "not in my config", &pRpcMsg->info.traceId);
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    TAOS_RETURN(code);
  }

  /* reject messages from an older term */
  if (pSnap->term < raftStoreGetTerm(pSyncNode)) {
    sRError(pReceiver, "reject snap replication with smaller term. msg term:%" PRId64 ", seq:%d", pSnap->term,
            pSnap->seq);
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    if (syncSnapSendRsp(pReceiver, pSnap, NULL, 0, 0, code) != 0) {
      sError("failed to send snap rsp");
    }
    TAOS_RETURN(code);
  }

  /* learners only adopt the term; every other role steps down on a newer term */
  if (pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
    syncNodeUpdateTermWithoutStepDown(pSyncNode, pSnap->term);
  } else if (pSnap->term > raftStoreGetTerm(pSyncNode)) {
    syncNodeStepDown(pSyncNode, pSnap->term, pSnap->srcId);
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER && pSyncNode->state != TAOS_SYNC_STATE_LEARNER) {
    sRError(pReceiver, "snapshot receiver not a follower or learner");
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    TAOS_RETURN(code);
  }

  if (pSnap->seq < SYNC_SNAPSHOT_SEQ_PREP || pSnap->seq > SYNC_SNAPSHOT_SEQ_END) {
    sRError(pReceiver, "snap replication msg with invalid seq:%d", pSnap->seq);
    code = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG;
    TAOS_RETURN(code);
  }

  /* dispatch on the stage encoded in seq */
  if (pSnap->seq == SYNC_SNAPSHOT_SEQ_PREP) {
    sInfo("vgId:%d, prepare snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pSnap->term,
          pSnap->startTime);
    code = syncNodeOnSnapshotPrep(pSyncNode, pSnap);
  } else if (pSnap->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
    sInfo("vgId:%d, begin snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pSnap->term,
          pSnap->startTime);
    code = syncNodeOnSnapshotBegin(pSyncNode, pSnap);
  } else if (pSnap->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pSnap->seq < SYNC_SNAPSHOT_SEQ_END) {
    /* data block; the handler may take over the message through ppSnap */
    code = syncNodeOnSnapshotReceive(pSyncNode, ppSnap);
  } else if (pSnap->seq == SYNC_SNAPSHOT_SEQ_END) {
    sInfo("vgId:%d, end snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pSnap->term,
          pSnap->startTime);
    code = syncNodeOnSnapshotEnd(pSyncNode, pSnap);
    if (code != 0) {
      sRError(pReceiver, "failed to end snapshot.");
    } else {
      code = syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode);
      if (code != 0) {
        sRError(pReceiver, "failed to reinit log buffer since %s", tstrerror(code));
      }
    }
  }

  syncNodeResetElectTimer(pSyncNode);
  TAOS_RETURN(code);
}
1108

1109
/*
 * Copy the TLV payload of a prepare-reply into the sender's snapshot parameters.
 *
 * The reply must have payloadType TDMT_SYNC_PREP_SNAPSHOT_REPLY and the TLV header at
 * the start of pMsg->data must carry the same type. The header plus its payload
 * (sizeof(SSyncTLV) + len bytes) is copied into a reallocated pSender->snapshotParam.data.
 *
 * Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR for an unexpected payload type,
 * TSDB_CODE_INVALID_DATA_FMT for a TLV type mismatch, or terrno when reallocation fails
 * (the previous buffer pointer is left in place in that case).
 */
static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SSyncTLV *pTlv = (void *)pMsg->data;
  if (pTlv->typ != pMsg->payloadType) {
    sSError(pSender, "unexpected data type in data of SyncSnapshotRsp. typ: %d", pTlv->typ);
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
  }

  /* header + payload */
  int32_t totalLen = sizeof(SSyncTLV) + pTlv->len;

  SSnapshotParam *pParam = &pSender->snapshotParam;
  void           *pGrown = taosMemoryRealloc(pParam->data, totalLen);
  if (pGrown == NULL) {
    TAOS_RETURN(terrno);
  }

  (void)memcpy(pGrown, pMsg->data, totalLen);
  pParam->data = pGrown;

  sSInfo(pSender, "data of snapshot param. len: %d", pTlv->len);
  return 0;
}
1131

1132
// sender
1133
/*
 * Sender-side handler for the reply to a snapshot PREP message.
 *
 * Rejects a reply whose snapBeginIndex is beyond the local commit index. Otherwise,
 * under pSender->pSndBuf->mutex, it queries the FSM for snapshot info, records the
 * <start, end> range and the snapshot on the sender, optionally imports the reply
 * payload, starts the FSM snapshot reader, sets the peer's next index to
 * lastApplyIndex + 1 and calls snapshotSend().
 *
 * Returns 0 on success or the first error encountered.
 */
static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  int32_t   code = 0;
  SSnapshot snapInfo = {0};

  if (pMsg->snapBeginIndex > pSyncNode->commitIndex) {
    sSError(pSender,
            "snapshot begin index is greater than commit index. snapBeginIndex:%" PRId64 ", commitIndex:%" PRId64,
            pMsg->snapBeginIndex, pSyncNode->commitIndex);
    TAOS_RETURN(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG);
  }

  (void)taosThreadMutexLock(&pSender->pSndBuf->mutex);
  TAOS_CHECK_GOTO(pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo), NULL, _out);

  /* range to transfer: <receiver's begin index, last applied index of the snapshot> */
  pSender->snapshotParam.start = pMsg->snapBeginIndex;
  pSender->snapshotParam.end = snapInfo.lastApplyIndex;

  sSInfo(pSender, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64,
         pMsg->snapBeginIndex, snapInfo.lastApplyIndex, snapInfo.lastApplyTerm);

  /* remember the snapshot on the sender */
  pSender->snapshot = snapInfo;

  /* import the receiver-provided payload, when the reply carries one */
  if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
    TAOS_CHECK_GOTO(syncSnapSenderExchgSnapInfo(pSyncNode, pSender, pMsg), NULL, _out);
  }

  /* open the FSM reader */
  code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
  if (code != 0) {
    sSError(pSender, "prepare snapshot failed since %s", tstrerror(code));
    goto _out;
  }

  /* next index for this peer follows the snapshot */
  syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapInfo.lastApplyIndex + 1);

  code = snapshotSend(pSender);

_out:
  (void)taosThreadMutexUnlock(&pSender->pSndBuf->mutex);
  TAOS_RETURN(code);
}
1177

1178
/*
 * Three-way comparison of the sender's signature (term, startTime) with a reply's.
 * term is compared first, startTime breaks ties.
 *
 * Returns -1 when the sender's signature is smaller, 1 when it is larger, 0 when equal.
 */
static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  if (pSender->term != pMsg->term) {
    return (pSender->term < pMsg->term) ? -1 : 1;
  }
  if (pSender->startTime != pMsg->startTime) {
    return (pSender->startTime < pMsg->startTime) ? -1 : 1;
  }
  return 0;
}
1185

1186
/*
 * Process an ack for a snapshot data block on the sender side and keep the send
 * window moving.
 *
 * Runs entirely under pSndBuf->mutex; every exit path goes through _out.
 *
 * - The reply signature must match the sender, and the sender must be started, have a
 *   reader and not be finished.
 * - An ack at or beyond start + size yields TSDB_CODE_SYN_BUFFER_FULL.
 * - An ack inside (cursor, end) marks its block as acked.
 * - The cursor advances over consecutive acked blocks, and all blocks in
 *   [start, cursor] are deleted.
 * - snapshotSend() is then called while fewer than tsSnapReplMaxWaitN blocks are in
 *   flight, and once more when seq has reached SYNC_SNAPSHOT_SEQ_END and the buffer
 *   has been drained.
 *
 * Returns 0 on success or an error code.
 */
static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp **ppMsg) {
  int32_t          code = 0;
  SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
  SyncSnapshotRsp *pMsg = ppMsg[0];

  (void)taosThreadMutexLock(&pSndBuf->mutex);
  if (snapshotSenderSignatureCmp(pSender, pMsg) != 0) {
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    goto _out;
  }

  if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  if (pMsg->ack - pSndBuf->start >= pSndBuf->size) {
    code = TSDB_CODE_SYN_BUFFER_FULL;
    goto _out;
  }

  if (!(pSndBuf->start <= pSndBuf->cursor + 1 && pSndBuf->cursor < pSndBuf->end)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  // mark the acknowledged block
  if (pMsg->ack > pSndBuf->cursor && pMsg->ack < pSndBuf->end) {
    SyncSnapBlock *pBlk = pSndBuf->entries[pMsg->ack % pSndBuf->size];
    if (!pBlk) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    pBlk->acked = 1;
  }

  // advance the cursor over the contiguous run of acked blocks
  for (int64_t ack = pSndBuf->cursor + 1; ack < pSndBuf->end; ++ack) {
    SyncSnapBlock *pBlk = pSndBuf->entries[ack % pSndBuf->size];
    if (!pBlk) {
      // same guard as for the single-ack lookup above: an empty slot inside the
      // window is reported as an internal error instead of being dereferenced
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pBlk->acked) {
      pSndBuf->cursor = ack;
    } else {
      break;
    }
  }

  // release everything that has been acknowledged
  for (int64_t ack = pSndBuf->start; ack <= pSndBuf->cursor; ++ack) {
    pSndBuf->entryDeleteCb(pSndBuf->entries[ack % pSndBuf->size]);
    pSndBuf->entries[ack % pSndBuf->size] = NULL;
    pSndBuf->start = ack + 1;
  }

  // refill the window
  while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pSndBuf->start < tsSnapReplMaxWaitN) {
    if ((code = snapshotSend(pSender)) != 0) {
      goto _out;
    }
  }

  // all data acknowledged: one more snapshotSend() once seq is at END
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
    if ((code = snapshotSend(pSender)) != 0) {
      goto _out;
    }
  }
_out:
  (void)taosThreadMutexUnlock(&pSndBuf->mutex);
  TAOS_RETURN(code);
}
1251

1252
/*
 * Entry point for an incoming snapshot reply on the sending node.
 *
 * Replies are ignored (with an error code, sender untouched) when the source is not in
 * the raft group, when no sender exists for it, when the sender is not started, or
 * when the reply's signature is older than the sender's. Any later failure - newer
 * reply signature, node not leader, term mismatch, error code in the reply, or a
 * failing stage handler - stops the sender and resets replication for that peer.
 *
 * By ack value: SYNC_SNAPSHOT_SEQ_PREP runs the prepare-reply handler, values in
 * [SYNC_SNAPSHOT_SEQ_BEGIN, SYNC_SNAPSHOT_SEQ_END) go to syncSnapBufferSend, and
 * SYNC_SNAPSHOT_SEQ_END stops the sender and resets replication.
 *
 * Returns 0 on success or an error code.
 */
int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
  SyncSnapshotRsp **ppRsp = (SyncSnapshotRsp **)&pRpcMsg->pCont;
  SyncSnapshotRsp  *pRsp = ppRsp[0];
  int32_t           code = 0;

  /* the replying replica may already have been dropped */
  if (!syncNodeInRaftGroup(pSyncNode, &pRsp->srcId)) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pRsp, "maybe replica already dropped", &pRpcMsg->info.traceId);
    TAOS_RETURN(TSDB_CODE_SYN_MISMATCHED_SIGNATURE);
  }

  /* look up the sender serving this peer */
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &pRsp->srcId);
  if (pSender == NULL) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pRsp, "sender is null", &pRpcMsg->info.traceId);
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  if (!snapshotSenderIsStart(pSender)) {
    sSError(pSender, "snapshot sender stopped. sender startTime:%" PRId64 ", msg startTime:%" PRId64,
            pSender->startTime, pRsp->startTime);
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  /* compare signatures: > 0 means the reply is stale, < 0 means the sender is */
  int32_t cmp = snapshotSenderSignatureCmp(pSender, pRsp);
  if (cmp > 0) {
    sSWarn(pSender, "ignore a stale snap rsp, msg signature:(%" PRId64 ", %" PRId64 ").", pRsp->term, pRsp->startTime);
    TAOS_RETURN(TSDB_CODE_SYN_MISMATCHED_SIGNATURE);
  }
  if (cmp < 0) {
    sSError(pSender, "snapshot sender is stale. stop");
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    goto _ERROR;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    sSError(pSender, "snapshot sender not leader");
    code = TSDB_CODE_SYN_NOT_LEADER;
    goto _ERROR;
  }

  SyncTerm localTerm = raftStoreGetTerm(pSyncNode);
  if (pRsp->term != localTerm) {
    sSError(pSender, "snapshot sender term mismatch, msg term:%" PRId64 " currentTerm:%" PRId64, pRsp->term,
            localTerm);
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    goto _ERROR;
  }

  /* the receiver reported a failure */
  if (pRsp->code != 0) {
    sSError(pSender, "snapshot sender receive error:%s 0x%x and stop sender", tstrerror(pRsp->code), pRsp->code);
    code = pRsp->code;
    goto _ERROR;
  }

  /* reply to PREP */
  if (pRsp->ack == SYNC_SNAPSHOT_SEQ_PREP) {
    sSInfo(pSender, "process prepare rsp");
    code = syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pRsp);
    if (code != 0) {
      goto _ERROR;
    }
  }

  /* reply to BEGIN or to a data block */
  if (pRsp->ack >= SYNC_SNAPSHOT_SEQ_BEGIN && pRsp->ack < SYNC_SNAPSHOT_SEQ_END) {
    code = syncSnapBufferSend(pSender, ppRsp);
    if (code != 0) {
      sSError(pSender, "failed to replicate snap since %s. seq:%d, pReader:%p, finish:%d", tstrerror(code),
              pSender->seq, pSender->pReader, pSender->finish);
      goto _ERROR;
    }
  }

  /* reply to END */
  if (pRsp->ack == SYNC_SNAPSHOT_SEQ_END) {
    sSInfo(pSender, "process end rsp");
    snapshotSenderStop(pSender, true);
    TAOS_CHECK_GOTO(syncNodeReplicateReset(pSyncNode, &pRsp->srcId), NULL, _ERROR);
  }

  return 0;

_ERROR:
  snapshotSenderStop(pSender, false);
  if (syncNodeReplicateReset(pSyncNode, &pRsp->srcId) != 0) {
    sError("failed to reset replicate");
  }
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc