• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4864

26 Nov 2025 05:46AM UTC coverage: 64.548% (+0.009%) from 64.539%
#4864

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

769 of 945 new or added lines in 33 files covered. (81.38%)

3006 existing lines in 116 files now uncovered.

158227 of 245129 relevant lines covered (64.55%)

111826500.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.68
/source/libs/sync/src/syncSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "syncSnapshot.h"
18
#include "syncIndexMgr.h"
19
#include "syncPipeline.h"
20
#include "syncRaftCfg.h"
21
#include "syncRaftLog.h"
22
#include "syncRaftStore.h"
23
#include "syncReplication.h"
24
#include "syncUtil.h"
25
#include "tglobal.h"
26

27
static SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths);
28

29
/*
 * Empty a snapshot ring buffer and rewind its window.
 *
 * Every slot in the window [start, end) is passed to entryDeleteCb (when one
 * is installed) and then cleared. Afterwards start == end ==
 * SYNC_SNAPSHOT_SEQ_BEGIN + 1 and cursor sits one position before start.
 */
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
  int64_t seq = pBuf->start;
  while (seq < pBuf->end) {
    if (pBuf->entryDeleteCb != NULL) {
      pBuf->entryDeleteCb(pBuf->entries[seq % pBuf->size]);
    }
    pBuf->entries[seq % pBuf->size] = NULL;
    ++seq;
  }

  // rewind to an empty window just past the BEGIN sequence number
  pBuf->start = SYNC_SNAPSHOT_SEQ_BEGIN + 1;
  pBuf->end = pBuf->start;
  pBuf->cursor = pBuf->start - 1;
}
148,659,570✔
40

41
/*
 * Destroy a snapshot ring buffer created by syncSnapBufferCreate.
 *
 * Releases any queued entries (via syncSnapBufferReset), destroys the buffer
 * mutex, frees the buffer and sets *ppBuf to NULL. Does nothing when ppBuf is
 * NULL or already points to NULL.
 */
static void syncSnapBufferDestroy(SSyncSnapBuffer **ppBuf) {
  SSyncSnapBuffer *pBuf = (ppBuf != NULL) ? *ppBuf : NULL;
  if (pBuf == NULL) {
    return;
  }

  syncSnapBufferReset(pBuf);
  (void)taosThreadMutexDestroy(&pBuf->mutex);

  taosMemoryFree(pBuf);
  *ppBuf = NULL;
}
52

53
/*
 * Allocate a zeroed snapshot ring buffer and initialise its mutex.
 *
 * On success *ppBuf receives the new buffer and 0 is returned. On any failure
 * *ppBuf is NULL, nothing allocated here is leaked, and the result is:
 *   - terrno when the allocation fails;
 *   - TSDB_CODE_SYN_INTERNAL_ERROR when the compiled capacity of entries[]
 *     differs from TSDB_SYNC_SNAP_BUFFER_SIZE;
 *   - the taosThreadMutexInit error code when the mutex cannot be set up.
 */
static int32_t syncSnapBufferCreate(SSyncSnapBuffer **ppBuf) {
  *ppBuf = NULL;

  SSyncSnapBuffer *pBuf = taosMemoryCalloc(1, sizeof(SSyncSnapBuffer));
  if (pBuf == NULL) {
    TAOS_RETURN(terrno);
  }

  pBuf->size = sizeof(pBuf->entries) / sizeof(void *);
  if (pBuf->size != TSDB_SYNC_SNAP_BUFFER_SIZE) {
    // free the buffer instead of leaking it on this error path
    taosMemoryFree(pBuf);
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  // a buffer whose mutex failed to initialise must not be handed out
  int32_t code = taosThreadMutexInit(&pBuf->mutex, NULL);
  if (code != 0) {
    taosMemoryFree(pBuf);
    TAOS_RETURN(code);
  }

  *ppBuf = pBuf;
  TAOS_RETURN(0);
}
65

66
/*
 * Create an idle snapshot sender for the replica at replicaIndex.
 *
 * Requires the FSM to implement FpSnapshotStartRead, FpSnapshotStopRead and
 * FpSnapshotDoRead. The sender starts with start == false, invalid seq/ack,
 * no reader, the current raft term, and the FSM's current snapshot info
 * loaded into pSender->snapshot. Its send buffer releases entries with
 * syncSnapBlockDestroy.
 *
 * On success *ppSender receives the sender and 0 is returned. On failure
 * *ppSender stays NULL and an error code is returned.
 */
int32_t snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex, SSyncSnapshotSender **ppSender) {
  int32_t code = 0;
  *ppSender = NULL;
  bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) &&
                   (pSyncNode->pFsm->FpSnapshotDoRead != NULL);
  if (!condition) {
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  SSyncSnapshotSender *pSender = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender));
  if (pSender == NULL) {
    TAOS_RETURN(terrno);
  }

  pSender->start = false;
  pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->pReader = NULL;
  pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pSender->pSyncNode = pSyncNode;
  pSender->replicaIndex = replicaIndex;
  pSender->term = raftStoreGetTerm(pSyncNode);
  pSender->senderStartTime = -1;
  pSender->finish = false;

  code = pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
  if (code != 0) {
    taosMemoryFreeClear(pSender);
    TAOS_RETURN(code);
  }

  SSyncSnapBuffer *pSndBuf = NULL;
  code = syncSnapBufferCreate(&pSndBuf);
  if (code != 0 || pSndBuf == NULL) {
    // Trust the returned code as well as the pointer, and never report a
    // failed creation as success.
    if (code == 0) code = TSDB_CODE_SYN_INTERNAL_ERROR;
    syncSnapBufferDestroy(&pSndBuf);  // no-op when pSndBuf is NULL
    // snapshot.data was filled by FpGetSnapshotInfo above; drop it with the sender.
    if (pSender->snapshot.data != NULL) {
      taosMemoryFree(pSender->snapshot.data);
      pSender->snapshot.data = NULL;
    }
    taosMemoryFreeClear(pSender);
    TAOS_RETURN(code);
  }
  pSndBuf->entryDeleteCb = syncSnapBlockDestroy;
  pSender->pSndBuf = pSndBuf;

  syncSnapBufferReset(pSender->pSndBuf);
  *ppSender = pSender;
  TAOS_RETURN(code);
}
109

110
void syncSnapBlockDestroy(void *ptr) {
265,857✔
111
  SyncSnapBlock *pBlk = ptr;
265,857✔
112
  if (pBlk->pBlock != NULL) {
265,857✔
113
    taosMemoryFree(pBlk->pBlock);
246,109✔
114
    pBlk->pBlock = NULL;
246,109✔
115
    pBlk->blockLen = 0;
246,109✔
116
  }
117
  taosMemoryFree(pBlk);
265,857✔
118
}
265,857✔
119

120
/*
 * Free the heap payloads attached to pSender->snapshotParam and
 * pSender->snapshot, leaving both data pointers NULL. Always returns 0.
 */
static int32_t snapshotSenderClearInfoData(SSyncSnapshotSender *pSender) {
  // exchanged snapshot parameters
  if (pSender->snapshotParam.data != NULL) {
    taosMemoryFree(pSender->snapshotParam.data);
  }
  pSender->snapshotParam.data = NULL;

  // snapshot info payload
  if (pSender->snapshot.data != NULL) {
    taosMemoryFree(pSender->snapshot.data);
  }
  pSender->snapshot.data = NULL;

  return 0;
}
132

133
/*
 * Free a snapshot sender and everything it owns: the open FSM reader (if
 * any), the send buffer with its queued blocks, and the snapshot info data.
 * A NULL sender is ignored.
 */
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender == NULL) {
    return;
  }

  if (pSender->pReader != NULL) {
    SSyncNode *pNode = pSender->pSyncNode;
    pNode->pFsm->FpSnapshotStopRead(pNode->pFsm, pSender->pReader);
    pSender->pReader = NULL;
  }

  if (pSender->pSndBuf != NULL) {
    syncSnapBufferDestroy(&pSender->pSndBuf);
  }

  (void)snapshotSenderClearInfoData(pSender);
  taosMemoryFree(pSender);
}
152

153
/* Atomically read the sender's start flag; any non-zero value means started. */
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  int8_t started = atomic_load_8(&pSender->start);
  return started != 0;
}
69,055,391✔
154

155
/*
 * Begin a snapshot transfer to the replica at pSender->replicaIndex.
 *
 * Only the caller that flips pSender->start from false to true proceeds; any
 * other caller returns 0 immediately. The transfer state is reset
 * (seq = SYNC_SNAPSHOT_SEQ_PREP, fresh term and start times). The FSM is then
 * asked for its PREP snapshot info, and a PREP message carrying that info
 * (or no payload, with type 0) is sent.
 *
 * Returns 0 on success. Otherwise it returns the error from FpGetSnapshotInfo
 * or syncSnapSendMsg, or TSDB_CODE_INVALID_DATA_FMT when the info payload's
 * TLV header is not TDMT_SYNC_PREP_SNAPSHOT. pSender->start stays set on
 * failure.
 */
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
  int32_t code = 0;

  int8_t started = atomic_val_compare_exchange_8(&pSender->start, false, true);
  if (started) return 0;

  pSender->seq = SYNC_SNAPSHOT_SEQ_PREP;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->pReader = NULL;
  pSender->snapshotParam.start = SYNC_INDEX_INVALID;
  pSender->snapshotParam.end = SYNC_INDEX_INVALID;
  // snapshot.data may still hold the payload FpGetSnapshotInfo stored in
  // snapshotSenderCreate. Free it rather than dropping the only reference.
  if (pSender->snapshot.data != NULL) {
    taosMemoryFree(pSender->snapshot.data);
    pSender->snapshot.data = NULL;
  }
  pSender->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pSender->snapshot.lastApplyTerm = SYNC_TERM_INVALID;
  pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  (void)memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig));
  pSender->sendingMS = 0;
  pSender->term = raftStoreGetTerm(pSender->pSyncNode);
  pSender->senderStartTime = taosGetMonoTimestampMs();
  pSender->lastSendTime = taosGetTimestampMs();
  pSender->finish = false;

  // Get snapshot info
  SSyncNode *pSyncNode = pSender->pSyncNode;
  SSnapshot  snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT};
  if ((code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo)) != 0) {
    sSError(pSender, "snapshot get info failure since %s", tstrerror(code));
    goto _out;
  }

  // A payload, when present, is a TLV that must be tagged TDMT_SYNC_PREP_SNAPSHOT.
  void   *pData = snapInfo.data;
  int32_t type = (pData) ? snapInfo.type : 0;
  int32_t dataLen = 0;
  if (pData) {
    SSyncTLV *datHead = pData;
    if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT) {
      sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
      code = TSDB_CODE_INVALID_DATA_FMT;
      goto _out;
    }
    dataLen = sizeof(SSyncTLV) + datHead->len;
  }

  sInfo("vgId:%d, send msg:%s", pSyncNode->vgId, TMSG_INFO(type));
  if ((code = syncSnapSendMsg(pSender, pSender->seq, pData, dataLen, type)) != 0) {
    goto _out;
  }

  SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&destId));
_out:
  if (snapInfo.data) {
    taosMemoryFree(snapInfo.data);
    snapInfo.data = NULL;
  }
  TAOS_RETURN(code);
}
214

215
/*
 * Stop an in-progress snapshot transfer.
 *
 * Only the caller that flips pSender->start from true to false performs the
 * teardown; every other caller returns immediately. Under pSndBuf->mutex it
 * records `finish`, closes the FSM reader (if any), empties the send buffer
 * and frees the sender's snapshot info data.
 */
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
  sSDebug(pSender, "snapshot sender stop, finish:%d reader:%p", finish, pSender->pReader);

  bool wasRunning = atomic_val_compare_exchange_8(&pSender->start, true, false);
  if (!wasRunning) {
    return;
  }

  SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
  (void)taosThreadMutexLock(&pSndBuf->mutex);

  pSender->finish = finish;

  if (pSender->pReader != NULL) {
    SSyncNode *pNode = pSender->pSyncNode;
    pNode->pFsm->FpSnapshotStopRead(pNode->pFsm, pSender->pReader);
    pSender->pReader = NULL;
  }

  syncSnapBufferReset(pSndBuf);
  (void)snapshotSenderClearInfoData(pSender);

  SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);

  (void)taosThreadMutexUnlock(&pSndBuf->mutex);
}
240

241
/*
 * Build one SyncSnapshotSend message and send it to the sender's replica.
 *
 * The header is filled from the sender:
 *   - term;
 *   - begin index;
 *   - last apply index and term;
 *   - last config index and config;
 *   - start time.
 * `seq` and `typ` go into seq and payloadType. When pBlock is non-NULL and
 * blockLen > 0, blockLen bytes are copied into the message body.
 *
 * Returns 0, or the error from syncBuildSnapshotSend / syncNodeSendMsgById.
 */
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) {
  SSyncNode *pNode = pSender->pSyncNode;
  SRpcMsg    rpcMsg = {0};

  int32_t code = syncBuildSnapshotSend(&rpcMsg, blockLen, pNode->vgId);
  if (code != 0) {
    sSError(pSender, "failed to build snap replication msg since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  SyncSnapshotSend *pMsg = rpcMsg.pCont;
  pMsg->srcId = pNode->myRaftId;
  pMsg->destId = pNode->replicasId[pSender->replicaIndex];
  pMsg->term = pSender->term;
  pMsg->beginIndex = pSender->snapshotParam.start;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
  pMsg->snapStartTime = pSender->senderStartTime;
  pMsg->seq = seq;
  pMsg->payloadType = typ;

  bool hasPayload = (pBlock != NULL) && (blockLen > 0);
  if (hasPayload) {
    (void)memcpy(pMsg->data, pBlock, blockLen);
  }

  code = syncNodeSendMsgById(&pMsg->destId, pNode, &rpcMsg);
  if (code != 0) {
    sSError(pSender, "failed to send snap replication msg since %s. seq:%d", tstrerror(code), seq);
  }
  TAOS_RETURN(code);
}
276

277
// when sender receive ack, call this function to send msg from seq
278
// seq = ack + 1, already updated
279
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
305,051✔
280
  int32_t        code = 0;
305,051✔
281
  SyncSnapBlock *pBlk = NULL;
305,051✔
282

283
  if (pSender->seq < SYNC_SNAPSHOT_SEQ_END) {
305,051✔
284
    pSender->seq++;
285,605✔
285

286
    if (pSender->seq > SYNC_SNAPSHOT_SEQ_BEGIN) {
285,605✔
287
      pBlk = taosMemoryCalloc(1, sizeof(SyncSnapBlock));
265,857✔
288
      if (pBlk == NULL) {
265,857✔
289
        code = terrno;
×
290
        goto _OUT;
×
291
      }
292

293
      pBlk->seq = pSender->seq;
265,857✔
294

295
      // read data
296
      code = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, &pBlk->pBlock,
265,857✔
297
                                                        &pBlk->blockLen);
298
      if (code != 0) {
265,857✔
299
        sSError(pSender, "snapshot sender read failed since %s", tstrerror(code));
×
300
        goto _OUT;
×
301
      }
302

303
      if (pBlk->blockLen > 0) {
265,857✔
304
        // has read data
305
        sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pBlk->blockLen, pBlk->seq);
246,109✔
306
      } else {
307
        // read finish, update seq to end
308
        pSender->seq = SYNC_SNAPSHOT_SEQ_END;
19,748✔
309
        sSInfo(pSender, "snapshot sender read to the end");
19,748✔
310
        goto _OUT;
19,748✔
311
      }
312
    }
313
  }
314

315
  if (!(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END)) {
285,303✔
316
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
317
    goto _OUT;
×
318
  }
319

320
  // send msg
321
  int32_t blockLen = (pBlk) ? pBlk->blockLen : 0;
285,303✔
322
  void   *pBlock = (pBlk) ? pBlk->pBlock : NULL;
285,303✔
323
  if ((code = syncSnapSendMsg(pSender, pSender->seq, pBlock, blockLen, 0)) != 0) {
285,303✔
324
    goto _OUT;
×
325
  }
326

327
  // put in buffer
328
  int64_t nowMs = taosGetTimestampMs();
285,303✔
329
  if (pBlk) {
285,303✔
330
    if (!(pBlk->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pBlk->seq < SYNC_SNAPSHOT_SEQ_END)) {
246,109✔
331
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
332
      goto _OUT;
×
333
    }
334
    pBlk->sendTimeMs = nowMs;
246,109✔
335
    pSender->pSndBuf->entries[pSender->seq % pSender->pSndBuf->size] = pBlk;
246,109✔
336
    pBlk = NULL;
246,109✔
337
    pSender->pSndBuf->end = TMAX(pSender->seq + 1, pSender->pSndBuf->end);
246,109✔
338
  }
339
  pSender->lastSendTime = nowMs;
285,303✔
340

341
_OUT:;
305,051✔
342
  if (pBlk != NULL) {
305,051✔
343
    syncSnapBlockDestroy(pBlk);
19,748✔
344
    pBlk = NULL;
19,748✔
345
  }
346
  TAOS_RETURN(code);
305,051✔
347
}
348

349
// send snapshot data from cache
350
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
×
351
  SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
×
352
  int32_t          code = 0;
×
353
  (void)taosThreadMutexLock(&pSndBuf->mutex);
×
354
  if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) {
×
355
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
356
    goto _out;
×
357
  }
358

359
  for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) {
×
360
    SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size];
×
361
    if (!pBlk) {
×
362
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
363
      goto _out;
×
364
    }
365
    int64_t nowMs = taosGetTimestampMs();
×
366
    if (pBlk->acked || nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) {
×
367
      continue;
×
368
    }
369
    if ((code = syncSnapSendMsg(pSender, pBlk->seq, pBlk->pBlock, pBlk->blockLen, 0)) != 0) {
×
370
      goto _out;
×
371
    }
372
    pBlk->sendTimeMs = nowMs;
×
373
  }
374

375
  if (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
×
376
    if ((code = snapshotSend(pSender)) != 0) {
×
377
      goto _out;
×
378
    }
379
  }
380

381
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
×
382
    if ((code = syncSnapSendMsg(pSender, pSender->seq, NULL, 0, 0)) != 0) {
×
383
      goto _out;
×
384
    }
385
  }
386
_out:;
×
387
  (void)taosThreadMutexUnlock(&pSndBuf->mutex);
×
388
  TAOS_RETURN(code);
×
389
}
390

391
/*
 * Start a snapshot transfer to pDestId unless one is already running.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR when no sender exists for pDestId,
 * the snapshotSenderStart error when starting fails, and 0 otherwise
 * (including when the sender was already started).
 */
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
  if (pSender == NULL) {
    sNError(pSyncNode, "snapshot sender start error since get failed");
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  if (!snapshotSenderIsStart(pSender)) {
    // NOTE(review): short pause before starting; its purpose is not evident from this file.
    taosMsleep(1);

    int32_t code = snapshotSenderStart(pSender);
    if (code != 0) {
      sSError(pSender, "snapshot sender start error since %s", tstrerror(code));
      TAOS_RETURN(code);
    }
    return 0;
  }

  sSDebug(pSender, "snapshot sender already start, ignore");
  return 0;
}
413

414
// receiver
415
/*
 * Create an idle snapshot receiver for data coming from fromId.
 *
 * Requires the FSM to implement FpSnapshotStartWrite, FpSnapshotStopWrite
 * and FpSnapshotDoWrite. The receiver starts with start == false,
 * ack = SYNC_SNAPSHOT_SEQ_BEGIN, no writer, an initialised writerMutex and an
 * empty receive buffer whose entries are released with rpcFreeCont.
 *
 * On success *ppReceiver receives the new receiver and 0 is returned. On
 * failure *ppReceiver stays NULL, everything allocated here is released, and
 * an error code is returned.
 */
int32_t snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId, SSyncSnapshotReceiver **ppReceiver) {
  int32_t code = 0;
  *ppReceiver = NULL;
  bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
                   (pSyncNode->pFsm->FpSnapshotDoWrite != NULL);
  if (!condition) {
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  SSyncSnapshotReceiver *pReceiver = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver));
  if (pReceiver == NULL) {
    TAOS_RETURN(terrno);
  }

  pReceiver->start = false;
  pReceiver->receiverStartTime = 0;
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pReceiver->pWriter = NULL;
  code = taosThreadMutexInit(&pReceiver->writerMutex, NULL);
  if (code != 0) {
    taosMemoryFree(pReceiver);
    TAOS_RETURN(code);
  }
  pReceiver->pSyncNode = pSyncNode;
  pReceiver->fromId = fromId;
  pReceiver->term = raftStoreGetTerm(pSyncNode);
  pReceiver->snapshot.data = NULL;
  pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pReceiver->snapshot.lastApplyTerm = 0;
  pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  SSyncSnapBuffer *pRcvBuf = NULL;
  code = syncSnapBufferCreate(&pRcvBuf);
  if (code != 0 || pRcvBuf == NULL) {
    // Trust the returned code as well as the pointer, and never report a
    // failed creation as success.
    if (code == 0) code = TSDB_CODE_SYN_INTERNAL_ERROR;
    syncSnapBufferDestroy(&pRcvBuf);  // no-op when pRcvBuf is NULL
    int32_t ret = taosThreadMutexDestroy(&pReceiver->writerMutex);
    if (ret != 0) {
      sError("failed to destroy mutex since %s", tstrerror(ret));
    }
    taosMemoryFree(pReceiver);
    TAOS_RETURN(code);
  }
  pRcvBuf->entryDeleteCb = rpcFreeCont;
  pReceiver->pRcvBuf = pRcvBuf;

  syncSnapBufferReset(pReceiver->pRcvBuf);
  *ppReceiver = pReceiver;
  TAOS_RETURN(code);
}
465

466
/*
 * Free the heap payloads attached to pReceiver->snapshotParam and
 * pReceiver->snapshot, leaving both data pointers NULL. Always returns 0.
 */
static int32_t snapshotReceiverClearInfoData(SSyncSnapshotReceiver *pReceiver) {
  // exchanged snapshot parameters
  if (pReceiver->snapshotParam.data != NULL) {
    taosMemoryFree(pReceiver->snapshotParam.data);
  }
  pReceiver->snapshotParam.data = NULL;

  // snapshot info payload
  if (pReceiver->snapshot.data != NULL) {
    taosMemoryFree(pReceiver->snapshot.data);
  }
  pReceiver->snapshot.data = NULL;

  return 0;
}
478

479
/*
 * Free a snapshot receiver and everything it owns.
 *
 * Any open writer is stopped without applying its data (apply flag false),
 * under writerMutex. Then the mutex, the receive buffer (with its queued
 * entries) and the snapshot info data are released. A NULL receiver is
 * ignored.
 */
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }

  (void)taosThreadMutexLock(&pReceiver->writerMutex);
  if (pReceiver->pWriter != NULL) {
    SSyncNode *pNode = pReceiver->pSyncNode;
    int32_t    code = pNode->pFsm->FpSnapshotStopWrite(pNode->pFsm, pReceiver->pWriter, false, &pReceiver->snapshot);
    if (code != 0) {
      sError("vgId:%d, snapshot receiver stop failed while destroy since %s", pNode->vgId, tstrerror(code));
    }
    pReceiver->pWriter = NULL;
  }
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
  (void)taosThreadMutexDestroy(&pReceiver->writerMutex);

  if (pReceiver->pRcvBuf != NULL) {
    syncSnapBufferDestroy(&pReceiver->pRcvBuf);
  }

  (void)snapshotReceiverClearInfoData(pReceiver);
  taosMemoryFree(pReceiver);
}
507

508
/* Atomically read the receiver's start flag; a NULL receiver counts as not started. */
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return false;
  }
  return atomic_load_8(&pReceiver->start);
}
511

512
/*
 * Compare the receiver's transfer signature (term, receiverStartTime) with
 * the one carried by pMsg (term, snapStartTime). Terms are compared first,
 * start times only when the terms are equal.
 *
 * Returns:
 *    0       signatures match;
 *   -1 / -2  the message has a newer term / later start time (logged as a warning);
 *    1 / 2   the message has an older term / earlier start time (logged as an error).
 */
static int32_t snapshotReceiverSignatureCmp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  int32_t code = 0;

  if (pReceiver->term != pMsg->term) {
    code = (pReceiver->term < pMsg->term) ? -1 : 1;
  } else if (pReceiver->receiverStartTime != pMsg->snapStartTime) {
    code = (pReceiver->receiverStartTime < pMsg->snapStartTime) ? -2 : 2;
  }

  if (code > 0) {
    sRError(pReceiver, "receiver signature failed, stale snapshot, result:%d, msg signature:(%" PRId64 ", %" PRId64 ")",
            code, pMsg->term, pMsg->snapStartTime);
  } else if (code < 0) {
    sRWarn(pReceiver,
           "receiver signature failed, result:%d, a newer snapshot, msg signature:(%" PRId64 ", %" PRId64 ")", code,
           pMsg->term, pMsg->snapStartTime);
  }
  return code;
}
541

542
/*
 * Open the FSM snapshot writer for the transfer described by pBeginMsg.
 *
 * Resets ack to SYNC_SNAPSHOT_SEQ_BEGIN, copies the leader's snapshot
 * coordinates from pBeginMsg into pReceiver->snapshot and
 * pReceiver->snapshotParam, and calls FpSnapshotStartWrite.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR if a writer is already open,
 * otherwise 0 or the FpSnapshotStartWrite error.
 */
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
  if (pReceiver->pWriter != NULL) {
    sRError(pReceiver, "snapshot receiver writer already started before");
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;

  pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
  pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
  pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
  pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
  pReceiver->snapshotParam.end = pBeginMsg->lastIndex;

  SSyncNode *pNode = pReceiver->pSyncNode;
  int32_t    code = pNode->pFsm->FpSnapshotStartWrite(pNode->pFsm, &pReceiver->snapshotParam, &pReceiver->pWriter);
  if (code == 0) {
    sRInfo(pReceiver, "snapshot receiver writer started");
    return 0;
  }

  sRError(pReceiver, "snapshot receiver start writer failed since %s", tstrerror(code));
  TAOS_RETURN(code);
}
570

571
/*
 * Mark the receiver as started for the transfer announced by pPreMsg.
 *
 * Returns early (with a log line) when the receiver is already started, or
 * silently when another caller wins the false->true flip of
 * pReceiver->start. Otherwise the receiver adopts the message's term, source
 * and start time as its transfer signature, sets ack to
 * SYNC_SNAPSHOT_SEQ_PREP, and computes the local snapshot begin index.
 */
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
  if (snapshotReceiverIsStart(pReceiver)) {
    sRInfo(pReceiver, "snapshot receiver has started");
    return;
  }

  if (atomic_val_compare_exchange_8(&pReceiver->start, false, true)) {
    return;
  }

  pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP;
  pReceiver->term = pPreMsg->term;
  pReceiver->fromId = pPreMsg->srcId;
  pReceiver->receiverStartTime = pPreMsg->snapStartTime;

  SSyncNode *pNode = pReceiver->pSyncNode;
  pReceiver->snapshotParam.start = syncNodeGetSnapBeginIndex(pNode);
  pReceiver->snapshotParam.end = -1;

  sRInfo(pReceiver, "snapshot receiver start, from dnode:%d.", DID(&pReceiver->fromId));
}
590

591
/*
 * Abort an in-progress snapshot reception without applying it.
 *
 * Only the caller that flips pReceiver->start from true to false performs
 * the teardown. Under writerMutex, any open writer is stopped with the apply
 * flag false. Under the receive buffer's mutex, the buffer is emptied and the
 * snapshot info data is freed.
 */
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
  sRDebug(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);

  if (!atomic_val_compare_exchange_8(&pReceiver->start, true, false)) {
    return;
  }

  (void)taosThreadMutexLock(&pReceiver->writerMutex);
  if (pReceiver->pWriter == NULL) {
    sRInfo(pReceiver, "snapshot receiver stop, writer is null");
  } else {
    SSyncNode *pNode = pReceiver->pSyncNode;
    int32_t    code = pNode->pFsm->FpSnapshotStopWrite(pNode->pFsm, pReceiver->pWriter, false, &pReceiver->snapshot);
    if (code != 0) {
      sRError(pReceiver, "snapshot receiver stop write failed since %s", tstrerror(code));
    }
    pReceiver->pWriter = NULL;
  }
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);

  SSyncSnapBuffer *pRcvBuf = pReceiver->pRcvBuf;
  (void)taosThreadMutexLock(&pRcvBuf->mutex);
  syncSnapBufferReset(pRcvBuf);
  (void)snapshotReceiverClearInfoData(pReceiver);
  (void)taosThreadMutexUnlock(&pRcvBuf->mutex);
}
620

621
/*
 * Complete a snapshot reception (END message) and apply it.
 *
 * In order, this function:
 *   - writes any trailing data carried by pMsg;
 *   - raises commitIndex and the raft term to the snapshot's lastApplyIndex
 *     and lastApplyTerm when those are ahead;
 *   - stops the writer with apply = true;
 *   - sets ack to SYNC_SNAPSHOT_SEQ_END;
 *   - records the FSM state;
 *   - restores the log store from pMsg->lastIndex.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR when no writer is open, otherwise 0
 * or the first failing FSM / log-store error.
 */
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  int32_t code = 0;
  if (pReceiver->pWriter == NULL) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sRError(pReceiver, "snapshot receiver finish error since writer is null");
    TAOS_RETURN(code);
  }

  // write trailing data
  sRInfo(pReceiver, "snapshot receiver write about to finish, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
  if (pMsg->dataLen > 0) {
    (void)taosThreadMutexLock(&pReceiver->writerMutex);
    code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
                                                         pMsg->dataLen);
    (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
    if (code != 0) {
      sRError(pReceiver, "failed to finish snapshot receiver write since %s", tstrerror(code));
      TAOS_RETURN(code);
    }
  }

  // update commit index
  if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
    pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
  }

  // maybe update term
  if (pReceiver->snapshot.lastApplyTerm > raftStoreGetTerm(pReceiver->pSyncNode)) {
    raftStoreSetTerm(pReceiver->pSyncNode, pReceiver->snapshot.lastApplyTerm);
  }

  (void)taosThreadMutexLock(&pReceiver->writerMutex);
  if (pReceiver->pWriter != NULL) {
    // stop writer, apply data
    code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
                                                           &pReceiver->snapshot);
    if (code != 0) {
      // Release writerMutex before bailing out. Returning with it held would
      // block every later locker (snapshotReceiverStop, snapshotReceiverDestroy,
      // snapshotReceiverGotData) forever.
      (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
      sRError(pReceiver, "snapshot receiver apply failed since %s", tstrerror(code));
      TAOS_RETURN(code);
    }
    pReceiver->pWriter = NULL;
    sRInfo(pReceiver, "snapshot receiver write stopped");
  }
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);

  // update progress
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;

  // get fsmState
  SSnapshot snapshot = {0};
  code = pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
  if (code != 0) {
    sRError(pReceiver, "snapshot receiver get snapshot info failed since %s", tstrerror(code));
    TAOS_RETURN(code);
  }
  pReceiver->pSyncNode->fsmState = snapshot.state;

  // reset wal
  code = pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
  if (code != 0) {
    sRError(pReceiver, "failed to snapshot receiver log restore since %s", tstrerror(code));
    TAOS_RETURN(code);
  }
  sRInfo(pReceiver, "wal log restored from snapshot");

  return 0;
}
689

690
/*
 * Apply one in-order snapshot data block received from the leader.
 *
 * The block's seq must be exactly ack + 1. The write runs under writerMutex
 * and requires an open writer; empty blocks are accepted without calling the
 * FSM. On success ack advances to the block's seq.
 *
 * Returns:
 *   - TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG when the block is out of order;
 *   - TSDB_CODE_SYN_INTERNAL_ERROR when no writer is open;
 *   - the FpSnapshotDoWrite error when the write fails;
 *   - 0 otherwise.
 */
static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  if (pMsg->seq != pReceiver->ack + 1) {
    sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq);
    TAOS_RETURN(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG);
  }

  int32_t code = 0;
  (void)taosThreadMutexLock(&pReceiver->writerMutex);

  if (pReceiver->pWriter == NULL) {
    (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
    sRError(pReceiver, "snapshot receiver failed to write data since writer is null");
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  sRDebug(pReceiver, "snapshot receiver continue to write, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
  if (pMsg->dataLen > 0) {
    SSyncNode *pNode = pReceiver->pSyncNode;
    code = pNode->pFsm->FpSnapshotDoWrite(pNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
  }

  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);

  if (code != 0) {
    sRError(pReceiver, "snapshot receiver continue write failed since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  pReceiver->ack = pMsg->seq;
  sRDebug(pReceiver, "snapshot receiver continue to write finish");
  return 0;
}
726

727
/*
 * Compute the log index at which an incoming snapshot begins on this node.
 *
 * For an mnode this is SYNC_INDEX_BEGIN. Otherwise it is one past the larger
 * of the node's commitIndex and the WAL's committed version. The chosen value
 * is logged.
 */
SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
  if (syncNodeIsMnode(ths)) {
    SyncIndex snapStart = SYNC_INDEX_BEGIN;
    sNInfo(ths, "snapshot begin index is %" PRId64 " since its mnode", snapStart);
    return snapStart;
  }

  SSyncLogStoreData *pData = ths->pLogStore->data;
  int64_t            walCommitVer = walGetCommittedVer(pData->pWal);
  SyncIndex          snapStart = TMAX(ths->commitIndex, walCommitVer) + 1;

  sNInfo(ths, "snapshot begin index is %" PRId64, snapStart);
  return snapStart;
}
745

746
static int32_t syncSnapReceiverExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotReceiver *pReceiver,
19,724✔
747
                                             SyncSnapshotSend *pMsg, SSnapshot *pInfo) {
748
  if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT) return TSDB_CODE_SYN_INTERNAL_ERROR;
19,724✔
749
  int32_t code = 0, lino = 0;
19,724✔
750

751
  // copy snap info from leader
752
  void *data = taosMemoryCalloc(1, pMsg->dataLen);
19,724✔
753
  if (data == NULL) {
19,724✔
UNCOV
754
    TAOS_CHECK_EXIT(terrno);
×
755
  }
756
  pInfo->data = data;
19,724✔
757
  data = NULL;
19,724✔
758
  (void)memcpy(pInfo->data, pMsg->data, pMsg->dataLen);
19,724✔
759

760
  // exchange snap info
761
  if ((code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pInfo)) != 0) {
19,724✔
UNCOV
762
    sRError(pReceiver, "failed to get snapshot info. type: %d", pMsg->payloadType);
×
UNCOV
763
    goto _exit;
×
764
  }
765
  SSyncTLV *datHead = pInfo->data;
19,724✔
766
  if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
19,724✔
UNCOV
767
    sRError(pReceiver, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
×
UNCOV
768
    code = TSDB_CODE_INVALID_DATA_FMT;
×
UNCOV
769
    goto _exit;
×
770
  }
771
  int32_t dataLen = sizeof(SSyncTLV) + datHead->len;
19,724✔
772

773
  // save exchanged snap info
774
  SSnapshotParam *pParam = &pReceiver->snapshotParam;
19,724✔
775
  data = taosMemoryRealloc(pParam->data, dataLen);
19,724✔
776
  if (data == NULL) {
19,724✔
UNCOV
777
    code = terrno;
×
UNCOV
778
    sError("vgId:%d, failed to realloc memory for snapshot prep due to %s. dataLen:%d", pSyncNode->vgId,
×
779
           tstrerror(code), dataLen);
780
    goto _exit;
×
781
  }
782
  pParam->data = data;
19,724✔
783
  data = NULL;
19,724✔
784
  (void)memcpy(pParam->data, pInfo->data, dataLen);
19,724✔
785

786
_exit:
19,724✔
787
  TAOS_RETURN(code);
19,724✔
788
}
789

790
/*
 * Handle a preparation message (seq == SYNC_SNAPSHOT_SEQ_PREP) on the receiver.
 *
 * How the message signature compares with the running receiver decides the action:
 *   - receiver not started, or a newer preparation: (re)start the receiver, exchange the
 *     snapshot info and reply with it;
 *   - duplicate of the current preparation: exchange again and reply with it;
 *   - stale preparation: leave the receiver (and its snapshot param) untouched and reply
 *     with TSDB_CODE_SYN_MISMATCHED_SIGNATURE.
 *
 * The error code sent to the leader is tracked in rspCode, separately from `code`. This
 * prevents the exchange/send status from overwriting a rejection.
 *
 * Returns the exchange/send error if one occurred; otherwise the code sent to the leader.
 */
static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                code = 0;
  int32_t                rspCode = 0;  // error code carried by the reply

  if (snapshotReceiverIsStart(pReceiver)) {
    // already start
    int32_t order = snapshotReceiverSignatureCmp(pReceiver, pMsg);
    if (order < 0) {  // order < 0
      sWarn("failed to prepare snapshot, received a new snapshot preparation. restart receiver.");
      goto _START_RECEIVER;
    } else if (order == 0) {  // order == 0
      sInfo("prepare snapshot, received a duplicate snapshot preparation. send reply.");
      goto _SEND_REPLY;
    } else {  // order > 0
      // ignore
      sError("failed to prepare snapshot, received a stale snapshot preparation. ignore.");
      rspCode = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
      goto _SEND_REPLY;
    }
  } else {
    // start new
    sRInfo(pReceiver, "snapshot receiver not start yet so start new one");
    goto _START_RECEIVER;
  }

_START_RECEIVER:
  if (snapshotReceiverIsStart(pReceiver)) {
    sRInfo(pReceiver, "snapshot receiver already start and force stop pre one");
    snapshotReceiverStop(pReceiver);
  }

  snapshotReceiverStart(pReceiver, pMsg);

_SEND_REPLY:;
  SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT_REPLY};
  int32_t   dataLen = 0;
  // Only an accepted preparation may exchange snapshot info: the exchange rewrites
  // pReceiver->snapshotParam.data, which must not be clobbered by a stale sender.
  if (rspCode == 0 && pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT) {
    if ((code = syncSnapReceiverExchgSnapInfo(pSyncNode, pReceiver, pMsg, &snapInfo)) != 0) {
      goto _out;
    }
    SSyncTLV *datHead = snapInfo.data;
    dataLen = sizeof(SSyncTLV) + datHead->len;
  }

  // send response
  int32_t type = (snapInfo.data) ? snapInfo.type : 0;
  if ((code = syncSnapSendRsp(pReceiver, pMsg, snapInfo.data, dataLen, type, rspCode)) != 0) {
    goto _out;
  }
  code = rspCode;

_out:
  if (snapInfo.data) {
    taosMemoryFree(snapInfo.data);
    snapInfo.data = NULL;
  }
  TAOS_RETURN(code);
}
849

850
/*
 * Handle the begin message (seq == SYNC_SNAPSHOT_SEQ_BEGIN) on the receiver.
 *
 * Requires a started receiver whose signature matches the message. It then starts the
 * snapshot writer and checks that the negotiated snapshotParam.start still equals the
 * local begin index. A response carrying the outcome is always sent.
 *
 * Returns the send error if sending the response fails, otherwise the outcome code
 * (0 on success).
 */
static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  // condition 1
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                code = TSDB_CODE_SYN_INTERNAL_ERROR;

  if (!snapshotReceiverIsStart(pReceiver)) {
    sRError(pReceiver, "failed to begin snapshot receiver since not started");
    goto _SEND_REPLY;
  }

  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    sError("failed to begin snapshot, since %s", tstrerror(code));
    goto _SEND_REPLY;
  }

  // start writer
  if ((code = snapshotReceiverStartWriter(pReceiver, pMsg)) != 0) {
    sRError(pReceiver, "failed to start snapshot writer since %s", tstrerror(code));
    goto _SEND_REPLY;
  }

  SyncIndex beginIndex = syncNodeGetSnapBeginIndex(pSyncNode);
  if (pReceiver->snapshotParam.start != beginIndex) {
    sRError(pReceiver, "snapshot begin index is changed unexpectedly. sver:%" PRId64 ", beginIndex:%" PRId64,
            pReceiver->snapshotParam.start, beginIndex);
    // code is 0 here (the writer started fine); report the failure instead of success
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _SEND_REPLY;
  }

  code = 0;
_SEND_REPLY:
  // send response
  TAOS_CHECK_RETURN(syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code));

  TAOS_RETURN(code);
}
887

888
/*
 * Build a SyncSnapshotRsp answering pMsg and send it to pMsg's source.
 *
 * The response echoes the request's term, lastIndex, lastTerm and snapStartTime, and
 * acknowledges pMsg->seq. It carries rspCode, the receiver's snapshotParam.start and
 * `type` as payload type. When pBlock is non-NULL and blockLen > 0, blockLen bytes of
 * pBlock are copied as payload.
 *
 * Returns 0 on success, or the error from building or sending the message.
 */
int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, void *pBlock, int32_t blockLen,
                        int32_t type, int32_t rspCode) {
  SSyncNode *pNode = pReceiver->pSyncNode;
  SRpcMsg    rsp = {0};
  int32_t    code = syncBuildSnapshotSendRsp(&rsp, blockLen, pNode->vgId);

  if (code != 0) {
    sRError(pReceiver, "failed to build snapshot receiver resp since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  SyncSnapshotRsp *pRsp = rsp.pCont;
  pRsp->srcId = pNode->myRaftId;
  pRsp->destId = pMsg->srcId;
  pRsp->term = pMsg->term;
  pRsp->lastIndex = pMsg->lastIndex;
  pRsp->lastTerm = pMsg->lastTerm;
  pRsp->startTime = pMsg->snapStartTime;
  pRsp->ack = pMsg->seq;
  pRsp->code = rspCode;
  pRsp->snapBeginIndex = pReceiver->snapshotParam.start;
  pRsp->payloadType = type;

  // attach the optional payload
  if (blockLen > 0 && pBlock != NULL) {
    (void)memcpy(pRsp->data, pBlock, blockLen);
  }

  code = syncNodeSendMsgById(&pRsp->destId, pNode, &rsp);
  if (code == 0) {
    return 0;
  }

  sRError(pReceiver, "failed to send snapshot receiver resp since %s", tstrerror(code));
  TAOS_RETURN(code);
}
922

923
/*
 * Put one snapshot data message into the receiver's reorder buffer, then apply every
 * block that has become contiguous.
 *
 * All work happens under pRcvBuf->mutex, which is released on every return path:
 *   - a seq at or beyond start + size is rejected with TSDB_CODE_SYN_BUFFER_FULL;
 *   - a broken window invariant (start <= cursor + 1, cursor < end) yields
 *     TSDB_CODE_SYN_INTERNAL_ERROR;
 *   - a seq beyond the cursor is stored, replacing any previous entry in its slot.
 *     The buffer takes ownership of the message and ppMsg[0] is set to NULL;
 *   - a seq below the window start (already consumed) is acknowledged again with code 0.
 *
 * The cursor then advances over the contiguous run of stored entries. Every entry from
 * start to cursor is handed to snapshotReceiverGotData, acknowledged with
 * syncSnapSendRsp and released. Processing stops after the first failing entry.
 *
 * Returns 0 on success or an error code.
 */
static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend **ppMsg) {
  int32_t           code = 0;
  SSyncSnapBuffer  *pRcvBuf = pReceiver->pRcvBuf;
  SyncSnapshotSend *pMsg = ppMsg[0];

  (void)taosThreadMutexLock(&pRcvBuf->mutex);

  if (pMsg->seq - pRcvBuf->start >= pRcvBuf->size) {
    code = TSDB_CODE_SYN_BUFFER_FULL;
    goto _out;
  }

  // Leave through _out: returning directly here would keep pRcvBuf->mutex locked.
  if (!(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  if (pMsg->seq > pRcvBuf->cursor) {
    // replace any earlier copy of this seq; the buffer now owns the message
    if (pRcvBuf->entries[pMsg->seq % pRcvBuf->size]) {
      pRcvBuf->entryDeleteCb(pRcvBuf->entries[pMsg->seq % pRcvBuf->size]);
    }
    pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg;
    ppMsg[0] = NULL;
    pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
  } else if (pMsg->seq < pRcvBuf->start) {
    // already consumed: acknowledge it again
    code = syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code);
    goto _out;
  }

  // advance the cursor over the contiguous run of received entries
  for (int64_t seq = pRcvBuf->cursor + 1; seq < pRcvBuf->end; ++seq) {
    if (pRcvBuf->entries[seq % pRcvBuf->size]) {
      pRcvBuf->cursor = seq;
    } else {
      break;
    }
  }

  // apply, acknowledge and release every entry up to the cursor
  for (int64_t seq = pRcvBuf->start; seq <= pRcvBuf->cursor; ++seq) {
    if ((code = snapshotReceiverGotData(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size])) != 0) {
      if (code >= SYNC_SNAPSHOT_SEQ_INVALID) {
        code = TSDB_CODE_SYN_INTERNAL_ERROR;
      }
    }
    pRcvBuf->start = seq + 1;
    if (syncSnapSendRsp(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size], NULL, 0, 0, code) != 0) {
      sError("failed to send snap rsp");
    }
    pRcvBuf->entryDeleteCb(pRcvBuf->entries[seq % pRcvBuf->size]);
    pRcvBuf->entries[seq % pRcvBuf->size] = NULL;
    if (code) goto _out;
  }

_out:
  (void)taosThreadMutexUnlock(&pRcvBuf->mutex);
  TAOS_RETURN(code);
}
976

977
/*
 * Handle a data message (SYNC_SNAPSHOT_SEQ_BEGIN < seq < SYNC_SNAPSHOT_SEQ_END) on the
 * receiver.
 *
 * A message whose signature does not match the receiver is answered with
 * TSDB_CODE_SYN_MISMATCHED_SIGNATURE; the return value is then that of the send.
 * Otherwise the message goes into the receive buffer, which may take ownership of
 * ppMsg[0].
 *
 * Returns 0 on success or an error code.
 */
static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend **ppMsg) {
  // condition 4
  // transfering
  SyncSnapshotSend *pMsg = ppMsg[0];
  if (!pMsg) return TSDB_CODE_SYN_INTERNAL_ERROR;
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
    int32_t code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    sError("failed to receive snapshot data, since %s", tstrerror(code));
    return syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code);
  }

  return syncSnapBufferRecv(pReceiver, ppMsg);
}
994

995
/*
 * Handle the end message (seq == SYNC_SNAPSHOT_SEQ_END) on the receiver.
 *
 * If the message signature matches, snapshotReceiverFinish is called and, on success,
 * the receiver is stopped. A response carrying the outcome (rspCode) is always sent:
 * TSDB_CODE_SYN_MISMATCHED_SIGNATURE, the finish error, or 0.
 *
 * rspCode is kept separate from the build/send status. Previously the build result
 * overwrote it, so a failed finish was reported to the leader as success.
 *
 * Returns the build/send error if one occurred, otherwise rspCode.
 */
static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  // condition 2
  // end, finish FSM
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                code = 0;
  int32_t                rspCode = 0;  // outcome reported to the leader

  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
    rspCode = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    sError("failed to end snapshot, since %s", tstrerror(rspCode));
    goto _SEND_REPLY;
  }

  rspCode = snapshotReceiverFinish(pReceiver, pMsg);
  if (rspCode == 0) {
    snapshotReceiverStop(pReceiver);
  }

_SEND_REPLY:;
  // build msg
  SRpcMsg rpcMsg = {0};
  if ((code = syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId)) != 0) {
    sRError(pReceiver, "snapshot receiver build rsp failed since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
  pRspMsg->term = raftStoreGetTerm(pSyncNode);
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pMsg->snapStartTime;
  pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
  pRspMsg->code = rspCode;
  pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;

  // send msg
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver end", &rpcMsg.info.traceId);
  if ((code = syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg)) != 0) {
    sRError(pReceiver, "snapshot receiver send rsp failed since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  TAOS_RETURN(rspCode);
}
1042

1043
int64_t lastRecvPrintLog = 0;
1044

1045
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
301,724✔
1046
  SyncSnapshotSend     **ppMsg = (SyncSnapshotSend **)&pRpcMsg->pCont;
301,724✔
1047
  SyncSnapshotSend      *pMsg = ppMsg[0];
301,724✔
1048
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
301,724✔
1049
  int32_t                code = 0;
301,724✔
1050

1051
  // if already drop replica, do not process
1052
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
301,724✔
UNCOV
1053
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config", &pRpcMsg->info.traceId);
×
1054
    code = TSDB_CODE_SYN_NOT_IN_RAFT_GROUP;
×
1055
    TAOS_RETURN(code);
×
1056
  }
1057

1058
  if (pMsg->term < raftStoreGetTerm(pSyncNode)) {
301,724✔
UNCOV
1059
    sRError(pReceiver, "reject snap replication with smaller term. msg term:%" PRId64 ", seq:%d", pMsg->term,
×
1060
            pMsg->seq);
UNCOV
1061
    code = TSDB_CODE_SYN_TERM_NOT_MATCH;
×
UNCOV
1062
    if (syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code) != 0) sError("failed to send snap rsp");
×
UNCOV
1063
    TAOS_RETURN(code);
×
1064
  }
1065

1066
  if (pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER) {
301,724✔
1067
    if (pMsg->term > raftStoreGetTerm(pSyncNode)) {
140,296✔
UNCOV
1068
      syncNodeStepDown(pSyncNode, pMsg->term, pMsg->srcId, "snapshot");
×
1069
    }
1070
  } else {
1071
    syncNodeUpdateTermWithoutStepDown(pSyncNode, pMsg->term);
161,428✔
1072
  }
1073

1074
  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER && pSyncNode->state != TAOS_SYNC_STATE_LEARNER) {
301,724✔
UNCOV
1075
    sRError(pReceiver, "snapshot receiver not a follower or learner");
×
UNCOV
1076
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1077
    TAOS_RETURN(code);
×
1078
  }
1079

1080
  if (pMsg->seq < SYNC_SNAPSHOT_SEQ_PREP || pMsg->seq > SYNC_SNAPSHOT_SEQ_END) {
301,724✔
1081
    sRError(pReceiver, "snap replication msg with invalid seq:%d", pMsg->seq);
×
UNCOV
1082
    code = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG;
×
UNCOV
1083
    TAOS_RETURN(code);
×
1084
  }
1085

1086
  // prepare
1087
  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP) {
301,724✔
1088
    sInfo(
19,724✔
1089
        "vgId:%d, snapshot replication progress:2/8:follower:1/4, start to prepare, recv msg:%s, snap seq:%d, msg "
1090
        "signature:(%" PRId64 ", %" PRId64 ")",
1091
        pSyncNode->vgId, TMSG_INFO(pRpcMsg->msgType), pMsg->seq, pMsg->term, pMsg->snapStartTime);
1092
    code = syncNodeOnSnapshotPrep(pSyncNode, pMsg);
19,724✔
1093
    sDebug(
19,724✔
1094
        "vgId:%d, snapshot replication progress:2/8:follower:1/4, finish to prepare, recv msg:%s, snap seq:%d, msg "
1095
        "signature:(%" PRId64 ", %" PRId64 ")",
1096
        pSyncNode->vgId, TMSG_INFO(pRpcMsg->msgType), pMsg->seq, pMsg->term, pMsg->snapStartTime);
1097
    goto _out;
19,724✔
1098
  }
1099

1100
  // begin
1101
  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
282,000✔
1102
    sInfo("vgId:%d, snapshot replication progress:4/8:follower:2/4, start to begin,replication. msg signature:(%" PRId64
19,724✔
1103
          ", %" PRId64 "), snapshot msg seq:%d",
1104
          pSyncNode->vgId, pMsg->term, pMsg->snapStartTime, pMsg->seq);
1105
    code = syncNodeOnSnapshotBegin(pSyncNode, pMsg);
19,724✔
1106
    sDebug("vgId:%d, snapshot replication progress:4/8:follower:2/4, finish to begin. msg signature:(%" PRId64
19,724✔
1107
           ", %" PRId64 ")",
1108
           pSyncNode->vgId, pMsg->term, pMsg->snapStartTime);
1109
    goto _out;
19,724✔
1110
  }
1111

1112
  // data
1113
  if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
262,276✔
1114
    int64_t currentTimestamp = taosGetTimestampMs()/1000;
242,854✔
1115
    if (currentTimestamp > lastRecvPrintLog) {
242,854✔
1116
      sInfo("vgId:%d, snapshot replication progress:6/8:follower:3/4, start to receive. msg signature:(%" PRId64
22,930✔
1117
            ", %" PRId64 "), snapshot msg seq:%d",
1118
            pSyncNode->vgId, pMsg->term, pMsg->snapStartTime, pMsg->seq);
1119

1120
    } else {
1121
      sDebug("vgId:%d, snapshot replication progress:6/8:follower:3/4, start to receive. msg signature:(%" PRId64
219,924✔
1122
             ", %" PRId64 "), snapshot msg seq:%d",
1123
             pSyncNode->vgId, pMsg->term, pMsg->snapStartTime, pMsg->seq);
1124
    }
1125
    lastRecvPrintLog = currentTimestamp;
242,854✔
1126
    code = syncNodeOnSnapshotReceive(pSyncNode, ppMsg);
242,854✔
1127
    sDebug("vgId:%d, snapshot replication progress:6/8:follower:3/4, finish to receive.", pSyncNode->vgId);
242,854✔
1128
    goto _out;
242,854✔
1129
  }
1130

1131
  // end
1132
  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
19,422✔
1133
    sInfo("vgId:%d, snapshot replication progress:7/8:follower:4/4, start to end. msg signature:(%" PRId64 ", %" PRId64
19,422✔
1134
          "), snapshot msg seq:%d",
1135
          pSyncNode->vgId, pMsg->term, pMsg->snapStartTime, pMsg->seq);
1136
    code = syncNodeOnSnapshotEnd(pSyncNode, pMsg);
19,422✔
1137
    if (code != 0) {
19,422✔
UNCOV
1138
      sRError(pReceiver, "failed to end snapshot.");
×
UNCOV
1139
      goto _out;
×
1140
    }
1141

1142
    code = syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode);
19,422✔
1143
    if (code != 0) {
19,422✔
UNCOV
1144
      sRError(pReceiver, "failed to reinit log buffer since %s", tstrerror(code));
×
1145
    }
1146
    sDebug("vgId:%d, snapshot replication progress:7/7:follower:4/4, finish to end. msg signature:(%" PRId64
19,422✔
1147
           ", %" PRId64 ")",
1148
           pSyncNode->vgId, pMsg->term, pMsg->snapStartTime);
1149
    goto _out;
19,422✔
1150
  }
1151

UNCOV
1152
_out:;
×
1153
  syncNodeResetElectTimer(pSyncNode);
301,724✔
1154
  TAOS_RETURN(code);
301,724✔
1155
}
1156

1157
/*
 * Save the follower's snapshot description, carried in a TDMT_SYNC_PREP_SNAPSHOT_REPLY
 * response, into pSender->snapshotParam.data.
 *
 * The payload is read as an SSyncTLV whose typ must equal pMsg->payloadType. The whole
 * TLV (header plus len bytes) is copied into pSender->snapshotParam.data, which is
 * reallocated as needed.
 *
 * Returns 0 on success; TSDB_CODE_SYN_INTERNAL_ERROR for a wrong payload type;
 * TSDB_CODE_INVALID_DATA_FMT for a mismatched TLV type; or terrno on allocation failure.
 */
static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SSyncTLV *pTlv = (void *)pMsg->data;
  if (pTlv->typ != pMsg->payloadType) {
    sSError(pSender, "unexpected data type in data of SyncSnapshotRsp. typ: %d", pTlv->typ);
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
  }

  int32_t totalLen = sizeof(SSyncTLV) + pTlv->len;
  void   *pBuf = taosMemoryRealloc(pSender->snapshotParam.data, totalLen);
  if (pBuf == NULL) {
    TAOS_RETURN(terrno);
  }

  (void)memcpy(pBuf, pMsg->data, totalLen);
  pSender->snapshotParam.data = pBuf;

  sSInfo(pSender, "data of snapshot param. len: %d", pTlv->len);
  return 0;
}
1179

1180
// sender
1181
/*
 * Handle the follower's reply to the preparation message on the leader.
 *
 * The follower's snapBeginIndex is rejected if it is beyond commitIndex + 1. Otherwise,
 * under the send-buffer mutex, the leader:
 *   - takes the local snapshot info and records the range
 *     [snapBeginIndex, lastApplyIndex] in pSender->snapshotParam;
 *   - stores the exchanged snapshot description (for a PREP_SNAPSHOT_REPLY payload);
 *   - starts the FSM reader and moves the follower's next index past lastApplyIndex;
 *   - sends the next snapshot message.
 *
 * Returns 0 on success or an error code.
 */
static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  int32_t   code = 0;
  SSnapshot localSnap = {0};

  // the follower cannot ask to begin beyond the leader's next uncommitted index
  if (pMsg->snapBeginIndex > pSyncNode->commitIndex + 1) {
    sSError(pSender,
            "snapshot begin index is greater than commit index. msg snapBeginIndex:%" PRId64
            ", node commitIndex:%" PRId64,
            pMsg->snapBeginIndex, pSyncNode->commitIndex);
    TAOS_RETURN(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG);
  }

  (void)taosThreadMutexLock(&pSender->pSndBuf->mutex);
  TAOS_CHECK_GOTO(pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &localSnap), NULL, _out);

  // range to transfer: from the follower's begin index up to the local last applied index
  pSender->snapshotParam.start = pMsg->snapBeginIndex;
  pSender->snapshotParam.end = localSnap.lastApplyIndex;

  sSInfo(pSender, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64,
         pMsg->snapBeginIndex, localSnap.lastApplyIndex, localSnap.lastApplyTerm);

  pSender->snapshot = localSnap;

  if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
    TAOS_CHECK_GOTO(syncSnapSenderExchgSnapInfo(pSyncNode, pSender, pMsg), NULL, _out);
  }

  // open the FSM reader for the negotiated range
  code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
  if (code != 0) {
    sSError(pSender, "prepare snapshot failed since %s", tstrerror(code));
    goto _out;
  }

  syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, localSnap.lastApplyIndex + 1);

  code = snapshotSend(pSender);

_out:
  (void)taosThreadMutexUnlock(&pSender->pSndBuf->mutex);
  TAOS_RETURN(code);
}
1226

1227
/*
 * Order a snapshot response against the sender's signature (term, start time).
 *
 * Returns:
 *   -1 / 1  when pSender->term is smaller / greater than pMsg->term;
 *   -2 / 2  when the terms match and pSender->senderStartTime is smaller / greater
 *           than pMsg->startTime;
 *    0      when both match.
 *
 * Logging of a mismatch is left to the callers. The former `code`-guarded error log
 * here could never fire because `code` was always 0, so it has been removed.
 */
static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  if (pSender->term < pMsg->term) return -1;
  if (pSender->term > pMsg->term) return 1;
  if (pSender->senderStartTime < pMsg->startTime) return -2;
  if (pSender->senderStartTime > pMsg->startTime) return 2;
  return 0;
}
1238

1239
/*
 * Process a data acknowledgement on the leader and keep the send window full.
 *
 * Under pSndBuf->mutex, the response is validated first: it must carry a matching
 * signature, the sender must be started, have a reader and not be finished, the ack
 * must fit the window, and the window invariant must hold. The leader then:
 *   - marks the acknowledged in-flight block;
 *   - advances the cursor over contiguously acked blocks and releases them;
 *   - calls snapshotSend while seq != SYNC_SNAPSHOT_SEQ_END and fewer than
 *     tsSnapReplMaxWaitN blocks are outstanding;
 *   - calls snapshotSend once more when seq == SYNC_SNAPSHOT_SEQ_END and the window is
 *     empty (presumably emitting the END message -- see snapshotSend).
 *
 * Returns 0 on success or an error code.
 */
static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp **ppMsg) {
  SyncSnapshotRsp *pRsp = ppMsg[0];
  SSyncSnapBuffer *pBuf = pSender->pSndBuf;
  int32_t          code = 0;

  (void)taosThreadMutexLock(&pBuf->mutex);

  // validate the response and the sender/window state
  if (snapshotSenderSignatureCmp(pSender, pRsp) != 0) {
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    sError("failed to send snapshot data, since %s", tstrerror(code));
    goto _out;
  }
  if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }
  if (pRsp->ack - pBuf->start >= pBuf->size) {
    code = TSDB_CODE_SYN_BUFFER_FULL;
    goto _out;
  }
  if (pBuf->start > pBuf->cursor + 1 || pBuf->cursor >= pBuf->end) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  // mark the acknowledged in-flight block
  if (pRsp->ack > pBuf->cursor && pRsp->ack < pBuf->end) {
    SyncSnapBlock *pAcked = pBuf->entries[pRsp->ack % pBuf->size];
    if (pAcked == NULL) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    pAcked->acked = 1;
  }

  // advance the cursor over the contiguous run of acked blocks
  for (int64_t next = pBuf->cursor + 1; next < pBuf->end && pBuf->entries[next % pBuf->size]->acked; ++next) {
    pBuf->cursor = next;
  }

  // release everything up to the cursor and slide the window start past it
  for (int64_t i = pBuf->start; i <= pBuf->cursor; ++i) {
    int64_t slot = i % pBuf->size;
    pBuf->entryDeleteCb(pBuf->entries[slot]);
    pBuf->entries[slot] = NULL;
    pBuf->start = i + 1;
  }

  // refill the window with data blocks
  while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pBuf->start < tsSnapReplMaxWaitN) {
    code = snapshotSend(pSender);
    if (code != 0) goto _out;
  }

  // all data acknowledged: send the final message
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pBuf->end <= pBuf->start) {
    code = snapshotSend(pSender);
  }

_out:
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_RETURN(code);
}
1305

1306
int64_t lastSendPrintLog = 0;
1307

1308
int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
299,615✔
1309
  SyncSnapshotRsp **ppMsg = (SyncSnapshotRsp **)&pRpcMsg->pCont;
299,615✔
1310
  SyncSnapshotRsp  *pMsg = ppMsg[0];
299,615✔
1311
  int32_t           code = 0;
299,615✔
1312

1313
  // if already drop replica, do not process
1314
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
299,615✔
UNCOV
1315
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "maybe replica already dropped", &pRpcMsg->info.traceId);
×
1316
    TAOS_RETURN(TSDB_CODE_SYN_NOT_IN_RAFT_GROUP);
×
1317
  }
1318

1319
  // get sender
1320
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &pMsg->srcId);
299,615✔
1321
  if (pSender == NULL) {
299,615✔
UNCOV
1322
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender is null", &pRpcMsg->info.traceId);
×
UNCOV
1323
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
×
1324
  }
1325

1326
  if (!snapshotSenderIsStart(pSender)) {
299,615✔
UNCOV
1327
    sSError(pSender, "snapshot sender stopped. sender startTime:%" PRId64 ", msg startTime:%" PRId64,
×
1328
            pSender->senderStartTime, pMsg->startTime);
UNCOV
1329
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
×
1330
  }
1331

1332
  // check signature
1333
  int32_t order = 0;
299,615✔
1334
  if ((order = snapshotSenderSignatureCmp(pSender, pMsg)) > 0) {
299,615✔
UNCOV
1335
    sError("failed to check snapshot rsp signature, ignore a stale snap rsp.");
×
UNCOV
1336
    TAOS_RETURN(TSDB_CODE_SYN_MISMATCHED_SIGNATURE);
×
1337
  } else if (order < 0) {
299,615✔
UNCOV
1338
    sError("failed to check snapshot rsp signature, snapshot sender is stale. stop");
×
UNCOV
1339
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
×
1340
    goto _ERROR;
×
1341
  }
1342

1343
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
299,615✔
UNCOV
1344
    sSError(pSender, "snapshot sender not leader");
×
1345
    code = TSDB_CODE_SYN_NOT_LEADER;
×
UNCOV
1346
    goto _ERROR;
×
1347
  }
1348

1349
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
299,615✔
1350
  if (pMsg->term != currentTerm) {
299,615✔
UNCOV
1351
    sSError(pSender, "snapshot sender term mismatch, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
×
1352
            currentTerm);
1353
    code = TSDB_CODE_SYN_TERM_NOT_MATCH;
×
1354
    goto _ERROR;
×
1355
  }
1356

1357
  if (pMsg->code != 0) {
299,615✔
1358
    sSError(pSender, "snapshot sender receive error:%s 0x%x and stop sender", tstrerror(pMsg->code), pMsg->code);
×
UNCOV
1359
    code = pMsg->code;
×
UNCOV
1360
    goto _ERROR;
×
1361
  }
1362

1363
  // send begin
1364
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP) {
299,615✔
1365
    sSInfo(pSender, "snapshot replication progress:3/8:leader:2/4, process prepare rsp, msg:%s, snap ack:%d, ",
19,748✔
1366
           TMSG_INFO(pRpcMsg->msgType), pMsg->ack);
1367
    if ((code = syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg)) != 0) {
19,748✔
UNCOV
1368
      goto _ERROR;
×
1369
    }
1370
  }
1371

1372
  // send msg of data or end
1373
  if (pMsg->ack >= SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->ack < SYNC_SNAPSHOT_SEQ_END) {
299,615✔
1374
    int64_t currentTimestamp = taosGetTimestampMs()/1000;
260,421✔
1375
    if (currentTimestamp > lastSendPrintLog) {
260,421✔
1376
      sSInfo(pSender, "snapshot replication progress:5/8:leader:3/4, send buffer, msg:%s, snap ack:%d",
22,041✔
1377
             TMSG_INFO(pRpcMsg->msgType), pMsg->ack);
1378
    } else {
1379
      sSDebug(pSender, "snapshot replication progress:5/8:leader:3/4, send buffer, msg:%s, snap ack:%d",
238,380✔
1380
              TMSG_INFO(pRpcMsg->msgType), pMsg->ack);
1381
    }
1382
    lastSendPrintLog = currentTimestamp;
260,421✔
1383
    if ((code = syncSnapBufferSend(pSender, ppMsg)) != 0) {
260,421✔
UNCOV
1384
      sSError(pSender, "failed to replicate snap since %s. seq:%d, pReader:%p, finish:%d", tstrerror(code),
×
1385
              pSender->seq, pSender->pReader, pSender->finish);
1386
      goto _ERROR;
×
1387
    }
1388
  }
1389

1390
  // end
1391
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
299,615✔
1392
    sSInfo(pSender, "snapshot replication progress:8/8:leader:4/4, process end rsp");
19,446✔
1393
    snapshotSenderStop(pSender, true);
19,446✔
1394
    TAOS_CHECK_GOTO(syncNodeReplicateReset(pSyncNode, &pMsg->srcId), NULL, _ERROR);
19,446✔
1395
  }
1396

1397
  return 0;
299,615✔
1398

UNCOV
1399
_ERROR:
×
UNCOV
1400
  snapshotSenderStop(pSender, false);
×
UNCOV
1401
  if (syncNodeReplicateReset(pSyncNode, &pMsg->srcId) != 0) sError("failed to reset replicate");
×
1402
  TAOS_RETURN(code);
×
1403
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc