• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4788

14 Oct 2025 11:21AM UTC coverage: 60.992% (-2.3%) from 63.264%
#4788

push

travis-ci

web-flow
Merge 7ca9b50f9 into 19574fe21

154868 of 324306 branches covered (47.75%)

Branch coverage included in aggregate %.

207304 of 269498 relevant lines covered (76.92%)

125773493.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.05
/source/libs/sync/src/syncSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "syncSnapshot.h"
18
#include "syncIndexMgr.h"
19
#include "syncPipeline.h"
20
#include "syncRaftCfg.h"
21
#include "syncRaftLog.h"
22
#include "syncRaftStore.h"
23
#include "syncReplication.h"
24
#include "syncUtil.h"
25
#include "tglobal.h"
26

27
static SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths);
28

29
/*
 * Release every entry in the window [start, end) through the optional
 * entryDeleteCb, clear the slots, and rewind start/end/cursor to their
 * initial positions relative to SYNC_SNAPSHOT_SEQ_BEGIN.
 */
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
  int64_t seq = pBuf->start;
  while (seq < pBuf->end) {
    // the callback is optional; the slot is cleared either way
    if (pBuf->entryDeleteCb) {
      pBuf->entryDeleteCb(pBuf->entries[seq % pBuf->size]);
    }
    pBuf->entries[seq % pBuf->size] = NULL;
    ++seq;
  }

  // empty window: end == start, cursor sits one position before start
  pBuf->start = SYNC_SNAPSHOT_SEQ_BEGIN + 1;
  pBuf->end = pBuf->start;
  pBuf->cursor = pBuf->start - 1;
}
151,467,590✔
40

41
/*
 * Tear down a snapshot buffer: release its entries, destroy its mutex,
 * free the buffer and clear the caller's pointer. A NULL handle or a
 * handle that points at NULL is ignored.
 */
static void syncSnapBufferDestroy(SSyncSnapBuffer **ppBuf) {
  if (ppBuf == NULL) return;

  SSyncSnapBuffer *pTarget = *ppBuf;
  if (pTarget == NULL) return;

  // entries must go before the buffer that holds them
  syncSnapBufferReset(pTarget);
  (void)taosThreadMutexDestroy(&pTarget->mutex);

  taosMemoryFree(*ppBuf);
  *ppBuf = NULL;
}
52

53
/*
 * Allocate a zero-initialized snapshot buffer and initialize its mutex.
 *
 * On success the new buffer is stored in *ppBuf and 0 is returned.
 * On failure *ppBuf is set to NULL, nothing stays allocated, and an error
 * code is returned: terrno when the allocation fails, or
 * TSDB_CODE_SYN_INTERNAL_ERROR when the slot array does not hold
 * TSDB_SYNC_SNAP_BUFFER_SIZE entries.
 */
static int32_t syncSnapBufferCreate(SSyncSnapBuffer **ppBuf) {
  SSyncSnapBuffer *pBuf = taosMemoryCalloc(1, sizeof(SSyncSnapBuffer));
  if (pBuf == NULL) {
    *ppBuf = NULL;
    TAOS_RETURN(terrno);
  }

  // capacity is derived from the embedded slot array
  pBuf->size = sizeof(pBuf->entries) / sizeof(void *);
  if (pBuf->size != TSDB_SYNC_SNAP_BUFFER_SIZE) {
    // release the buffer on this error path instead of leaking it, and
    // leave the caller with a well-defined NULL handle
    taosMemoryFree(pBuf);
    *ppBuf = NULL;
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  (void)taosThreadMutexInit(&pBuf->mutex, NULL);
  *ppBuf = pBuf;
  TAOS_RETURN(0);
}
65

66
/*
 * Create a snapshot sender for the replica at replicaIndex.
 *
 * *ppSender is cleared first and only set once the sender is fully built.
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR when the FSM lacks any of the
 * snapshot read callbacks, terrno when the sender cannot be allocated, or
 * the code reported by FpGetSnapshotInfo / syncSnapBufferCreate.
 */
int32_t snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex, SSyncSnapshotSender **ppSender) {
  *ppSender = NULL;

  // all three read callbacks are mandatory
  if (pSyncNode->pFsm->FpSnapshotStartRead == NULL || pSyncNode->pFsm->FpSnapshotStopRead == NULL ||
      pSyncNode->pFsm->FpSnapshotDoRead == NULL) {
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  SSyncSnapshotSender *pNew = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender));
  if (pNew == NULL) {
    TAOS_RETURN(terrno);
  }

  // initial, not-yet-started state
  pNew->start = false;
  pNew->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pNew->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pNew->pReader = NULL;
  pNew->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pNew->pSyncNode = pSyncNode;
  pNew->replicaIndex = replicaIndex;
  pNew->term = raftStoreGetTerm(pSyncNode);
  pNew->startTime = -1;
  pNew->finish = false;

  int32_t code = pNew->pSyncNode->pFsm->FpGetSnapshotInfo(pNew->pSyncNode->pFsm, &pNew->snapshot);
  if (code != 0) {
    taosMemoryFreeClear(pNew);
    TAOS_RETURN(code);
  }

  // send buffer holding blocks awaiting acknowledgement
  SSyncSnapBuffer *pBuffer = NULL;
  code = syncSnapBufferCreate(&pBuffer);
  if (pBuffer == NULL) {
    taosMemoryFreeClear(pNew);
    TAOS_RETURN(code);
  }
  pBuffer->entryDeleteCb = syncSnapBlockDestroy;
  pNew->pSndBuf = pBuffer;
  syncSnapBufferReset(pNew->pSndBuf);

  *ppSender = pNew;
  TAOS_RETURN(code);
}
109

110
void syncSnapBlockDestroy(void *ptr) {
275,908✔
111
  SyncSnapBlock *pBlk = ptr;
275,908✔
112
  if (pBlk->pBlock != NULL) {
275,908✔
113
    taosMemoryFree(pBlk->pBlock);
264,219!
114
    pBlk->pBlock = NULL;
264,219✔
115
    pBlk->blockLen = 0;
264,219✔
116
  }
117
  taosMemoryFree(pBlk);
275,908!
118
}
275,908✔
119

120
/*
 * Free the data buffers attached to the sender's snapshotParam and
 * snapshot, leaving both pointers NULL. Always returns 0.
 */
static int32_t snapshotSenderClearInfoData(SSyncSnapshotSender *pSender) {
  if (pSender->snapshotParam.data != NULL) {
    taosMemoryFree(pSender->snapshotParam.data);
    pSender->snapshotParam.data = NULL;
  }

  if (pSender->snapshot.data != NULL) {
    taosMemoryFree(pSender->snapshot.data);
    pSender->snapshot.data = NULL;
  }

  return 0;
}
132

133
/*
 * Destroy a snapshot sender: stop an open reader, destroy the send buffer,
 * free the attached snapshot info data and finally the sender itself.
 * A NULL sender is ignored.
 */
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender == NULL) {
    return;
  }

  // an open reader is handed back to the FSM
  if (pSender->pReader != NULL) {
    pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    pSender->pReader = NULL;
  }

  // release the send buffer and its entries
  if (pSender->pSndBuf != NULL) {
    syncSnapBufferDestroy(&pSender->pSndBuf);
  }

  (void)snapshotSenderClearInfoData(pSender);

  taosMemoryFree(pSender);
}
152

153
/* Atomically read the sender's start flag. */
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  return atomic_load_8(&pSender->start);
}
70,546,514✔
154

155
/*
 * Start the snapshot sender and send the preparation message.
 *
 * Only the caller that flips the start flag from false to true proceeds;
 * a sender that is already started returns 0 right away. The sender state
 * is reset, snapshot info of type TDMT_SYNC_PREP_SNAPSHOT is fetched from
 * the FSM and sent with sequence SYNC_SNAPSHOT_SEQ_PREP. Any data buffer
 * returned by the FSM is freed before returning.
 *
 * Returns 0 on success, otherwise the code reported by the FSM or by
 * syncSnapSendMsg, or TSDB_CODE_INVALID_DATA_FMT when the returned payload
 * carries an unexpected TLV type.
 */
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
  int32_t code = 0;

  if (atomic_val_compare_exchange_8(&pSender->start, false, true)) return 0;

  // reset progress
  pSender->seq = SYNC_SNAPSHOT_SEQ_PREP;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->pReader = NULL;

  // reset snapshot range and description
  // NOTE(review): the original stored SYNC_INDEX_INVALID into snapshotParam.end twice;
  // the second, value-identical store is dropped here
  pSender->snapshotParam.start = SYNC_INDEX_INVALID;
  pSender->snapshotParam.end = SYNC_INDEX_INVALID;
  pSender->snapshot.data = NULL;
  pSender->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pSender->snapshot.lastApplyTerm = SYNC_TERM_INVALID;
  pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
  (void)memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig));

  // reset timing and term
  pSender->sendingMS = 0;
  pSender->term = raftStoreGetTerm(pSender->pSyncNode);
  pSender->startTime = taosGetMonoTimestampMs();
  pSender->lastSendTime = taosGetTimestampMs();
  pSender->finish = false;

  // ask the FSM for the preparation info
  SSyncNode *pNode = pSender->pSyncNode;
  SSnapshot  prepInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT};
  code = pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &prepInfo);
  if (code != 0) {
    sSError(pSender, "snapshot get info failure since %s", tstrerror(code));
    goto _out;
  }

  // the payload, when present, must be a TLV of the preparation type
  void   *pPayload = prepInfo.data;
  int32_t payloadType = 0;
  int32_t payloadLen = 0;
  if (pPayload != NULL) {
    payloadType = prepInfo.type;
    SSyncTLV *pHead = pPayload;
    if (pHead->typ != TDMT_SYNC_PREP_SNAPSHOT) {
      sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", pHead->typ);
      code = TSDB_CODE_INVALID_DATA_FMT;
      goto _out;
    }
    payloadLen = sizeof(SSyncTLV) + pHead->len;
  }

  sInfo("vgId:%d, send msg:%s", pNode->vgId, TMSG_INFO(payloadType));
  code = syncSnapSendMsg(pSender, pSender->seq, pPayload, payloadLen, payloadType);
  if (code != 0) {
    goto _out;
  }

  SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&destId));

_out:
  // the info buffer returned by the FSM is released here
  if (prepInfo.data != NULL) {
    taosMemoryFree(prepInfo.data);
    prepInfo.data = NULL;
  }
  TAOS_RETURN(code);
}
214

215
/*
 * Stop a running snapshot sender.
 *
 * Does nothing beyond the debug log when the sender was not started.
 * Otherwise, under the send buffer mutex, it records the finish flag,
 * stops an open reader, resets the send buffer and frees the snapshot
 * info data.
 */
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
  sSDebug(pSender, "snapshot sender stop, finish:%d reader:%p", finish, pSender->pReader);

  // flip the flag; only the caller that observed "started" continues
  int8_t wasStarted = atomic_val_compare_exchange_8(&pSender->start, true, false);
  if (!wasStarted) return;

  (void)taosThreadMutexLock(&pSender->pSndBuf->mutex);

  pSender->finish = finish;

  // hand an open reader back to the FSM
  if (pSender->pReader != NULL) {
    pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    pSender->pReader = NULL;
  }

  syncSnapBufferReset(pSender->pSndBuf);
  (void)snapshotSenderClearInfoData(pSender);

  SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);

  (void)taosThreadMutexUnlock(&pSender->pSndBuf->mutex);
}
240

241
/*
 * Build one snapshot replication message and send it to the sender's
 * target replica.
 *
 * seq      sequence number stored in the message
 * pBlock   optional payload; copied only when non-NULL and blockLen > 0
 * blockLen payload length passed to syncBuildSnapshotSend
 * typ      value stored in the message's payloadType
 *
 * Returns 0 on success, otherwise the code from syncBuildSnapshotSend or
 * syncNodeSendMsgById.
 */
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) {
  SSyncNode *pNode = pSender->pSyncNode;
  SRpcMsg    rpcMsg = {0};

  int32_t code = syncBuildSnapshotSend(&rpcMsg, blockLen, pNode->vgId);
  if (code != 0) {
    sSError(pSender, "failed to build snap replication msg since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  // header: routing, term and snapshot description taken from the sender
  SyncSnapshotSend *pSend = rpcMsg.pCont;
  pSend->srcId = pNode->myRaftId;
  pSend->destId = pNode->replicasId[pSender->replicaIndex];
  pSend->term = pSender->term;
  pSend->beginIndex = pSender->snapshotParam.start;
  pSend->lastIndex = pSender->snapshot.lastApplyIndex;
  pSend->lastTerm = pSender->snapshot.lastApplyTerm;
  pSend->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pSend->lastConfig = pSender->lastConfig;
  pSend->startTime = pSender->startTime;
  pSend->seq = seq;

  // payload
  if (pBlock != NULL && blockLen > 0) {
    (void)memcpy(pSend->data, pBlock, blockLen);
  }
  pSend->payloadType = typ;

  code = syncNodeSendMsgById(&pSend->destId, pNode, &rpcMsg);
  if (code != 0) {
    sSError(pSender, "failed to send snap replication msg since %s. seq:%d", tstrerror(code), seq);
  }

  TAOS_RETURN(code);
}
276

277
// when sender receive ack, call this function to send msg from seq
278
// seq = ack + 1, already updated
279
/*
 * Advance the sender by one sequence number and send the matching message.
 *
 * While seq is below SYNC_SNAPSHOT_SEQ_END it is incremented. Once it is
 * past SYNC_SNAPSHOT_SEQ_BEGIN a block is read from the FSM reader: a
 * non-empty block is sent and stored in the send buffer, while an empty
 * read moves seq to SYNC_SNAPSHOT_SEQ_END and returns without sending.
 * A block that was not stored in the buffer is destroyed before returning.
 */
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
  int32_t        code = 0;
  SyncSnapBlock *pNewBlk = NULL;

  if (pSender->seq < SYNC_SNAPSHOT_SEQ_END) {
    pSender->seq++;

    if (pSender->seq > SYNC_SNAPSHOT_SEQ_BEGIN) {
      pNewBlk = taosMemoryCalloc(1, sizeof(SyncSnapBlock));
      if (pNewBlk == NULL) {
        code = terrno;
        goto _OUT;
      }
      pNewBlk->seq = pSender->seq;

      // pull the next block from the FSM reader
      code = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
                                                        &pNewBlk->pBlock, &pNewBlk->blockLen);
      if (code != 0) {
        sSError(pSender, "snapshot sender read failed since %s", tstrerror(code));
        goto _OUT;
      }

      if (pNewBlk->blockLen <= 0) {
        // nothing left to read: jump to the end sequence, nothing is sent here
        pSender->seq = SYNC_SNAPSHOT_SEQ_END;
        sSInfo(pSender, "snapshot sender read to the end");
        goto _OUT;
      }

      sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pNewBlk->blockLen, pNewBlk->seq);
    }
  }

  if (pSender->seq < SYNC_SNAPSHOT_SEQ_BEGIN || pSender->seq > SYNC_SNAPSHOT_SEQ_END) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _OUT;
  }

  // send the message, with or without a payload
  int32_t payloadLen = 0;
  void   *pPayload = NULL;
  if (pNewBlk != NULL) {
    payloadLen = pNewBlk->blockLen;
    pPayload = pNewBlk->pBlock;
  }
  code = syncSnapSendMsg(pSender, pSender->seq, pPayload, payloadLen, 0);
  if (code != 0) {
    goto _OUT;
  }

  // keep the block in the send buffer
  int64_t sentAtMs = taosGetTimestampMs();
  if (pNewBlk != NULL) {
    if (pNewBlk->seq <= SYNC_SNAPSHOT_SEQ_BEGIN || pNewBlk->seq >= SYNC_SNAPSHOT_SEQ_END) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _OUT;
    }
    pNewBlk->sendTimeMs = sentAtMs;
    pSender->pSndBuf->entries[pSender->seq % pSender->pSndBuf->size] = pNewBlk;
    pNewBlk = NULL;  // the buffer slot now holds the block
    pSender->pSndBuf->end = TMAX(pSender->seq + 1, pSender->pSndBuf->end);
  }
  pSender->lastSendTime = sentAtMs;

_OUT:;
  // a block that never reached the buffer is released here
  if (pNewBlk != NULL) {
    syncSnapBlockDestroy(pNewBlk);
    pNewBlk = NULL;
  }
  TAOS_RETURN(code);
}
348

349
// send snapshot data from cache
350
/*
 * Resend buffered snapshot blocks that are unacknowledged and were last
 * sent at least SYNC_SNAP_RESEND_MS ago; the whole pass runs under the
 * send buffer mutex.
 *
 * When the buffer window is empty (end <= start) the sender either reads
 * and sends the next block, or, once seq equals SYNC_SNAPSHOT_SEQ_END,
 * sends the end message again.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR when the sender has no reader, is
 * finished, is not started, or a slot inside the window is empty;
 * otherwise the code of the failing send, or 0.
 */
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
  int32_t          code = 0;
  SSyncSnapBuffer *pBuf = pSender->pSndBuf;

  (void)taosThreadMutexLock(&pBuf->mutex);

  if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  for (int32_t seq = pBuf->cursor + 1; seq < pBuf->end; ++seq) {
    SyncSnapBlock *pEntry = pBuf->entries[seq % pBuf->size];
    if (pEntry == NULL) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    int64_t nowMs = taosGetTimestampMs();
    // skip acknowledged blocks and blocks sent too recently
    if (pEntry->acked || nowMs < pEntry->sendTimeMs + SYNC_SNAP_RESEND_MS) {
      continue;
    }

    code = syncSnapSendMsg(pSender, pEntry->seq, pEntry->pBlock, pEntry->blockLen, 0);
    if (code != 0) {
      goto _out;
    }
    pEntry->sendTimeMs = nowMs;
  }

  // empty window and not at the end yet: read and send the next block
  if (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pBuf->end <= pBuf->start) {
    code = snapshotSend(pSender);
    if (code != 0) {
      goto _out;
    }
  }

  // empty window at the end sequence: send the end message
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pBuf->end <= pBuf->start) {
    code = syncSnapSendMsg(pSender, pSender->seq, NULL, 0, 0);
    if (code != 0) {
      goto _out;
    }
  }

_out:;
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_RETURN(code);
}
390

391
/*
 * Start snapshot replication towards pDestId.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR when no sender exists for the
 * destination, 0 when the sender is already started or starts
 * successfully, and otherwise the code from snapshotSenderStart.
 */
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
  SSyncSnapshotSender *pSnd = syncNodeGetSnapshotSender(pSyncNode, pDestId);
  if (pSnd == NULL) {
    sNError(pSyncNode, "snapshot sender start error since get failed");
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  if (snapshotSenderIsStart(pSnd)) {
    sSDebug(pSnd, "snapshot sender already start, ignore");
    return 0;
  }

  taosMsleep(1);

  int32_t startCode = snapshotSenderStart(pSnd);
  if (startCode == 0) {
    return 0;
  }

  sSError(pSnd, "snapshot sender start error since %s", tstrerror(startCode));
  TAOS_RETURN(startCode);
}
413

414
// receiver
415
/*
 * Create a snapshot receiver for snapshots coming from fromId.
 *
 * *ppReceiver is cleared first and only set once the receiver is fully
 * built. Returns TSDB_CODE_SYN_INTERNAL_ERROR when the FSM lacks any of
 * the snapshot write callbacks, terrno when the receiver cannot be
 * allocated, or the code from the mutex init / syncSnapBufferCreate.
 */
int32_t snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId, SSyncSnapshotReceiver **ppReceiver) {
  *ppReceiver = NULL;

  // all three write callbacks are mandatory
  if (pSyncNode->pFsm->FpSnapshotStartWrite == NULL || pSyncNode->pFsm->FpSnapshotStopWrite == NULL ||
      pSyncNode->pFsm->FpSnapshotDoWrite == NULL) {
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  SSyncSnapshotReceiver *pNew = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver));
  if (pNew == NULL) {
    TAOS_RETURN(terrno);
  }

  pNew->start = false;
  pNew->startTime = 0;
  pNew->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pNew->pWriter = NULL;

  int32_t code = taosThreadMutexInit(&pNew->writerMutex, NULL);
  if (code != 0) {
    taosMemoryFree(pNew);
    pNew = NULL;
    TAOS_RETURN(code);
  }

  pNew->pSyncNode = pSyncNode;
  pNew->fromId = fromId;
  pNew->term = raftStoreGetTerm(pSyncNode);
  pNew->snapshot.data = NULL;
  pNew->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pNew->snapshot.lastApplyTerm = 0;
  pNew->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  // receive buffer for incoming messages
  SSyncSnapBuffer *pBuffer = NULL;
  code = syncSnapBufferCreate(&pBuffer);
  if (pBuffer == NULL) {
    // undo the mutex init before freeing the receiver
    int32_t ret = taosThreadMutexDestroy(&pNew->writerMutex);
    if (ret != 0) {
      sError("failed to destroy mutex since %s", tstrerror(ret));
    }
    taosMemoryFree(pNew);
    pNew = NULL;
    TAOS_RETURN(code);
  }
  pBuffer->entryDeleteCb = rpcFreeCont;
  pNew->pRcvBuf = pBuffer;
  syncSnapBufferReset(pNew->pRcvBuf);

  *ppReceiver = pNew;
  TAOS_RETURN(code);
}
465

466
/*
 * Free the data buffers attached to the receiver's snapshotParam and
 * snapshot, leaving both pointers NULL. Always returns 0.
 */
static int32_t snapshotReceiverClearInfoData(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver->snapshotParam.data != NULL) {
    taosMemoryFree(pReceiver->snapshotParam.data);
    pReceiver->snapshotParam.data = NULL;
  }

  if (pReceiver->snapshot.data != NULL) {
    taosMemoryFree(pReceiver->snapshot.data);
    pReceiver->snapshot.data = NULL;
  }

  return 0;
}
478

479
/*
 * Destroy a snapshot receiver: stop an open writer (without applying),
 * destroy the writer mutex and the receive buffer, free the attached
 * snapshot info data and finally the receiver. A NULL receiver is ignored.
 */
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }

  // close the writer under its mutex
  (void)taosThreadMutexLock(&pReceiver->writerMutex);
  if (pReceiver->pWriter != NULL) {
    int32_t stopCode = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm,
                                                                       pReceiver->pWriter, false,
                                                                       &pReceiver->snapshot);
    if (stopCode != 0) {
      sError("vgId:%d, snapshot receiver stop failed while destroy since %s", pReceiver->pSyncNode->vgId,
             tstrerror(stopCode));
    }
    pReceiver->pWriter = NULL;
  }
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);

  (void)taosThreadMutexDestroy(&pReceiver->writerMutex);

  // release the receive buffer and its entries
  if (pReceiver->pRcvBuf != NULL) {
    syncSnapBufferDestroy(&pReceiver->pRcvBuf);
  }

  (void)snapshotReceiverClearInfoData(pReceiver);

  taosMemoryFree(pReceiver);
}
507

508
/* Atomically read the receiver's start flag; a NULL receiver counts as not started. */
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return false;
  }
  return atomic_load_8(&pReceiver->start);
}
511

512
/*
 * Compare the receiver's signature (term, startTime) with the one carried
 * by pMsg. The term is compared first; startTime only decides when the
 * terms are equal.
 *
 * Returns 0 when both signatures match, a negative value when the
 * receiver's signature is older than the message's (-1: term, -2:
 * startTime) and a positive value when it is newer (1: term, 2:
 * startTime). A mismatch is logged as an error.
 */
static int32_t snapshotReceiverSignatureCmp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  int32_t code = 0;

  if (pReceiver->term < pMsg->term) {
    code = -1;
  } else if (pReceiver->term > pMsg->term) {
    code = 1;
  } else if (pReceiver->startTime < pMsg->startTime) {
    code = -2;
  } else if (pReceiver->startTime > pMsg->startTime) {
    code = 2;
  }

  if (code != 0) {
    sRError(pReceiver, "receiver signature failed, result:%d, msg signature:(%" PRId64 ", %" PRId64 ")", code,
            pMsg->term, pMsg->startTime);
  }

  // hand the comparison result back so callers can branch on its sign
  return code;
}
523

524
/*
 * Open the FSM snapshot writer for the snapshot described by pBeginMsg.
 *
 * Resets ack to SYNC_SNAPSHOT_SEQ_BEGIN and copies the snapshot indexes
 * and term from the message into the receiver before calling
 * FpSnapshotStartWrite. Returns TSDB_CODE_SYN_INTERNAL_ERROR when a writer
 * is already open, the FSM's code when opening fails, and 0 otherwise.
 */
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
  SSyncNode *pNode = pReceiver->pSyncNode;

  if (pReceiver->pWriter != NULL) {
    sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null", pNode->vgId);
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  // progress restarts at the begin sequence
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;

  // snapshot description and range come from the begin message
  pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
  pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
  pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
  pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
  pReceiver->snapshotParam.end = pBeginMsg->lastIndex;

  int32_t openCode = pNode->pFsm->FpSnapshotStartWrite(pNode->pFsm, &pReceiver->snapshotParam, &pReceiver->pWriter);
  if (openCode != 0) {
    sRError(pReceiver, "snapshot receiver start write failed since %s", tstrerror(openCode));
    TAOS_RETURN(openCode);
  }

  sRInfo(pReceiver, "snapshot receiver start write");
  return 0;
}
552

553
/*
 * Mark the receiver as started and adopt the signature (term, source id,
 * startTime) of the preparation message pPreMsg. A receiver that is
 * already started is left untouched.
 */
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
  if (snapshotReceiverIsStart(pReceiver)) {
    sRInfo(pReceiver, "snapshot receiver has started");
    return;
  }

  // only the caller that flips start from false to true initializes the state
  if (atomic_val_compare_exchange_8(&pReceiver->start, false, true)) return;

  pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP;
  pReceiver->term = pPreMsg->term;
  pReceiver->fromId = pPreMsg->srcId;
  pReceiver->startTime = pPreMsg->startTime;

  // the range start is computed locally; the end is not known yet
  pReceiver->snapshotParam.start = syncNodeGetSnapBeginIndex(pReceiver->pSyncNode);
  pReceiver->snapshotParam.end = -1;

  sRInfo(pReceiver, "snapshot receiver start, from dnode:%d.", DID(&pReceiver->fromId));
}
572

573
/*
 * Stop a running snapshot receiver without applying the snapshot.
 *
 * Does nothing beyond the debug log when the receiver was not started.
 * Otherwise an open writer is stopped under the writer mutex, then the
 * receive buffer is reset and the snapshot info data freed under the
 * buffer mutex.
 */
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
  sRDebug(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);

  // flip the flag; only the caller that observed "started" continues
  int8_t wasStarted = atomic_val_compare_exchange_8(&pReceiver->start, true, false);
  if (!wasStarted) return;

  (void)taosThreadMutexLock(&pReceiver->writerMutex);
  if (pReceiver->pWriter == NULL) {
    sRInfo(pReceiver, "snapshot receiver stop, writer is null");
  } else {
    int32_t stopCode = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm,
                                                                       pReceiver->pWriter, false,
                                                                       &pReceiver->snapshot);
    if (stopCode != 0) {
      sRError(pReceiver, "snapshot receiver stop write failed since %s", tstrerror(stopCode));
    }
    pReceiver->pWriter = NULL;
  }
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);

  (void)taosThreadMutexLock(&pReceiver->pRcvBuf->mutex);
  syncSnapBufferReset(pReceiver->pRcvBuf);
  (void)snapshotReceiverClearInfoData(pReceiver);
  (void)taosThreadMutexUnlock(&pReceiver->pRcvBuf->mutex);
}
602

603
/*
 * Finish receiving a snapshot: write the final payload of pMsg (if any),
 * raise the node's commit index and term to the snapshot's values when
 * those are higher, stop the writer with apply = true, refresh the node's
 * fsmState and restore the log store from the snapshot at pMsg->lastIndex.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR when no writer is open, the failing
 * step's code on error, and 0 on success. writerMutex is released on every
 * return path.
 */
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  int32_t code = 0;

  if (pReceiver->pWriter == NULL) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sRError(pReceiver, "snapshot receiver finish error since writer is null");
    TAOS_RETURN(code);
  }

  // write data
  sRInfo(pReceiver, "snapshot receiver write about to finish, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
  if (pMsg->dataLen > 0) {
    (void)taosThreadMutexLock(&pReceiver->writerMutex);
    code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
                                                         pMsg->dataLen);
    (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
    if (code != 0) {
      sRError(pReceiver, "failed to finish snapshot receiver write since %s", tstrerror(code));
      TAOS_RETURN(code);
    }
  }

  // update commit index
  if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
    pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
  }

  // maybe update term
  if (pReceiver->snapshot.lastApplyTerm > raftStoreGetTerm(pReceiver->pSyncNode)) {
    raftStoreSetTerm(pReceiver->pSyncNode, pReceiver->snapshot.lastApplyTerm);
  }

  (void)taosThreadMutexLock(&pReceiver->writerMutex);
  if (pReceiver->pWriter != NULL) {
    // stop writer, apply data
    code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
                                                           &pReceiver->snapshot);
    if (code != 0) {
      sRError(pReceiver, "snapshot receiver apply failed since %s", tstrerror(code));
      // release the writer mutex before bailing out; returning with it held
      // would block every later lock of writerMutex
      (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
      TAOS_RETURN(code);
    }
    pReceiver->pWriter = NULL;
    sRInfo(pReceiver, "snapshot receiver write stopped");
  }
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);

  // update progress
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;

  // get fsmState
  SSnapshot snapshot = {0};
  code = pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
  if (code != 0) {
    sRError(pReceiver, "snapshot receiver get snapshot info failed since %s", tstrerror(code));
    TAOS_RETURN(code);
  }
  pReceiver->pSyncNode->fsmState = snapshot.state;

  // reset wal
  code =
      pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
  if (code != 0) {
    sRError(pReceiver, "failed to snapshot receiver log restore since %s", tstrerror(code));
    TAOS_RETURN(code);
  }
  sRInfo(pReceiver, "wal log restored from snapshot");

  return 0;
}
671

672
/*
 * Write the payload of an in-order data message through the FSM writer
 * and advance ack to the message's sequence number.
 *
 * Returns TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG when pMsg->seq is not
 * ack + 1, TSDB_CODE_SYN_INTERNAL_ERROR when no writer is open, the FSM's
 * code when the write fails, and 0 otherwise.
 */
static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  // only the next expected sequence number is accepted
  if (pMsg->seq != pReceiver->ack + 1) {
    sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq);
    TAOS_RETURN(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG);
  }

  (void)taosThreadMutexLock(&pReceiver->writerMutex);

  if (pReceiver->pWriter == NULL) {
    (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
    sRError(pReceiver, "snapshot receiver failed to write data since writer is null");
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  sRDebug(pReceiver, "snapshot receiver continue to write, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);

  if (pMsg->dataLen > 0) {
    // hand the block to the FSM writer
    int32_t writeCode = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm,
                                                                      pReceiver->pWriter, pMsg->data,
                                                                      pMsg->dataLen);
    if (writeCode != 0) {
      (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
      sRError(pReceiver, "snapshot receiver continue write failed since %s", tstrerror(writeCode));
      TAOS_RETURN(writeCode);
    }
  }

  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);

  // record progress
  pReceiver->ack = pMsg->seq;

  sRDebug(pReceiver, "snapshot receiver continue to write finish");
  return 0;
}
708

709
/*
 * Compute the index at which a snapshot should begin for this node:
 * SYNC_INDEX_BEGIN for an mnode, otherwise one past the larger of the
 * node's commit index and the WAL's committed version.
 */
SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
  if (syncNodeIsMnode(ths)) {
    SyncIndex mnodeBegin = SYNC_INDEX_BEGIN;
    sNInfo(ths, "snapshot begin index is %" PRId64 " since its mnode", mnodeBegin);
    return mnodeBegin;
  }

  SSyncLogStoreData *pStoreData = ths->pLogStore->data;
  SWal              *pWalHandle = pStoreData->pWal;
  int64_t            committedVer = walGetCommittedVer(pWalHandle);

  SyncIndex beginIdx = TMAX(ths->commitIndex, committedVer) + 1;
  sNInfo(ths, "snapshot begin index is %" PRId64, beginIdx);
  return beginIdx;
}
727

728
static int32_t syncSnapReceiverExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotReceiver *pReceiver,
11,241✔
729
                                             SyncSnapshotSend *pMsg, SSnapshot *pInfo) {
730
  if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT) return TSDB_CODE_SYN_INTERNAL_ERROR;
11,241!
731
  int32_t code = 0, lino = 0;
11,241✔
732

733
  // copy snap info from leader
734
  void *data = taosMemoryCalloc(1, pMsg->dataLen);
11,241!
735
  if (data == NULL) {
11,241!
736
    TAOS_CHECK_EXIT(terrno);
×
737
  }
738
  pInfo->data = data;
11,241✔
739
  data = NULL;
11,241✔
740
  (void)memcpy(pInfo->data, pMsg->data, pMsg->dataLen);
11,241!
741

742
  // exchange snap info
743
  if ((code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pInfo)) != 0) {
11,241!
744
    sRError(pReceiver, "failed to get snapshot info. type: %d", pMsg->payloadType);
×
745
    goto _exit;
×
746
  }
747
  SSyncTLV *datHead = pInfo->data;
11,241✔
748
  if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
11,241!
749
    sRError(pReceiver, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
×
750
    code = TSDB_CODE_INVALID_DATA_FMT;
×
751
    goto _exit;
×
752
  }
753
  int32_t dataLen = sizeof(SSyncTLV) + datHead->len;
11,241✔
754

755
  // save exchanged snap info
756
  SSnapshotParam *pParam = &pReceiver->snapshotParam;
11,241✔
757
  data = taosMemoryRealloc(pParam->data, dataLen);
11,241!
758
  if (data == NULL) {
11,241!
759
    code = terrno;
×
760
    sError("vgId:%d, failed to realloc memory for snapshot prep due to %s. dataLen:%d", pSyncNode->vgId,
×
761
           tstrerror(code), dataLen);
762
    goto _exit;
×
763
  }
764
  pParam->data = data;
11,241✔
765
  data = NULL;
11,241✔
766
  (void)memcpy(pParam->data, pInfo->data, dataLen);
11,241!
767

768
_exit:
11,241✔
769
  TAOS_RETURN(code);
11,241✔
770
}
771

772
/*
 * Handle a snapshot prepare message on the receiving node.
 *
 * Decision on the receiver state:
 *   - not started: start it with pMsg;
 *   - started, snapshotReceiverSignatureCmp < 0: stop the running receiver and start anew;
 *   - started, == 0: duplicate preparation, just reply again;
 *   - started, > 0: stale preparation, reply with TSDB_CODE_SYN_MISMATCHED_SIGNATURE.
 *
 * For a non-stale message whose payload type is TDMT_SYNC_PREP_SNAPSHOT the snap info is
 * exchanged with the FSM and attached to the reply.
 *
 * Fix: the stale branch used to fall into the snap-info exchange, which overwrote both
 * the mismatch code and the receiver's saved snapshot param with data from the stale
 * message. The exchange is now skipped when an error is already pending, and the pending
 * error is what gets replied and returned.
 *
 * Returns 0 on success; otherwise the pending error, the exchange error (no reply is sent
 * in that case), or the error from sending the reply.
 */
static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                code = 0;
  bool                   startReceiver = false;

  if (snapshotReceiverIsStart(pReceiver)) {
    // already start
    int32_t order = snapshotReceiverSignatureCmp(pReceiver, pMsg);
    if (order < 0) {
      sInfo("failed to prepare snapshot, received a new snapshot preparation. restart receiver.");
      startReceiver = true;
    } else if (order == 0) {
      sInfo("prepare snapshot, received a duplicate snapshot preparation. send reply.");
    } else {
      // ignore
      sError("failed to prepare snapshot, received a stale snapshot preparation. ignore.");
      code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    }
  } else {
    // start new
    sRInfo(pReceiver, "snapshot receiver not start yet so start new one");
    startReceiver = true;
  }

  if (startReceiver) {
    if (snapshotReceiverIsStart(pReceiver)) {
      sRInfo(pReceiver, "snapshot receiver already start and force stop pre one");
      snapshotReceiverStop(pReceiver);
    }
    snapshotReceiverStart(pReceiver, pMsg);
  }

  SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT_REPLY};
  int32_t   dataLen = 0;

  // only exchange snap info when nothing has gone wrong so far, so that a stale
  // preparation can neither clobber the pending error nor the receiver's saved param
  if (code == 0 && pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT) {
    if ((code = syncSnapReceiverExchgSnapInfo(pSyncNode, pReceiver, pMsg, &snapInfo)) != 0) {
      goto _out;
    }
    SSyncTLV *datHead = snapInfo.data;
    dataLen = sizeof(SSyncTLV) + datHead->len;
  }

  // send response; the payload type is only set when there is a payload
  int32_t type = (snapInfo.data) ? snapInfo.type : 0;
  int32_t sendCode = syncSnapSendRsp(pReceiver, pMsg, snapInfo.data, dataLen, type, code);
  if (sendCode != 0) {
    code = sendCode;
  }

_out:
  if (snapInfo.data) {
    taosMemoryFree(snapInfo.data);
    snapInfo.data = NULL;
  }
  TAOS_RETURN(code);
}
831

832
/*
 * Handle a snapshot begin message on the receiving node.
 *
 * Requires a started receiver whose signature matches pMsg, starts the snapshot writer,
 * and checks that the begin index recorded in the receiver still equals
 * syncNodeGetSnapBeginIndex(). The outcome is always replied to the sender.
 *
 * Fix: after snapshotReceiverStartWriter succeeds `code` is 0, so the begin-index
 * mismatch branch used to reply and return success. It now reports
 * TSDB_CODE_SYN_INTERNAL_ERROR like the other precondition failures.
 *
 * Returns the error from sending the reply if that fails, otherwise the outcome code.
 */
static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  // condition 1
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                code = TSDB_CODE_SYN_INTERNAL_ERROR;

  if (!snapshotReceiverIsStart(pReceiver)) {
    sRError(pReceiver, "failed to begin snapshot receiver since not started");
    goto _SEND_REPLY;
  }

  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    sError("failed to begin snapshot, since %s", tstrerror(code));
    goto _SEND_REPLY;
  }

  // start writer
  if ((code = snapshotReceiverStartWriter(pReceiver, pMsg)) != 0) {
    sRError(pReceiver, "failed to start snapshot writer since %s", tstrerror(code));
    goto _SEND_REPLY;
  }

  SyncIndex beginIndex = syncNodeGetSnapBeginIndex(pSyncNode);
  if (pReceiver->snapshotParam.start != beginIndex) {
    sRError(pReceiver, "snapshot begin index is changed unexpectedly. sver:%" PRId64 ", beginIndex:%" PRId64,
            pReceiver->snapshotParam.start, beginIndex);
    // code was reset to 0 by the successful writer start above; flag the failure explicitly
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _SEND_REPLY;
  }

  code = 0;

_SEND_REPLY:
  // send response
  TAOS_CHECK_RETURN(syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code));

  TAOS_RETURN(code);
}
869

870
/*
 * Build a SyncSnapshotRsp answering pMsg and send it back to pMsg->srcId.
 *
 * The reply mirrors term, lastIndex, lastTerm and startTime of pMsg, acknowledges
 * pMsg->seq, carries rspCode and the receiver's snapshot begin index, and is tagged with
 * `type`. When pBlock is non-NULL and blockLen is positive, blockLen bytes of pBlock are
 * copied into the reply payload.
 *
 * Returns 0 on success, otherwise the error from building or sending the message.
 */
int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, void *pBlock, int32_t blockLen,
                        int32_t type, int32_t rspCode) {
  SSyncNode *pNode = pReceiver->pSyncNode;
  SRpcMsg    rspRpc = {0};
  int32_t    code = 0;

  // build msg
  code = syncBuildSnapshotSendRsp(&rspRpc, blockLen, pNode->vgId);
  if (code != 0) {
    sRError(pReceiver, "failed to build snapshot receiver resp since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  SyncSnapshotRsp *pRsp = rspRpc.pCont;

  // addressing
  pRsp->srcId = pNode->myRaftId;
  pRsp->destId = pMsg->srcId;

  // fields echoed from the request
  pRsp->term = pMsg->term;
  pRsp->lastIndex = pMsg->lastIndex;
  pRsp->lastTerm = pMsg->lastTerm;
  pRsp->startTime = pMsg->startTime;
  pRsp->ack = pMsg->seq;

  // outcome
  pRsp->code = rspCode;
  pRsp->snapBeginIndex = pReceiver->snapshotParam.start;
  pRsp->payloadType = type;

  if (pBlock != NULL) {
    if (blockLen > 0) {
      (void)memcpy(pRsp->data, pBlock, blockLen);
    }
  }

  // send msg
  code = syncNodeSendMsgById(&pRsp->destId, pNode, &rspRpc);
  if (code != 0) {
    sRError(pReceiver, "failed to send snapshot receiver resp since %s", tstrerror(code));
    TAOS_RETURN(code);
  }
  return 0;
}
904

905
/*
 * Insert a received snapshot data message into the receive buffer and consume every
 * message that has become contiguous.
 *
 * Under pRcvBuf->mutex:
 *   1. reject the message with TSDB_CODE_SYN_BUFFER_FULL if seq - start >= size;
 *   2. if seq is beyond the cursor, store the message in its slot (ownership moves to the
 *      buffer and ppMsg[0] is set to NULL); if seq is below start, only re-send a reply;
 *   3. advance the cursor over consecutive filled slots;
 *   4. feed [start, cursor] to snapshotReceiverGotData, reply to each message and free it,
 *      stopping at the first failure.
 *
 * Fix: the buffer invariant check used to `return` directly while still holding the
 * mutex, leaving it locked. It now exits through _out like every other path.
 *
 * Returns 0 on success or the first error encountered.
 */
static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend **ppMsg) {
  int32_t           code = 0;
  SSyncSnapBuffer  *pRcvBuf = pReceiver->pRcvBuf;
  SyncSnapshotSend *pMsg = ppMsg[0];

  (void)taosThreadMutexLock(&pRcvBuf->mutex);

  if (pMsg->seq - pRcvBuf->start >= pRcvBuf->size) {
    code = TSDB_CODE_SYN_BUFFER_FULL;
    goto _out;
  }

  if (!(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end)) {
    // the mutex is held here, so leave through _out instead of returning directly
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  if (pMsg->seq > pRcvBuf->cursor) {
    // a message already parked in this slot is replaced by the new one
    if (pRcvBuf->entries[pMsg->seq % pRcvBuf->size]) {
      pRcvBuf->entryDeleteCb(pRcvBuf->entries[pMsg->seq % pRcvBuf->size]);
    }
    pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg;
    ppMsg[0] = NULL;  // the buffer owns the message from here on
    pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
  } else if (pMsg->seq < pRcvBuf->start) {
    // below the window start: nothing to store, just answer again
    code = syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code);
    goto _out;
  }

  // advance the cursor across the contiguous run of filled slots
  for (int64_t seq = pRcvBuf->cursor + 1; seq < pRcvBuf->end; ++seq) {
    if (pRcvBuf->entries[seq % pRcvBuf->size]) {
      pRcvBuf->cursor = seq;
    } else {
      break;
    }
  }

  // consume everything up to the cursor, in order
  for (int64_t seq = pRcvBuf->start; seq <= pRcvBuf->cursor; ++seq) {
    if ((code = snapshotReceiverGotData(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size])) != 0) {
      // values at or above SYNC_SNAPSHOT_SEQ_INVALID are mapped to a generic internal error
      if (code >= SYNC_SNAPSHOT_SEQ_INVALID) {
        code = TSDB_CODE_SYN_INTERNAL_ERROR;
      }
    }
    pRcvBuf->start = seq + 1;
    if (syncSnapSendRsp(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size], NULL, 0, 0, code) != 0) {
      sError("failed to send snap rsp");
    }
    pRcvBuf->entryDeleteCb(pRcvBuf->entries[seq % pRcvBuf->size]);
    pRcvBuf->entries[seq % pRcvBuf->size] = NULL;
    if (code) goto _out;
  }

_out:
  (void)taosThreadMutexUnlock(&pRcvBuf->mutex);
  TAOS_RETURN(code);
}
958

959
/*
 * Handle a snapshot data message on the receiving node.
 *
 * A message whose signature does not match the receiver is answered with
 * TSDB_CODE_SYN_MISMATCHED_SIGNATURE; otherwise it is passed to syncSnapBufferRecv,
 * which may take ownership of it (ppMsg[0] is then set to NULL).
 *
 * Cleanup: the unused `timeNow` local, and the clock read that initialised it on every
 * data message, were removed; `code` is now scoped to the only branch that uses it.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR when ppMsg[0] is NULL, otherwise the result of the
 * reply or of syncSnapBufferRecv.
 */
static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend **ppMsg) {
  // condition 4
  // transfering
  SyncSnapshotSend *pMsg = ppMsg[0];
  if (!pMsg) return TSDB_CODE_SYN_INTERNAL_ERROR;

  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
    int32_t code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    sError("failed to receive snapshot data, since %s", tstrerror(code));
    return syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code);
  }

  return syncSnapBufferRecv(pReceiver, ppMsg);
}
976

977
/*
 * Handle the snapshot end message on the receiving node: finish the receiver and report
 * the outcome to the sender.
 *
 * The receiver is stopped only when snapshotReceiverFinish succeeds. A signature mismatch
 * skips the finish step and is reported as TSDB_CODE_SYN_MISMATCHED_SIGNATURE.
 *
 * Fix: the outcome used to live in `code`, which was then overwritten by the result of
 * syncBuildSnapshotSendRsp. The reply therefore always carried code 0, and this function
 * returned 0 whenever the reply was built and sent, even if finishing had failed. The
 * outcome is now kept in its own variable, placed in the reply, and returned.
 *
 * Returns the build/send error if the reply cannot be delivered, otherwise the outcome of
 * the signature check / finish step (0 on success).
 */
static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  // condition 2
  // end, finish FSM
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                rspCode = 0;  // outcome reported to the sender
  int32_t                code = 0;     // status of building / sending the reply

  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
    rspCode = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    sError("failed to end snapshot, since %s", tstrerror(rspCode));
  } else {
    rspCode = snapshotReceiverFinish(pReceiver, pMsg);
    if (rspCode == 0) {
      snapshotReceiverStop(pReceiver);
    }
  }

  // build msg
  SRpcMsg rpcMsg = {0};
  if ((code = syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId)) != 0) {
    sRError(pReceiver, "snapshot receiver build rsp failed since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
  pRspMsg->term = raftStoreGetTerm(pSyncNode);
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pMsg->startTime;
  pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
  pRspMsg->code = rspCode;
  pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;

  // send msg
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver end", &rpcMsg.info.traceId);
  if ((code = syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg)) != 0) {
    sRError(pReceiver, "snapshot receiver send rsp failed since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  TAOS_RETURN(rspCode);
}
1024

1025
// Timestamp in seconds (taosGetTimestampMs() / 1000) recorded by syncNodeOnSnapshot for the
// most recent snapshot data message. A data message arriving in a later second than this
// value is logged at info level, others at debug level. File-level global: one value
// for the whole process, not per sync node.
int64_t lastRecvPrintLog = 0;
1026

1027
/*
 * Entry point for SyncSnapshotSend messages on the receiving node.
 *
 * Early rejections (returned without resetting the election timer):
 *   - sender not in the raft group      -> TSDB_CODE_SYN_NOT_IN_RAFT_GROUP
 *   - message term below the local term -> TSDB_CODE_SYN_TERM_NOT_MATCH (a reply is sent)
 *   - node neither follower nor learner -> TSDB_CODE_SYN_INTERNAL_ERROR
 *   - seq outside [PREP, END]           -> TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG
 *
 * Otherwise the message is dispatched on pMsg->seq to the prepare / begin / data / end
 * handler, the election timer is reset, and the handler's code is returned. After a
 * successful end the log buffer is re-initialised.
 *
 * The data handler may take ownership of the message (pRpcMsg->pCont is then NULL), so
 * pMsg is not touched after that call.
 *
 * Fix: the closing debug log of the end step said "progress:7/7" while every other
 * progress message of this exchange counts out of 8; it now says "7/8".
 */
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
  SyncSnapshotSend     **ppMsg = (SyncSnapshotSend **)&pRpcMsg->pCont;
  SyncSnapshotSend      *pMsg = ppMsg[0];
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                code = 0;

  // if already drop replica, do not process
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config", &pRpcMsg->info.traceId);
    code = TSDB_CODE_SYN_NOT_IN_RAFT_GROUP;
    TAOS_RETURN(code);
  }

  if (pMsg->term < raftStoreGetTerm(pSyncNode)) {
    sRError(pReceiver, "reject snap replication with smaller term. msg term:%" PRId64 ", seq:%d", pMsg->term,
            pMsg->seq);
    code = TSDB_CODE_SYN_TERM_NOT_MATCH;
    if (syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code) != 0) sError("failed to send snap rsp");
    TAOS_RETURN(code);
  }

  // a learner only records the newer term; any other role steps down on a newer term
  if (pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER) {
    if (pMsg->term > raftStoreGetTerm(pSyncNode)) {
      syncNodeStepDown(pSyncNode, pMsg->term, pMsg->srcId, "snapshot");
    }
  } else {
    syncNodeUpdateTermWithoutStepDown(pSyncNode, pMsg->term);
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER && pSyncNode->state != TAOS_SYNC_STATE_LEARNER) {
    sRError(pReceiver, "snapshot receiver not a follower or learner");
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    TAOS_RETURN(code);
  }

  if (pMsg->seq < SYNC_SNAPSHOT_SEQ_PREP || pMsg->seq > SYNC_SNAPSHOT_SEQ_END) {
    sRError(pReceiver, "snap replication msg with invalid seq:%d", pMsg->seq);
    code = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG;
    TAOS_RETURN(code);
  }

  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP) {
    // prepare
    sInfo(
        "vgId:%d, snapshot replication progress:2/8:follower:1/4, start to prepare, recv msg:%s, snap seq:%d, msg "
        "signature:(%" PRId64 ", %" PRId64 ")",
        pSyncNode->vgId, TMSG_INFO(pRpcMsg->msgType), pMsg->seq, pMsg->term, pMsg->startTime);
    code = syncNodeOnSnapshotPrep(pSyncNode, pMsg);
    sDebug(
        "vgId:%d, snapshot replication progress:2/8:follower:1/4, finish to prepare, recv msg:%s, snap seq:%d, msg "
        "signature:(%" PRId64 ", %" PRId64 ")",
        pSyncNode->vgId, TMSG_INFO(pRpcMsg->msgType), pMsg->seq, pMsg->term, pMsg->startTime);
  } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
    // begin
    sInfo("vgId:%d, snapshot replication progress:4/8:follower:2/4, start to begin,replication. msg signature:(%" PRId64
          ", %" PRId64 "), snapshot msg seq:%d",
          pSyncNode->vgId, pMsg->term, pMsg->startTime, pMsg->seq);
    code = syncNodeOnSnapshotBegin(pSyncNode, pMsg);
    sDebug("vgId:%d, snapshot replication progress:4/8:follower:2/4, finish to begin. msg signature:(%" PRId64
           ", %" PRId64 ")",
           pSyncNode->vgId, pMsg->term, pMsg->startTime);
  } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
    // data: info level for a message arriving in a later second than the previous one, debug otherwise
    int64_t currentTimestamp = taosGetTimestampMs() / 1000;
    if (currentTimestamp > lastRecvPrintLog) {
      sInfo("vgId:%d, snapshot replication progress:6/8:follower:3/4, start to receive. msg signature:(%" PRId64
            ", %" PRId64 "), snapshot msg seq:%d",
            pSyncNode->vgId, pMsg->term, pMsg->startTime, pMsg->seq);
    } else {
      sDebug("vgId:%d, snapshot replication progress:6/8:follower:3/4, start to receive. msg signature:(%" PRId64
             ", %" PRId64 "), snapshot msg seq:%d",
             pSyncNode->vgId, pMsg->term, pMsg->startTime, pMsg->seq);
    }
    lastRecvPrintLog = currentTimestamp;
    code = syncNodeOnSnapshotReceive(pSyncNode, ppMsg);
    sDebug("vgId:%d, snapshot replication progress:6/8:follower:3/4, finish to receive.", pSyncNode->vgId);
  } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
    // end
    sInfo("vgId:%d, snapshot replication progress:7/8:follower:4/4, start to end. msg signature:(%" PRId64 ", %" PRId64
          "), snapshot msg seq:%d",
          pSyncNode->vgId, pMsg->term, pMsg->startTime, pMsg->seq);
    code = syncNodeOnSnapshotEnd(pSyncNode, pMsg);
    if (code != 0) {
      sRError(pReceiver, "failed to end snapshot.");
    } else {
      code = syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode);
      if (code != 0) {
        sRError(pReceiver, "failed to reinit log buffer since %s", tstrerror(code));
      }
      sDebug("vgId:%d, snapshot replication progress:7/8:follower:4/4, finish to end. msg signature:(%" PRId64
             ", %" PRId64 ")",
             pSyncNode->vgId, pMsg->term, pMsg->startTime);
    }
  }

  syncNodeResetElectTimer(pSyncNode);
  TAOS_RETURN(code);
}
1138

1139
/*
 * Store the snapshot info carried by a prepare reply in the sender's snapshot param.
 *
 * The reply payload is read as a TLV whose type must equal the payload type; header plus
 * datHead->len bytes are copied into pSender->snapshotParam.data, which is resized for it.
 *
 * Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR for an unexpected payload type,
 * TSDB_CODE_INVALID_DATA_FMT for a TLV type mismatch, or terrno if the resize fails.
 */
static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SSyncTLV *pTlv = (void *)pMsg->data;
  if (pTlv->typ != pMsg->payloadType) {
    sSError(pSender, "unexpected data type in data of SyncSnapshotRsp. typ: %d", pTlv->typ);
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
  }

  int32_t         tlvLen = sizeof(SSyncTLV) + pTlv->len;
  SSnapshotParam *pParam = &pSender->snapshotParam;

  // resize into a temporary so the old buffer stays reachable if the resize fails
  void *pResized = taosMemoryRealloc(pParam->data, tlvLen);
  if (pResized == NULL) {
    TAOS_RETURN(terrno);
  }
  (void)memcpy(pResized, pMsg->data, tlvLen);
  pParam->data = pResized;

  sSInfo(pSender, "data of snapshot param. len: %d", pTlv->len);
  return 0;
}
1161

1162
// sender
1163
/*
 * Handle the reply to a snapshot prepare message on the sending node.
 *
 * Rejects a reply whose snapBeginIndex lies beyond commitIndex + 1. Otherwise, under the
 * send buffer mutex: query the FSM snapshot info, record the <start, end> range and the
 * snapshot in the sender, store the snap info from the reply (when present), open the
 * snapshot reader, move the peer's next index just past the snapshot and send the next
 * snapshot message.
 *
 * Returns 0 on success, TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG for the rejected reply, or the
 * error of the first failing step.
 */
static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  int32_t   code = 0;
  SSnapshot snapInfo = {0};

  if (pSyncNode->commitIndex + 1 < pMsg->snapBeginIndex) {
    sSError(pSender,
            "snapshot begin index is greater than commit index. msg snapBeginIndex:%" PRId64
            ", node commitIndex:%" PRId64,
            pMsg->snapBeginIndex, pSyncNode->commitIndex);
    TAOS_RETURN(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG);
  }

  (void)taosThreadMutexLock(&pSender->pSndBuf->mutex);
  TAOS_CHECK_GOTO(pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo), NULL, _out);

  // snapshot range: begins where the receiver asked, ends at the last applied index
  pSender->snapshotParam.start = pMsg->snapBeginIndex;
  pSender->snapshotParam.end = snapInfo.lastApplyIndex;

  sSInfo(pSender, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64,
         pMsg->snapBeginIndex, snapInfo.lastApplyIndex, snapInfo.lastApplyTerm);

  pSender->snapshot = snapInfo;

  // keep the snap info sent back by the receiver, if the reply carries one
  if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
    TAOS_CHECK_GOTO(syncSnapSenderExchgSnapInfo(pSyncNode, pSender, pMsg), NULL, _out);
  }

  // open the reader; only continue with sending when that worked
  code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
  if (code == 0) {
    syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapInfo.lastApplyIndex + 1);
    code = snapshotSend(pSender);
  } else {
    sSError(pSender, "prepare snapshot failed since %s", tstrerror(code));
  }

_out:
  (void)taosThreadMutexUnlock(&pSender->pSndBuf->mutex);
  TAOS_RETURN(code);
}
1208

1209
/*
 * Compare the sender's signature (term, startTime) with the one carried by pMsg.
 *
 * Returns -1 / 1 when the sender's term is lower / higher than the message term,
 * -2 / 2 when the terms are equal and the sender's startTime is lower / higher,
 * and 0 when both fields match.
 *
 * Cleanup: a local `code` that was never assigned anything but 0, and the error log
 * guarded by `code != 0`, were removed; that branch could never execute.
 */
static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  if (pSender->term < pMsg->term) return -1;
  if (pSender->term > pMsg->term) return 1;
  if (pSender->startTime < pMsg->startTime) return -2;
  if (pSender->startTime > pMsg->startTime) return 2;
  return 0;
}
1220

1221
/*
 * Process an acknowledgement for a snapshot data block on the sending node.
 *
 * Under pSndBuf->mutex:
 *   1. validate signature, sender state, window bound and buffer invariant;
 *   2. mark the acknowledged block if the ack lies in (cursor, end);
 *   3. advance the cursor over consecutive acked blocks;
 *   4. release blocks in [start, cursor] and move start forward;
 *   5. send further messages while the sender has not reached SYNC_SNAPSHOT_SEQ_END and
 *      seq - start stays below tsSnapReplMaxWaitN;
 *   6. once the sender is at SYNC_SNAPSHOT_SEQ_END and the buffer is drained, send once more.
 *
 * Returns 0 on success or the first error encountered.
 */
static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp **ppMsg) {
  int32_t          code = 0;
  SSyncSnapBuffer *pBuf = pSender->pSndBuf;
  SyncSnapshotRsp *pRsp = ppMsg[0];

  (void)taosThreadMutexLock(&pBuf->mutex);

  if (snapshotSenderSignatureCmp(pSender, pRsp) != 0) {
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    sError("failed to send snapshot data, since %s", tstrerror(code));
    goto _out;
  }

  if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  if (pRsp->ack - pBuf->start >= pBuf->size) {
    code = TSDB_CODE_SYN_BUFFER_FULL;
    goto _out;
  }

  // buffer invariant: start <= cursor + 1 and cursor < end
  if (pBuf->start > pBuf->cursor + 1 || pBuf->cursor >= pBuf->end) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  // mark the acknowledged block
  if (pRsp->ack > pBuf->cursor && pRsp->ack < pBuf->end) {
    SyncSnapBlock *pAcked = pBuf->entries[pRsp->ack % pBuf->size];
    if (pAcked == NULL) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    pAcked->acked = 1;
  }

  // move the cursor to the last block of the contiguous acked run
  for (int64_t seq = pBuf->cursor + 1; seq < pBuf->end; ++seq) {
    SyncSnapBlock *pBlk = pBuf->entries[seq % pBuf->size];
    if (!pBlk->acked) {
      break;
    }
    pBuf->cursor = seq;
  }

  // release everything up to the cursor
  for (int64_t seq = pBuf->start; seq <= pBuf->cursor; ++seq) {
    pBuf->entryDeleteCb(pBuf->entries[seq % pBuf->size]);
    pBuf->entries[seq % pBuf->size] = NULL;
    pBuf->start = seq + 1;
  }

  // refill the window
  while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pBuf->start < tsSnapReplMaxWaitN) {
    code = snapshotSend(pSender);
    if (code != 0) {
      goto _out;
    }
  }

  // sender reached the end seq and the buffer is drained: send once more
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pBuf->end <= pBuf->start) {
    code = snapshotSend(pSender);
    if (code != 0) {
      goto _out;
    }
  }

_out:
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_RETURN(code);
}
1287

1288
// Timestamp in seconds (taosGetTimestampMs() / 1000) recorded by syncNodeOnSnapshotRsp for
// the most recent data/begin acknowledgement. An acknowledgement arriving in a later second
// than this value is logged at info level, others at debug level. File-level global: one
// value for the whole process, not per sender.
int64_t lastSendPrintLog = 0;
1289

1290
/*
 * Entry point for SyncSnapshotRsp messages on the sending node.
 *
 * Returned without touching the sender:
 *   - replier not in the raft group          -> TSDB_CODE_SYN_NOT_IN_RAFT_GROUP
 *   - no sender for the replier, or stopped  -> TSDB_CODE_SYN_INTERNAL_ERROR
 *   - reply older than the sender signature  -> TSDB_CODE_SYN_MISMATCHED_SIGNATURE
 *
 * Failures that stop the sender and reset replication to the peer (_ERROR): sender older
 * than the reply, node not (assigned) leader, term mismatch, error code in the reply, or
 * a failing processing step.
 *
 * Processing is driven by pMsg->ack: PREP runs the prepare-reply handler, values in
 * [BEGIN, END) feed the send buffer, END stops the sender as finished and resets
 * replication to the peer.
 *
 * Returns 0 on success, otherwise the error described above.
 */
int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
  SyncSnapshotRsp **ppMsg = (SyncSnapshotRsp **)&pRpcMsg->pCont;
  SyncSnapshotRsp  *pMsg = ppMsg[0];
  int32_t           code = 0;

  // if already drop replica, do not process
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "maybe replica already dropped", &pRpcMsg->info.traceId);
    TAOS_RETURN(TSDB_CODE_SYN_NOT_IN_RAFT_GROUP);
  }

  // get sender
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &pMsg->srcId);
  if (pSender == NULL) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender is null", &pRpcMsg->info.traceId);
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  if (!snapshotSenderIsStart(pSender)) {
    sSError(pSender, "snapshot sender stopped. sender startTime:%" PRId64 ", msg startTime:%" PRId64,
            pSender->startTime, pMsg->startTime);
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  // check signature
  int32_t sigOrder = snapshotSenderSignatureCmp(pSender, pMsg);
  if (sigOrder > 0) {
    sError("failed to check snapshot rsp signature, ignore a stale snap rsp.");
    TAOS_RETURN(TSDB_CODE_SYN_MISMATCHED_SIGNATURE);
  }
  if (sigOrder < 0) {
    sError("failed to check snapshot rsp signature, snapshot sender is stale. stop");
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    goto _ERROR;
  }

  if (!(pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    sSError(pSender, "snapshot sender not leader");
    code = TSDB_CODE_SYN_NOT_LEADER;
    goto _ERROR;
  }

  SyncTerm localTerm = raftStoreGetTerm(pSyncNode);
  if (pMsg->term != localTerm) {
    sSError(pSender, "snapshot sender term mismatch, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
            localTerm);
    code = TSDB_CODE_SYN_TERM_NOT_MATCH;
    goto _ERROR;
  }

  if (pMsg->code != 0) {
    sSError(pSender, "snapshot sender receive error:%s 0x%x and stop sender", tstrerror(pMsg->code), pMsg->code);
    code = pMsg->code;
    goto _ERROR;
  }

  // send begin
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP) {
    sSInfo(pSender, "snapshot replication progress:3/8:leader:2/4, process prepare rsp, msg:%s, snap ack:%d, ",
           TMSG_INFO(pRpcMsg->msgType), pMsg->ack);
    code = syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg);
    if (code != 0) {
      goto _ERROR;
    }
  }

  // send msg of data or end
  if (pMsg->ack >= SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->ack < SYNC_SNAPSHOT_SEQ_END) {
    // info level for an ack arriving in a later second than the previous one, debug otherwise
    int64_t nowSec = taosGetTimestampMs() / 1000;
    if (nowSec > lastSendPrintLog) {
      sSInfo(pSender, "snapshot replication progress:5/8:leader:3/4, send buffer, msg:%s, snap ack:%d",
             TMSG_INFO(pRpcMsg->msgType), pMsg->ack);
    } else {
      sSDebug(pSender, "snapshot replication progress:5/8:leader:3/4, send buffer, msg:%s, snap ack:%d",
              TMSG_INFO(pRpcMsg->msgType), pMsg->ack);
    }
    lastSendPrintLog = nowSec;

    code = syncSnapBufferSend(pSender, ppMsg);
    if (code != 0) {
      sSError(pSender, "failed to replicate snap since %s. seq:%d, pReader:%p, finish:%d", tstrerror(code),
              pSender->seq, pSender->pReader, pSender->finish);
      goto _ERROR;
    }
  }

  // end
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
    sSInfo(pSender, "snapshot replication progress:8/8:leader:4/4, process end rsp");
    snapshotSenderStop(pSender, true);
    TAOS_CHECK_GOTO(syncNodeReplicateReset(pSyncNode, &pMsg->srcId), NULL, _ERROR);
  }

  return 0;

_ERROR:
  snapshotSenderStop(pSender, false);
  if (syncNodeReplicateReset(pSyncNode, &pMsg->srcId) != 0) sError("failed to reset replicate");
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc