• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.01
/source/libs/sync/src/syncSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "syncSnapshot.h"
18
#include "syncIndexMgr.h"
19
#include "syncPipeline.h"
20
#include "syncRaftCfg.h"
21
#include "syncRaftLog.h"
22
#include "syncRaftStore.h"
23
#include "syncReplication.h"
24
#include "syncUtil.h"
25
#include "tglobal.h"
26

27
static SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths);
28

29
/*
 * Empty a snapshot ring buffer.
 *
 * Every slot in the live window [start, end) is handed to entryDeleteCb (when
 * one is installed) and then cleared. The window is then rewound so that
 * start == end == SYNC_SNAPSHOT_SEQ_BEGIN + 1 and cursor sits one before start.
 */
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
  int64_t seq = pBuf->start;
  while (seq < pBuf->end) {
    if (pBuf->entryDeleteCb != NULL) {
      pBuf->entryDeleteCb(pBuf->entries[seq % pBuf->size]);
    }
    pBuf->entries[seq % pBuf->size] = NULL;
    seq++;
  }

  int64_t first = SYNC_SNAPSHOT_SEQ_BEGIN + 1;
  pBuf->start = first;
  pBuf->end = first;
  pBuf->cursor = first - 1;
}
466,020✔
40

41
/*
 * Tear down a snapshot buffer created by syncSnapBufferCreate.
 *
 * Drops all buffered entries, destroys the buffer mutex, frees the buffer and
 * clears the caller's handle. A NULL handle, or a handle pointing at NULL, is
 * ignored.
 */
static void syncSnapBufferDestroy(SSyncSnapBuffer **ppBuf) {
  SSyncSnapBuffer *pBuf = (ppBuf != NULL) ? *ppBuf : NULL;
  if (pBuf == NULL) {
    return;
  }

  syncSnapBufferReset(pBuf);
  (void)taosThreadMutexDestroy(&pBuf->mutex);

  taosMemoryFree(pBuf);
  *ppBuf = NULL;
}
52

53
/*
 * Allocate a zero-initialized snapshot ring buffer and initialize its mutex.
 *
 * On success *ppBuf receives the new buffer and 0 is returned. On any failure
 * *ppBuf is NULL, nothing is leaked, and an error code is returned:
 *   - allocation failure: terrno
 *   - slot count derived from `entries` differs from TSDB_SYNC_SNAP_BUFFER_SIZE:
 *     TSDB_CODE_SYN_INTERNAL_ERROR
 *   - mutex initialization failure: the code from taosThreadMutexInit
 *
 * The sequence window (start/end/cursor) is left zeroed; the callers in this
 * file call syncSnapBufferReset() right after creation.
 */
static int32_t syncSnapBufferCreate(SSyncSnapBuffer **ppBuf) {
  *ppBuf = NULL;

  SSyncSnapBuffer *pBuf = taosMemoryCalloc(1, sizeof(SSyncSnapBuffer));
  if (pBuf == NULL) {
    TAOS_RETURN(terrno);
  }

  // Sanity check: the ring capacity comes from the fixed entries array and must
  // agree with the configured buffer size. Free the buffer instead of leaking it.
  pBuf->size = sizeof(pBuf->entries) / sizeof(void *);
  if (pBuf->size != TSDB_SYNC_SNAP_BUFFER_SIZE) {
    taosMemoryFree(pBuf);
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  // Same check that snapshotReceiverCreate applies to its writer mutex.
  int32_t code = taosThreadMutexInit(&pBuf->mutex, NULL);
  if (code != 0) {
    taosMemoryFree(pBuf);
    TAOS_RETURN(code);
  }

  *ppBuf = pBuf;
  TAOS_RETURN(0);
}
65

66
/*
 * Create the snapshot sender that streams this node's snapshot to the replica
 * at replicaIndex.
 *
 * The FSM must provide FpSnapshotStartRead, FpSnapshotStopRead and
 * FpSnapshotDoRead; otherwise TSDB_CODE_SYN_INTERNAL_ERROR is returned. The new
 * sender is not started and has an empty send buffer whose entries are released
 * with syncSnapBlockDestroy. On failure *ppSender stays NULL.
 */
int32_t snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex, SSyncSnapshotSender **ppSender) {
  *ppSender = NULL;

  if (pSyncNode->pFsm->FpSnapshotStartRead == NULL || pSyncNode->pFsm->FpSnapshotStopRead == NULL ||
      pSyncNode->pFsm->FpSnapshotDoRead == NULL) {
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  SSyncSnapshotSender *pSender = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender));
  if (pSender == NULL) {
    TAOS_RETURN(terrno);
  }

  // identity
  pSender->pSyncNode = pSyncNode;
  pSender->replicaIndex = replicaIndex;

  // idle session state
  pSender->start = false;
  pSender->finish = false;
  pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->pReader = NULL;
  pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pSender->term = raftStoreGetTerm(pSyncNode);
  pSender->startTime = -1;

  int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &pSender->snapshot);
  if (code != 0) {
    taosMemoryFreeClear(pSender);
    TAOS_RETURN(code);
  }

  SSyncSnapBuffer *pSndBuf = NULL;
  code = syncSnapBufferCreate(&pSndBuf);
  if (pSndBuf == NULL) {
    taosMemoryFreeClear(pSender);
    TAOS_RETURN(code);
  }

  pSndBuf->entryDeleteCb = syncSnapBlockDestroy;
  pSender->pSndBuf = pSndBuf;
  syncSnapBufferReset(pSndBuf);

  *ppSender = pSender;
  TAOS_RETURN(code);
}
109

110
void syncSnapBlockDestroy(void *ptr) {
1,437✔
111
  SyncSnapBlock *pBlk = ptr;
1,437✔
112
  if (pBlk->pBlock != NULL) {
1,437✔
113
    taosMemoryFree(pBlk->pBlock);
1,343!
114
    pBlk->pBlock = NULL;
1,343✔
115
    pBlk->blockLen = 0;
1,343✔
116
  }
117
  taosMemoryFree(pBlk);
1,437!
118
}
1,437✔
119

120
/*
 * Release the heap payloads hanging off the sender's snapshot parameters and
 * snapshot info, leaving both data pointers NULL. Always returns 0.
 */
static int32_t snapshotSenderClearInfoData(SSyncSnapshotSender *pSender) {
  void *pParamData = pSender->snapshotParam.data;
  pSender->snapshotParam.data = NULL;
  if (pParamData != NULL) {
    taosMemoryFree(pParamData);
  }

  void *pInfoData = pSender->snapshot.data;
  pSender->snapshot.data = NULL;
  if (pInfoData != NULL) {
    taosMemoryFree(pInfoData);
  }

  return 0;
}
132

133
/*
 * Destroy a snapshot sender.
 *
 * The following are released, in order:
 *   1. the FSM reader, if a read is still open;
 *   2. the send buffer, including any buffered blocks;
 *   3. the snapshot info payloads;
 *   4. the sender itself.
 * NULL is ignored.
 */
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender == NULL) {
    return;
  }

  if (pSender->pReader) {
    SSyncNode *pNode = pSender->pSyncNode;
    pNode->pFsm->FpSnapshotStopRead(pNode->pFsm, pSender->pReader);
    pSender->pReader = NULL;
  }

  if (pSender->pSndBuf) {
    syncSnapBufferDestroy(&pSender->pSndBuf);
  }

  (void)snapshotSenderClearInfoData(pSender);
  taosMemoryFree(pSender);
}
152

153
/* Report whether the sender's start flag is currently set (atomic read). */
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  int8_t started = atomic_load_8(&pSender->start);
  return started;
}
218,169✔
154

155
/*
 * Begin a snapshot session on the sender and send the PREP message.
 *
 * The start flag is flipped atomically from false to true. If it was already
 * set, the call is a no-op and returns 0. Otherwise:
 *   - the per-session state is reset;
 *   - the FSM is asked for TDMT_SYNC_PREP_SNAPSHOT info;
 *   - that TLV payload, if the FSM produced one, is sent with
 *     seq = SYNC_SNAPSHOT_SEQ_PREP.
 *
 * Returns 0 on success, or the error from the FSM or from the message send.
 * The start flag is not rolled back on failure.
 */
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
  int32_t code = 0;

  int8_t started = atomic_val_compare_exchange_8(&pSender->start, false, true);
  if (started) return 0;

  SSyncNode *pSyncNode = pSender->pSyncNode;

  // reset per-session progress
  pSender->seq = SYNC_SNAPSHOT_SEQ_PREP;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->pReader = NULL;
  pSender->snapshotParam.start = SYNC_INDEX_INVALID;
  pSender->snapshotParam.end = SYNC_INDEX_INVALID;

  // The sender owns snapshot.data (see snapshotSenderClearInfoData). Free any
  // payload still attached, possibly one filled in by FpGetSnapshotInfo in
  // snapshotSenderCreate, instead of overwriting the pointer and leaking it.
  if (pSender->snapshot.data != NULL) {
    taosMemoryFree(pSender->snapshot.data);
  }
  pSender->snapshot.data = NULL;
  pSender->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pSender->snapshot.lastApplyTerm = SYNC_TERM_INVALID;
  pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  (void)memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig));
  pSender->sendingMS = 0;
  pSender->term = raftStoreGetTerm(pSyncNode);
  pSender->startTime = taosGetMonoTimestampMs();
  pSender->lastSendTime = taosGetTimestampMs();
  pSender->finish = false;

  // Ask the FSM for its PREP snapshot info (an optional SSyncTLV payload).
  SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT};
  void     *pData = NULL;
  int32_t   type = 0;
  int32_t   dataLen = 0;

  if ((code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo)) != 0) {
    sSError(pSender, "snapshot get info failure since %s", tstrerror(code));
    goto _out;
  }

  pData = snapInfo.data;
  if (pData != NULL) {
    SSyncTLV *datHead = pData;
    if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT) {
      sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
      code = TSDB_CODE_INVALID_DATA_FMT;
      goto _out;
    }
    type = snapInfo.type;
    dataLen = sizeof(SSyncTLV) + datHead->len;
  }

  sInfo("vgId:%d, send msg:%s", pSyncNode->vgId, TMSG_INFO(type));
  if ((code = syncSnapSendMsg(pSender, pSender->seq, pData, dataLen, type)) != 0) {
    goto _out;
  }

  SRaftId destId = pSyncNode->replicasId[pSender->replicaIndex];
  sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&destId));

_out:
  if (snapInfo.data) {
    taosMemoryFree(snapInfo.data);
    snapInfo.data = NULL;
  }
  TAOS_RETURN(code);
}
214

215
/*
 * End the sender's snapshot session.
 *
 * Only the caller that flips the start flag from true to false performs the
 * teardown; every other caller returns right away. The teardown holds the
 * send-buffer mutex and does the following:
 *   - records `finish`;
 *   - closes the FSM reader;
 *   - drops buffered blocks;
 *   - releases the snapshot info payloads.
 */
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
  sSDebug(pSender, "snapshot sender stop, finish:%d reader:%p", finish, pSender->pReader);

  int8_t wasStarted = atomic_val_compare_exchange_8(&pSender->start, true, false);
  if (!wasStarted) {
    return;
  }

  SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
  (void)taosThreadMutexLock(&pSndBuf->mutex);

  pSender->finish = finish;

  if (pSender->pReader != NULL) {
    SSyncNode *pNode = pSender->pSyncNode;
    pNode->pFsm->FpSnapshotStopRead(pNode->pFsm, pSender->pReader);
    pSender->pReader = NULL;
  }

  syncSnapBufferReset(pSndBuf);
  (void)snapshotSenderClearInfoData(pSender);

  SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);

  (void)taosThreadMutexUnlock(&pSndBuf->mutex);
}
240

241
/*
 * Build one SyncSnapshotSend message and send it to the sender's target replica.
 *
 * The message carries the sender's term, start time, snapshot position and
 * last config, plus an optional payload.
 *
 *   seq      sequence number stamped on the message
 *   pBlock   payload copied into the message; skipped when NULL or blockLen <= 0
 *   blockLen payload size in bytes; also passed to syncBuildSnapshotSend
 *   typ      value stored in payloadType
 *
 * Returns 0 or the error from building or sending the message.
 */
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) {
  SSyncNode *pNode = pSender->pSyncNode;
  SRpcMsg    rpcMsg = {0};

  int32_t code = syncBuildSnapshotSend(&rpcMsg, blockLen, pNode->vgId);
  if (code != 0) {
    sSError(pSender, "failed to build snap replication msg since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  SyncSnapshotSend *pMsg = rpcMsg.pCont;
  pMsg->srcId = pNode->myRaftId;
  pMsg->destId = pNode->replicasId[pSender->replicaIndex];
  pMsg->term = pSender->term;
  pMsg->startTime = pSender->startTime;
  pMsg->beginIndex = pSender->snapshotParam.start;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
  pMsg->seq = seq;
  pMsg->payloadType = typ;

  bool hasPayload = (pBlock != NULL) && (blockLen > 0);
  if (hasPayload) {
    (void)memcpy(pMsg->data, pBlock, blockLen);
  }

  code = syncNodeSendMsgById(&pMsg->destId, pNode, &rpcMsg);
  if (code != 0) {
    sSError(pSender, "failed to send snap replication msg since %s. seq:%d", tstrerror(code), seq);
  }
  TAOS_RETURN(code);
}
276

277
// when sender receive ack, call this function to send msg from seq
278
// seq = ack + 1, already updated
279
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
1,625✔
280
  int32_t        code = 0;
1,625✔
281
  SyncSnapBlock *pBlk = NULL;
1,625✔
282

283
  if (pSender->seq < SYNC_SNAPSHOT_SEQ_END) {
1,625✔
284
    pSender->seq++;
1,531✔
285

286
    if (pSender->seq > SYNC_SNAPSHOT_SEQ_BEGIN) {
1,531✔
287
      pBlk = taosMemoryCalloc(1, sizeof(SyncSnapBlock));
1,437!
288
      if (pBlk == NULL) {
1,437!
289
        code = terrno;
×
290
        goto _OUT;
×
291
      }
292

293
      pBlk->seq = pSender->seq;
1,437✔
294

295
      // read data
296
      code = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, &pBlk->pBlock,
1,437✔
297
                                                        &pBlk->blockLen);
298
      if (code != 0) {
1,437!
299
        sSError(pSender, "snapshot sender read failed since %s", tstrerror(code));
×
300
        goto _OUT;
×
301
      }
302

303
      if (pBlk->blockLen > 0) {
1,437✔
304
        // has read data
305
        sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pBlk->blockLen, pBlk->seq);
1,343!
306
      } else {
307
        // read finish, update seq to end
308
        pSender->seq = SYNC_SNAPSHOT_SEQ_END;
94✔
309
        sSInfo(pSender, "snapshot sender read to the end");
94!
310
        goto _OUT;
94✔
311
      }
312
    }
313
  }
314

315
  if (!(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END)) {
1,531!
316
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
317
    goto _OUT;
×
318
  }
319

320
  // send msg
321
  int32_t blockLen = (pBlk) ? pBlk->blockLen : 0;
1,531✔
322
  void   *pBlock = (pBlk) ? pBlk->pBlock : NULL;
1,531✔
323
  if ((code = syncSnapSendMsg(pSender, pSender->seq, pBlock, blockLen, 0)) != 0) {
1,531!
324
    goto _OUT;
×
325
  }
326

327
  // put in buffer
328
  int64_t nowMs = taosGetTimestampMs();
1,531✔
329
  if (pBlk) {
1,531✔
330
    if (!(pBlk->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pBlk->seq < SYNC_SNAPSHOT_SEQ_END)) {
1,343!
331
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
332
      goto _OUT;
×
333
    }
334
    pBlk->sendTimeMs = nowMs;
1,343✔
335
    pSender->pSndBuf->entries[pSender->seq % pSender->pSndBuf->size] = pBlk;
1,343✔
336
    pBlk = NULL;
1,343✔
337
    pSender->pSndBuf->end = TMAX(pSender->seq + 1, pSender->pSndBuf->end);
1,343✔
338
  }
339
  pSender->lastSendTime = nowMs;
1,531✔
340

341
_OUT:;
1,625✔
342
  if (pBlk != NULL) {
1,625✔
343
    syncSnapBlockDestroy(pBlk);
94✔
344
    pBlk = NULL;
94✔
345
  }
346
  TAOS_RETURN(code);
1,625✔
347
}
348

349
// send snapshot data from cache
350
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
×
351
  SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
×
352
  int32_t          code = 0;
×
353
  (void)taosThreadMutexLock(&pSndBuf->mutex);
×
354
  if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) {
×
355
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
356
    goto _out;
×
357
  }
358

359
  for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) {
×
360
    SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size];
×
361
    if (!pBlk) {
×
362
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
363
      goto _out;
×
364
    }
365
    int64_t nowMs = taosGetTimestampMs();
×
366
    if (pBlk->acked || nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) {
×
367
      continue;
×
368
    }
369
    if ((code = syncSnapSendMsg(pSender, pBlk->seq, pBlk->pBlock, pBlk->blockLen, 0)) != 0) {
×
370
      goto _out;
×
371
    }
372
    pBlk->sendTimeMs = nowMs;
×
373
  }
374

375
  if (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
×
376
    if ((code = snapshotSend(pSender)) != 0) {
×
377
      goto _out;
×
378
    }
379
  }
380

381
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
×
382
    if ((code = syncSnapSendMsg(pSender, pSender->seq, NULL, 0, 0)) != 0) {
×
383
      goto _out;
×
384
    }
385
  }
386
_out:;
×
387
  (void)taosThreadMutexUnlock(&pSndBuf->mutex);
×
388
  TAOS_RETURN(code);
×
389
}
390

391
/*
 * Kick off a snapshot transfer to pDestId.
 *
 * Looks up the sender for that replica and fails with
 * TSDB_CODE_SYN_INTERNAL_ERROR if there is none. A sender that is already
 * running is left alone (returns 0). Otherwise snapshotSenderStart() is called
 * and its error, if any, is returned.
 */
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
  if (pSender == NULL) {
    sNError(pSyncNode, "snapshot sender start error since get failed");
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  if (!snapshotSenderIsStart(pSender)) {
    // NOTE(review): the purpose of this 1 ms pause is not evident from this file.
    taosMsleep(1);

    int32_t code = snapshotSenderStart(pSender);
    if (code != 0) {
      sSError(pSender, "snapshot sender start error since %s", tstrerror(code));
      TAOS_RETURN(code);
    }
    return 0;
  }

  sSDebug(pSender, "snapshot sender already start, ignore");
  return 0;
}
413

414
// receiver
415
/*
 * Create a snapshot receiver for data sent by fromId.
 *
 * The FSM must provide FpSnapshotStartWrite, FpSnapshotStopWrite and
 * FpSnapshotDoWrite; otherwise TSDB_CODE_SYN_INTERNAL_ERROR is returned. The
 * new receiver:
 *   - is not started, with ack = SYNC_SNAPSHOT_SEQ_BEGIN;
 *   - has an initialized writer mutex;
 *   - has an empty receive buffer whose entries are released with rpcFreeCont.
 * On failure *ppReceiver stays NULL.
 */
int32_t snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId, SSyncSnapshotReceiver **ppReceiver) {
  *ppReceiver = NULL;

  if (pSyncNode->pFsm->FpSnapshotStartWrite == NULL || pSyncNode->pFsm->FpSnapshotStopWrite == NULL ||
      pSyncNode->pFsm->FpSnapshotDoWrite == NULL) {
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  SSyncSnapshotReceiver *pReceiver = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver));
  if (pReceiver == NULL) {
    TAOS_RETURN(terrno);
  }

  pReceiver->start = false;
  pReceiver->startTime = 0;
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pReceiver->pWriter = NULL;

  int32_t code = taosThreadMutexInit(&pReceiver->writerMutex, NULL);
  if (code != 0) {
    taosMemoryFree(pReceiver);
    TAOS_RETURN(code);
  }

  pReceiver->pSyncNode = pSyncNode;
  pReceiver->fromId = fromId;
  pReceiver->term = raftStoreGetTerm(pSyncNode);

  // no snapshot received yet
  pReceiver->snapshot.data = NULL;
  pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pReceiver->snapshot.lastApplyTerm = 0;
  pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  SSyncSnapBuffer *pRcvBuf = NULL;
  code = syncSnapBufferCreate(&pRcvBuf);
  if (pRcvBuf == NULL) {
    int32_t ret = taosThreadMutexDestroy(&pReceiver->writerMutex);
    if (ret != 0) {
      sError("failed to destroy mutex since %s", tstrerror(ret));
    }
    taosMemoryFree(pReceiver);
    TAOS_RETURN(code);
  }

  pRcvBuf->entryDeleteCb = rpcFreeCont;
  pReceiver->pRcvBuf = pRcvBuf;
  syncSnapBufferReset(pRcvBuf);

  *ppReceiver = pReceiver;
  TAOS_RETURN(code);
}
465

466
/*
 * Release the heap payloads hanging off the receiver's snapshot parameters and
 * snapshot info, leaving both data pointers NULL. Always returns 0.
 */
static int32_t snapshotReceiverClearInfoData(SSyncSnapshotReceiver *pReceiver) {
  void *pParamData = pReceiver->snapshotParam.data;
  pReceiver->snapshotParam.data = NULL;
  if (pParamData != NULL) {
    taosMemoryFree(pParamData);
  }

  void *pInfoData = pReceiver->snapshot.data;
  pReceiver->snapshot.data = NULL;
  if (pInfoData != NULL) {
    taosMemoryFree(pInfoData);
  }

  return 0;
}
478

479
/*
 * Destroy a snapshot receiver. NULL is ignored.
 *
 * Steps:
 *   - Under writerMutex, close any open writer via FpSnapshotStopWrite with
 *     apply = false. A failure is only logged.
 *   - Destroy the writer mutex.
 *   - Free the receive buffer and the snapshot info payloads.
 *   - Free the receiver itself.
 */
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }

  SSyncNode *pNode = pReceiver->pSyncNode;

  (void)taosThreadMutexLock(&pReceiver->writerMutex);
  if (pReceiver->pWriter != NULL) {
    int32_t code = pNode->pFsm->FpSnapshotStopWrite(pNode->pFsm, pReceiver->pWriter, false, &pReceiver->snapshot);
    if (code != 0) {
      sError("vgId:%d, snapshot receiver stop failed while destroy since %s", pNode->vgId, tstrerror(code));
    }
    pReceiver->pWriter = NULL;
  }
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
  (void)taosThreadMutexDestroy(&pReceiver->writerMutex);

  if (pReceiver->pRcvBuf != NULL) {
    syncSnapBufferDestroy(&pReceiver->pRcvBuf);
  }

  (void)snapshotReceiverClearInfoData(pReceiver);
  taosMemoryFree(pReceiver);
}
507

508
/* Report whether the receiver's start flag is set; a NULL receiver is never started. */
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return false;
  }
  return atomic_load_8(&pReceiver->start);
}
511

512
/*
 * Compare the receiver's session signature (term, startTime) with the one
 * carried by pMsg.
 *
 * Returns:
 *    0  signatures match
 *   -1  the message has a higher term
 *    1  the message has a lower term
 *   -2  the message has a later startTime
 *    2  the message has an earlier startTime
 *
 * A startTime difference overrides a term difference in the returned value.
 * Any mismatch is logged.
 *
 * The result is returned to the caller. Previously the function always returned
 * 0, which made the caller's "newer" (< 0) and "stale" (> 0) branches
 * unreachable.
 */
static int32_t snapshotReceiverSignatureCmp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  int32_t code = 0;
  if (pReceiver->term < pMsg->term) code = -1;
  if (pReceiver->term > pMsg->term) code = 1;
  if (pReceiver->startTime < pMsg->startTime) code = -2;
  if (pReceiver->startTime > pMsg->startTime) code = 2;
  if (code != 0) {
    sRError(pReceiver, "receiver signature failed, result:%d, msg signature:(%" PRId64 ", %" PRId64 ")", code,
            pMsg->term, pMsg->startTime);
  }
  return code;
}
523

524
/*
 * Open the FSM snapshot writer for the session described by pBeginMsg.
 *
 * Fails with TSDB_CODE_SYN_INTERNAL_ERROR if a writer is already open.
 * Otherwise it:
 *   - resets ack to SYNC_SNAPSHOT_SEQ_BEGIN;
 *   - copies the snapshot position from the message;
 *   - sets snapshotParam to [beginIndex, lastIndex];
 *   - calls FpSnapshotStartWrite and returns its error, if any.
 */
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
  SSyncNode *pNode = pReceiver->pSyncNode;

  if (pReceiver->pWriter != NULL) {
    sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null", pNode->vgId);
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;

  pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
  pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
  pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;

  SSnapshotParam *pParam = &pReceiver->snapshotParam;
  pParam->start = pBeginMsg->beginIndex;
  pParam->end = pBeginMsg->lastIndex;

  int32_t code = pNode->pFsm->FpSnapshotStartWrite(pNode->pFsm, pParam, &pReceiver->pWriter);
  if (code != 0) {
    sRError(pReceiver, "snapshot receiver start write failed since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  sRInfo(pReceiver, "snapshot receiver start write");
  return 0;
}
552

553
/*
 * Start a receive session for the PREP message pPreMsg.
 *
 * A receiver that is already started is left untouched; this is logged. If
 * another caller flips the start flag first, this call does nothing. Otherwise
 * the session adopts the message's term, source and start time. It also sets
 * snapshotParam.start to the node's snapshot begin index and leaves
 * snapshotParam.end open (-1).
 */
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
  if (snapshotReceiverIsStart(pReceiver)) {
    sRInfo(pReceiver, "snapshot receiver has started");
    return;
  }

  int8_t prev = atomic_val_compare_exchange_8(&pReceiver->start, false, true);
  if (prev != 0) {
    return;
  }

  pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP;
  pReceiver->term = pPreMsg->term;
  pReceiver->fromId = pPreMsg->srcId;
  pReceiver->startTime = pPreMsg->startTime;

  SSnapshotParam *pParam = &pReceiver->snapshotParam;
  pParam->start = syncNodeGetSnapBeginIndex(pReceiver->pSyncNode);
  pParam->end = -1;

  sRInfo(pReceiver, "snapshot receiver start, from dnode:%d.", DID(&pReceiver->fromId));
}
572

573
/*
 * Abort the receiver's session without applying received data.
 *
 * Only the caller that flips the start flag from true to false does the work.
 * It then:
 *   - closes the writer under writerMutex (FpSnapshotStopWrite, apply = false);
 *   - under the receive-buffer mutex, drops buffered messages and frees the
 *     snapshot info payloads.
 */
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
  sRDebug(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);

  int8_t wasStarted = atomic_val_compare_exchange_8(&pReceiver->start, true, false);
  if (!wasStarted) {
    return;
  }

  SSyncNode *pNode = pReceiver->pSyncNode;

  (void)taosThreadMutexLock(&pReceiver->writerMutex);
  if (pReceiver->pWriter == NULL) {
    sRInfo(pReceiver, "snapshot receiver stop, writer is null");
  } else {
    int32_t code = pNode->pFsm->FpSnapshotStopWrite(pNode->pFsm, pReceiver->pWriter, false, &pReceiver->snapshot);
    if (code != 0) {
      sRError(pReceiver, "snapshot receiver stop write failed since %s", tstrerror(code));
    }
    pReceiver->pWriter = NULL;
  }
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);

  SSyncSnapBuffer *pRcvBuf = pReceiver->pRcvBuf;
  (void)taosThreadMutexLock(&pRcvBuf->mutex);
  syncSnapBufferReset(pRcvBuf);
  (void)snapshotReceiverClearInfoData(pReceiver);
  (void)taosThreadMutexUnlock(&pRcvBuf->mutex);
}
602

603
/*
 * Complete a receive session on the END message pMsg.
 *
 * With an open writer, the steps are:
 *   1. write any payload the message carries;
 *   2. raise commitIndex and the raft term to the received snapshot if they
 *      are behind;
 *   3. close the writer with apply = true;
 *   4. set ack to SYNC_SNAPSHOT_SEQ_END;
 *   5. refresh fsmState from FpGetSnapshotInfo;
 *   6. restore the log store from pMsg->lastIndex.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR when no writer is open, otherwise 0 or
 * the first failing callback's code. writerMutex is always released before
 * returning.
 */
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  int32_t    code = 0;
  SSyncNode *pSyncNode = pReceiver->pSyncNode;

  if (pReceiver->pWriter == NULL) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sRError(pReceiver, "snapshot receiver finish error since writer is null");
    TAOS_RETURN(code);
  }

  // write the payload carried by the END message, if any
  sRInfo(pReceiver, "snapshot receiver write about to finish, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
  if (pMsg->dataLen > 0) {
    (void)taosThreadMutexLock(&pReceiver->writerMutex);
    code = pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
    (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
    if (code != 0) {
      sRError(pReceiver, "failed to finish snapshot receiver write since %s", tstrerror(code));
      TAOS_RETURN(code);
    }
  }

  // update commit index
  if (pReceiver->snapshot.lastApplyIndex > pSyncNode->commitIndex) {
    pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
  }

  // maybe update term
  if (pReceiver->snapshot.lastApplyTerm > raftStoreGetTerm(pSyncNode)) {
    raftStoreSetTerm(pSyncNode, pReceiver->snapshot.lastApplyTerm);
  }

  // stop writer, apply data
  (void)taosThreadMutexLock(&pReceiver->writerMutex);
  if (pReceiver->pWriter != NULL) {
    code = pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true, &pReceiver->snapshot);
    if (code != 0) {
      // Release writerMutex on this path too. Returning with it held would leave
      // it locked, so later lock attempts (stop, destroy, data writes) would block.
      (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
      sRError(pReceiver, "snapshot receiver apply failed since %s", tstrerror(code));
      TAOS_RETURN(code);
    }
    pReceiver->pWriter = NULL;
    sRInfo(pReceiver, "snapshot receiver write stopped");
  }
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);

  // update progress
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;

  // get fsmState
  SSnapshot snapshot = {0};
  code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  if (code != 0) {
    sRError(pReceiver, "snapshot receiver get snapshot info failed since %s", tstrerror(code));
    TAOS_RETURN(code);
  }
  pSyncNode->fsmState = snapshot.state;

  // reset wal
  code = pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pSyncNode->pLogStore, pMsg->lastIndex);
  if (code != 0) {
    sRError(pReceiver, "failed to snapshot receiver log restore since %s", tstrerror(code));
    TAOS_RETURN(code);
  }
  sRInfo(pReceiver, "wal log restored from snapshot");

  return 0;
}
671

672
/*
 * Apply one in-order data block to the open snapshot writer.
 *
 * Error cases:
 *   - pMsg->seq is not exactly ack + 1: TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG;
 *   - no writer is open: TSDB_CODE_SYN_INTERNAL_ERROR;
 *   - FpSnapshotDoWrite fails: its error code.
 *
 * The write is done under writerMutex. On success ack advances to pMsg->seq.
 */
static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  if (pReceiver->ack + 1 != pMsg->seq) {
    sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq);
    TAOS_RETURN(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG);
  }

  SSyncNode *pNode = pReceiver->pSyncNode;
  int32_t    code = 0;

  (void)taosThreadMutexLock(&pReceiver->writerMutex);
  if (pReceiver->pWriter == NULL) {
    (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
    sRError(pReceiver, "snapshot receiver failed to write data since writer is null");
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  sRDebug(pReceiver, "snapshot receiver continue to write, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
  if (pMsg->dataLen > 0) {
    code = pNode->pFsm->FpSnapshotDoWrite(pNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
  }
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);

  if (code != 0) {
    sRError(pReceiver, "snapshot receiver continue write failed since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  pReceiver->ack = pMsg->seq;
  sRDebug(pReceiver, "snapshot receiver continue to write finish");
  return 0;
}
708

709
/*
 * Return the log index at which an incoming snapshot begins on this node.
 *
 * For an mnode this is SYNC_INDEX_BEGIN. For other nodes it is one past the
 * larger of the node's commitIndex and the WAL's committed version. The
 * chosen value is logged.
 */
SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
  if (syncNodeIsMnode(ths)) {
    SyncIndex snapStart = SYNC_INDEX_BEGIN;
    sNInfo(ths, "snapshot begin index is %" PRId64 " since its mnode", snapStart);
    return snapStart;
  }

  SSyncLogStoreData *pLogData = ths->pLogStore->data;
  int64_t            walCommitVer = walGetCommittedVer(pLogData->pWal);
  SyncIndex          snapStart = TMAX(ths->commitIndex, walCommitVer) + 1;

  sNInfo(ths, "snapshot begin index is %" PRId64, snapStart);
  return snapStart;
}
727

728
static int32_t syncSnapReceiverExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotReceiver *pReceiver,
94✔
729
                                             SyncSnapshotSend *pMsg, SSnapshot *pInfo) {
730
  if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT) return TSDB_CODE_SYN_INTERNAL_ERROR;
94!
731
  int32_t code = 0, lino = 0;
94✔
732

733
  // copy snap info from leader
734
  void *data = taosMemoryCalloc(1, pMsg->dataLen);
94!
735
  if (data == NULL) {
94!
736
    TAOS_CHECK_EXIT(terrno);
×
737
  }
738
  pInfo->data = data;
94✔
739
  data = NULL;
94✔
740
  (void)memcpy(pInfo->data, pMsg->data, pMsg->dataLen);
94✔
741

742
  // exchange snap info
743
  if ((code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pInfo)) != 0) {
94!
744
    sRError(pReceiver, "failed to get snapshot info. type: %d", pMsg->payloadType);
×
745
    goto _exit;
×
746
  }
747
  SSyncTLV *datHead = pInfo->data;
94✔
748
  if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
94!
749
    sRError(pReceiver, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
×
750
    code = TSDB_CODE_INVALID_DATA_FMT;
×
751
    goto _exit;
×
752
  }
753
  int32_t dataLen = sizeof(SSyncTLV) + datHead->len;
94✔
754

755
  // save exchanged snap info
756
  SSnapshotParam *pParam = &pReceiver->snapshotParam;
94✔
757
  data = taosMemoryRealloc(pParam->data, dataLen);
94!
758
  if (data == NULL) {
94!
759
    code = terrno;
×
760
    sError("vgId:%d, failed to realloc memory for snapshot prep due to %s. dataLen:%d", pSyncNode->vgId,
×
761
           tstrerror(code), dataLen);
762
    goto _exit;
×
763
  }
764
  pParam->data = data;
94✔
765
  data = NULL;
94✔
766
  (void)memcpy(pParam->data, pInfo->data, dataLen);
94✔
767

768
_exit:
94✔
769
  TAOS_RETURN(code);
94✔
770
}
771

772
/*
 * Handle a PREP message (seq == SYNC_SNAPSHOT_SEQ_PREP) on the follower.
 *
 * The message signature is compared with the running receiver:
 *   - receiver not started, or a newer preparation: (re)start the receiver;
 *   - duplicate of the current preparation: reply again without restarting;
 *   - stale preparation: reply with TSDB_CODE_SYN_MISMATCHED_SIGNATURE. The snapshot
 *     info is NOT exchanged in this case, so the running receiver's snapshotParam
 *     is not overwritten and the error is not replaced by the exchange result.
 * For an accepted TDMT_SYNC_PREP_SNAPSHOT payload, the snapshot info is exchanged with
 * the FSM and returned to the leader in the reply.
 *
 * Returns the error from exchanging snapshot info, otherwise the result of sending
 * the reply.
 */
static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                code = 0;
  int32_t                rspCode = 0;  // status reported to the leader, kept apart from local errors
  bool                   needStart = true;

  if (snapshotReceiverIsStart(pReceiver)) {
    // already start
    int32_t order = snapshotReceiverSignatureCmp(pReceiver, pMsg);
    if (order < 0) {
      sInfo("failed to prepare snapshot, received a new snapshot preparation. restart receiver.");
    } else if (order == 0) {
      sInfo("prepare snapshot, received a duplicate snapshot preparation. send reply.");
      needStart = false;
    } else {
      // ignore
      sError("failed to prepare snapshot, received a stale snapshot preparation. ignore.");
      rspCode = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
      needStart = false;
    }
  } else {
    // start new
    sRInfo(pReceiver, "snapshot receiver not start yet so start new one");
  }

  if (needStart) {
    if (snapshotReceiverIsStart(pReceiver)) {
      sRInfo(pReceiver, "snapshot receiver already start and force stop pre one");
      snapshotReceiverStop(pReceiver);
    }
    snapshotReceiverStart(pReceiver, pMsg);
  }

  SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT_REPLY};
  int32_t   dataLen = 0;
  if (rspCode == 0 && pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT) {
    if ((code = syncSnapReceiverExchgSnapInfo(pSyncNode, pReceiver, pMsg, &snapInfo)) != 0) {
      goto _out;
    }
    SSyncTLV *datHead = snapInfo.data;
    dataLen = sizeof(SSyncTLV) + datHead->len;
  }

  // send response (carries rspCode, so a stale preparation is rejected explicitly)
  int32_t type = (snapInfo.data) ? snapInfo.type : 0;
  code = syncSnapSendRsp(pReceiver, pMsg, snapInfo.data, dataLen, type, rspCode);

_out:
  if (snapInfo.data) {
    taosMemoryFree(snapInfo.data);
    snapInfo.data = NULL;
  }
  TAOS_RETURN(code);
}
831

832
/*
 * Handle the BEGIN message (seq == SYNC_SNAPSHOT_SEQ_BEGIN) on the follower.
 *
 * Checks that the receiver is started and that the message signature matches, starts
 * the snapshot writer, and verifies that the snapshot begin index recorded in
 * pReceiver->snapshotParam.start still equals syncNodeGetSnapBeginIndex(). Every
 * outcome, success or failure, is reported to the leader in a reply.
 *
 * Returns the error from sending the reply if that fails, otherwise the status that
 * was reported (0 on success).
 */
static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  // condition 1
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                code = TSDB_CODE_SYN_INTERNAL_ERROR;

  if (!snapshotReceiverIsStart(pReceiver)) {
    sRError(pReceiver, "failed to begin snapshot receiver since not started");
    goto _SEND_REPLY;
  }

  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    sError("failed to begin snapshot, since %s", tstrerror(code));
    goto _SEND_REPLY;
  }

  // start writer
  if ((code = snapshotReceiverStartWriter(pReceiver, pMsg)) != 0) {
    sRError(pReceiver, "failed to start snapshot writer since %s", tstrerror(code));
    goto _SEND_REPLY;
  }

  SyncIndex beginIndex = syncNodeGetSnapBeginIndex(pSyncNode);
  if (pReceiver->snapshotParam.start != beginIndex) {
    sRError(pReceiver, "snapshot begin index is changed unexpectedly. sver:%" PRId64 ", beginIndex:%" PRId64,
            pReceiver->snapshotParam.start, beginIndex);
    // code is 0 here (the writer started successfully): set an error explicitly so
    // the leader is not told this BEGIN succeeded
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _SEND_REPLY;
  }

  code = 0;

_SEND_REPLY:
  // send response
  TAOS_CHECK_RETURN(syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code));

  TAOS_RETURN(code);
}
869

870
/*
 * Reply to a snapshot message received by the follower.
 *
 * The SyncSnapshotRsp is addressed to pMsg->srcId. It echoes pMsg's term, lastIndex,
 * lastTerm and startTime, acknowledges pMsg->seq, and carries rspCode and the receiver's
 * snapshotParam.start as the snapshot begin index. When pBlock is non-NULL and blockLen
 * is positive, blockLen bytes of pBlock are appended as payload, tagged with `type`.
 *
 * Returns 0 on success, or the error from building or sending the message.
 */
int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, void *pBlock, int32_t blockLen,
                        int32_t type, int32_t rspCode) {
  SSyncNode *pNode = pReceiver->pSyncNode;
  SRpcMsg    reply = {0};

  int32_t ret = syncBuildSnapshotSendRsp(&reply, blockLen, pNode->vgId);
  if (ret != 0) {
    sRError(pReceiver, "failed to build snapshot receiver resp since %s", tstrerror(ret));
    TAOS_RETURN(ret);
  }

  SyncSnapshotRsp *pOut = reply.pCont;
  pOut->srcId = pNode->myRaftId;
  pOut->destId = pMsg->srcId;
  pOut->term = pMsg->term;
  pOut->lastIndex = pMsg->lastIndex;
  pOut->lastTerm = pMsg->lastTerm;
  pOut->startTime = pMsg->startTime;
  pOut->ack = pMsg->seq;
  pOut->code = rspCode;
  pOut->snapBeginIndex = pReceiver->snapshotParam.start;
  pOut->payloadType = type;

  bool hasPayload = (pBlock != NULL) && (blockLen > 0);
  if (hasPayload) {
    (void)memcpy(pOut->data, pBlock, blockLen);
  }

  ret = syncNodeSendMsgById(&pOut->destId, pNode, &reply);
  if (ret != 0) {
    sRError(pReceiver, "failed to send snapshot receiver resp since %s", tstrerror(ret));
    TAOS_RETURN(ret);
  }
  return 0;
}
904

905
/*
 * Buffer a snapshot data message and process every message that has become contiguous.
 *
 * A message with seq > cursor is stored at slot seq % size, replacing and deleting any
 * previous entry there. The buffer then owns it (ppMsg[0] is set to NULL) and frees it
 * with entryDeleteCb once it has been handed to snapshotReceiverGotData and acked.
 * A message with seq < start is only re-acknowledged.
 *
 * All buffer state is accessed under pRcvBuf->mutex, which is released on every return
 * path.
 *
 * Returns 0, TSDB_CODE_SYN_BUFFER_FULL, TSDB_CODE_SYN_INTERNAL_ERROR, or the error from
 * processing / re-acking a message.
 */
static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend **ppMsg) {
  int32_t           code = 0;
  SSyncSnapBuffer  *pRcvBuf = pReceiver->pRcvBuf;
  SyncSnapshotSend *pMsg = ppMsg[0];

  (void)taosThreadMutexLock(&pRcvBuf->mutex);

  if (pMsg->seq - pRcvBuf->start >= pRcvBuf->size) {
    code = TSDB_CODE_SYN_BUFFER_FULL;
    goto _out;
  }

  // Window invariant: start <= cursor + 1 and cursor < end. Leave through _out so the
  // mutex is released; a bare return here would keep it locked forever.
  if (!(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  if (pMsg->seq > pRcvBuf->cursor) {
    // store (or replace) the entry for this seq and take ownership of the message
    if (pRcvBuf->entries[pMsg->seq % pRcvBuf->size]) {
      pRcvBuf->entryDeleteCb(pRcvBuf->entries[pMsg->seq % pRcvBuf->size]);
    }
    pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg;
    ppMsg[0] = NULL;
    pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
  } else if (pMsg->seq < pRcvBuf->start) {
    // below the window start: only acknowledge it again
    code = syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code);
    goto _out;
  }

  // advance the cursor over the contiguous run of received entries
  for (int64_t seq = pRcvBuf->cursor + 1; seq < pRcvBuf->end; ++seq) {
    if (pRcvBuf->entries[seq % pRcvBuf->size]) {
      pRcvBuf->cursor = seq;
    } else {
      break;
    }
  }

  // process, ack and release every entry up to the cursor, in seq order
  for (int64_t seq = pRcvBuf->start; seq <= pRcvBuf->cursor; ++seq) {
    if ((code = snapshotReceiverGotData(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size])) != 0) {
      if (code >= SYNC_SNAPSHOT_SEQ_INVALID) {
        code = TSDB_CODE_SYN_INTERNAL_ERROR;
      }
    }
    pRcvBuf->start = seq + 1;
    if (syncSnapSendRsp(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size], NULL, 0, 0, code) != 0) {
      sError("failed to send snap rsp");
    }
    pRcvBuf->entryDeleteCb(pRcvBuf->entries[seq % pRcvBuf->size]);
    pRcvBuf->entries[seq % pRcvBuf->size] = NULL;
    if (code) goto _out;
  }

_out:
  (void)taosThreadMutexUnlock(&pRcvBuf->mutex);
  TAOS_RETURN(code);
}
958

959
/*
 * Handle a snapshot data message on the follower (condition 4: transferring).
 *
 * If the message signature does not match the running receiver, the message is
 * rejected with TSDB_CODE_SYN_MISMATCHED_SIGNATURE in the reply. Otherwise it is passed
 * to the receive buffer, which may take ownership of *ppMsg.
 */
static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend **ppMsg) {
  SyncSnapshotSend *pData = ppMsg[0];
  if (pData == NULL) return TSDB_CODE_SYN_INTERNAL_ERROR;

  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  if (snapshotReceiverSignatureCmp(pReceiver, pData) == 0) {
    return syncSnapBufferRecv(pReceiver, ppMsg);
  }

  int32_t mismatch = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
  sError("failed to receive snapshot data, since %s", tstrerror(mismatch));
  return syncSnapSendRsp(pReceiver, pData, NULL, 0, 0, mismatch);
}
976

977
/*
 * Handle the END message (seq == SYNC_SNAPSHOT_SEQ_END) on the follower.
 *
 * Condition 2: end the transfer and finish the FSM. If the signature matches, this calls
 * snapshotReceiverFinish and, when that succeeds, stops the receiver. The outcome
 * (mismatched signature, finish error, or 0) is reported to the leader. The reply
 * status is kept separate from the build/send result so it cannot be overwritten.
 *
 * Returns the error from building/sending the reply if that fails, otherwise the status
 * reported to the leader (0 when the snapshot was finished).
 */
static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                rspCode = 0;  // outcome reported to the leader
  int32_t                code = 0;     // outcome of building / sending the reply

  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
    rspCode = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    sError("failed to end snapshot, since %s", tstrerror(rspCode));
  } else {
    rspCode = snapshotReceiverFinish(pReceiver, pMsg);
    if (rspCode == 0) {
      snapshotReceiverStop(pReceiver);
    }
  }

  // build msg
  SRpcMsg rpcMsg = {0};
  if ((code = syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId)) != 0) {
    sRError(pReceiver, "snapshot receiver build rsp failed since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
  pRspMsg->term = raftStoreGetTerm(pSyncNode);
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pMsg->startTime;
  pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
  pRspMsg->code = rspCode;
  pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;

  // send msg
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver end", &rpcMsg.info.traceId);
  if ((code = syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg)) != 0) {
    sRError(pReceiver, "snapshot receiver send rsp failed since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  TAOS_RETURN(rspCode);
}
1024

1025
int64_t lastRecvPrintLog = 0;
1026

1027
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
1,625✔
1028
  SyncSnapshotSend     **ppMsg = (SyncSnapshotSend **)&pRpcMsg->pCont;
1,625✔
1029
  SyncSnapshotSend      *pMsg = ppMsg[0];
1,625✔
1030
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
1,625✔
1031
  int32_t                code = 0;
1,625✔
1032

1033
  // if already drop replica, do not process
1034
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
1,625!
1035
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config", &pRpcMsg->info.traceId);
×
1036
    code = TSDB_CODE_SYN_NOT_IN_RAFT_GROUP;
×
1037
    TAOS_RETURN(code);
×
1038
  }
1039

1040
  if (pMsg->term < raftStoreGetTerm(pSyncNode)) {
1,625!
1041
    sRError(pReceiver, "reject snap replication with smaller term. msg term:%" PRId64 ", seq:%d", pMsg->term,
×
1042
            pMsg->seq);
1043
    code = TSDB_CODE_SYN_TERM_NOT_MATCH;
×
1044
    if (syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code) != 0) sError("failed to send snap rsp");
×
1045
    TAOS_RETURN(code);
×
1046
  }
1047

1048
  if (pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER) {
1,625✔
1049
    if (pMsg->term > raftStoreGetTerm(pSyncNode)) {
772!
1050
      syncNodeStepDown(pSyncNode, pMsg->term, pMsg->srcId);
×
1051
    }
1052
  } else {
1053
    syncNodeUpdateTermWithoutStepDown(pSyncNode, pMsg->term);
853✔
1054
  }
1055

1056
  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER && pSyncNode->state != TAOS_SYNC_STATE_LEARNER) {
1,625!
1057
    sRError(pReceiver, "snapshot receiver not a follower or learner");
×
1058
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1059
    TAOS_RETURN(code);
×
1060
  }
1061

1062
  if (pMsg->seq < SYNC_SNAPSHOT_SEQ_PREP || pMsg->seq > SYNC_SNAPSHOT_SEQ_END) {
1,625!
1063
    sRError(pReceiver, "snap replication msg with invalid seq:%d", pMsg->seq);
×
1064
    code = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG;
×
1065
    TAOS_RETURN(code);
×
1066
  }
1067

1068
  // prepare
1069
  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP) {
1,625✔
1070
    sInfo(
94!
1071
        "vgId:%d, snapshot replication progress:2/8:follower:1/4, start to prepare, recv msg:%s, snap seq:%d, msg "
1072
        "signature:(%" PRId64 ", %" PRId64 ")",
1073
        pSyncNode->vgId, TMSG_INFO(pRpcMsg->msgType), pMsg->seq, pMsg->term, pMsg->startTime);
1074
    code = syncNodeOnSnapshotPrep(pSyncNode, pMsg);
94✔
1075
    sDebug(
94!
1076
        "vgId:%d, snapshot replication progress:2/8:follower:1/4, finish to prepare, recv msg:%s, snap seq:%d, msg "
1077
        "signature:(%" PRId64 ", %" PRId64 ")",
1078
        pSyncNode->vgId, TMSG_INFO(pRpcMsg->msgType), pMsg->seq, pMsg->term, pMsg->startTime);
1079
    goto _out;
94✔
1080
  }
1081

1082
  // begin
1083
  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
1,531✔
1084
    sInfo("vgId:%d, snapshot replication progress:4/8:follower:2/4, start to begin,replication. msg signature:(%" PRId64
94!
1085
          ", %" PRId64 "), snapshot msg seq:%d",
1086
          pSyncNode->vgId, pMsg->term, pMsg->startTime, pMsg->seq);
1087
    code = syncNodeOnSnapshotBegin(pSyncNode, pMsg);
94✔
1088
    sDebug("vgId:%d, snapshot replication progress:4/8:follower:2/4, finish to begin. msg signature:(%" PRId64
94!
1089
          ", %" PRId64 ")",
1090
          pSyncNode->vgId, pMsg->term, pMsg->startTime);
1091
    goto _out;
94✔
1092
  }
1093

1094
  // data
1095
  if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
1,437!
1096
    int64_t currentTimestamp = taosGetTimestampMs()/1000;
1,343✔
1097
    if (currentTimestamp > lastRecvPrintLog) {
1,343✔
1098
      sInfo("vgId:%d, snapshot replication progress:6/8:follower:3/4, start to receive. msg signature:(%" PRId64
101!
1099
            ", %" PRId64 "), snapshot msg seq:%d",
1100
            pSyncNode->vgId, pMsg->term, pMsg->startTime, pMsg->seq);
1101

1102
    } else {
1103
      sDebug("vgId:%d, snapshot replication progress:6/8:follower:3/4, start to receive. msg signature:(%" PRId64
1,242!
1104
             ", %" PRId64 "), snapshot msg seq:%d",
1105
             pSyncNode->vgId, pMsg->term, pMsg->startTime, pMsg->seq);
1106
    }
1107
    lastRecvPrintLog = currentTimestamp;
1,343✔
1108
    code = syncNodeOnSnapshotReceive(pSyncNode, ppMsg);
1,343✔
1109
    sDebug("vgId:%d, snapshot replication progress:6/8:follower:3/4, finish to receive.", pSyncNode->vgId);
1,343!
1110
    goto _out;
1,343✔
1111
  }
1112

1113
  // end
1114
  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
94!
1115
    sInfo("vgId:%d, snapshot replication progress:7/8:follower:4/4, start to end. msg signature:(%" PRId64 ", %" PRId64
94!
1116
          "), snapshot msg seq:%d",
1117
          pSyncNode->vgId, pMsg->term, pMsg->startTime, pMsg->seq);
1118
    code = syncNodeOnSnapshotEnd(pSyncNode, pMsg);
94✔
1119
    if (code != 0) {
94!
1120
      sRError(pReceiver, "failed to end snapshot.");
×
1121
      goto _out;
×
1122
    }
1123

1124
    code = syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode);
94✔
1125
    if (code != 0) {
94!
1126
      sRError(pReceiver, "failed to reinit log buffer since %s", tstrerror(code));
×
1127
    }
1128
    sDebug("vgId:%d, snapshot replication progress:7/7:follower:4/4, finish to end. msg signature:(%" PRId64 ", %" PRId64
94!
1129
          ")",
1130
          pSyncNode->vgId, pMsg->term, pMsg->startTime);
1131
    goto _out;
94✔
1132
  }
1133

1134
_out:;
×
1135
  syncNodeResetElectTimer(pSyncNode);
1,625✔
1136
  TAOS_RETURN(code);
1,625✔
1137
}
1138

1139
/*
 * Leader side of the PREP handshake.
 *
 * Copies the TLV returned by the follower in pMsg->data (header plus len bytes) into
 * pSender->snapshotParam.data, reallocating that buffer to fit.
 *
 * Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR for a payload that is not
 * TDMT_SYNC_PREP_SNAPSHOT_REPLY, TSDB_CODE_INVALID_DATA_FMT when the TLV type disagrees
 * with payloadType, or terrno when the reallocation fails.
 * NOTE(review): the TLV length comes from the message and is not checked against the
 * received message size.
 */
static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT_REPLY) return TSDB_CODE_SYN_INTERNAL_ERROR;

  SSyncTLV *pTlv = (void *)pMsg->data;
  if (pTlv->typ != pMsg->payloadType) {
    sSError(pSender, "unexpected data type in data of SyncSnapshotRsp. typ: %d", pTlv->typ);
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
  }

  int32_t         totalLen = sizeof(SSyncTLV) + pTlv->len;
  SSnapshotParam *pParam = &pSender->snapshotParam;
  void           *pBuf = taosMemoryRealloc(pParam->data, totalLen);
  if (pBuf == NULL) {
    TAOS_RETURN(terrno);
  }

  (void)memcpy(pBuf, pMsg->data, totalLen);
  pParam->data = pBuf;

  sSInfo(pSender, "data of snapshot param. len: %d", pTlv->len);
  return 0;
}
1161

1162
// sender
1163
/*
 * Leader handling of the follower's reply to PREP.
 *
 * Rejects a snapshot begin index beyond commitIndex + 1. Otherwise, under the send
 * buffer mutex:
 *   1. fetch the local snapshot info from the FSM;
 *   2. record the range [pMsg->snapBeginIndex, snapshot.lastApplyIndex] in snapshotParam;
 *   3. store the follower's exchanged snapshot info, if the reply carries it;
 *   4. open a snapshot reader and move the follower's next index to lastApplyIndex + 1;
 *   5. send the next snapshot message.
 *
 * Returns 0 on success or the first error encountered.
 */
static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  if (pMsg->snapBeginIndex > pSyncNode->commitIndex + 1) {
    sSError(pSender,
            "snapshot begin index is greater than commit index. msg snapBeginIndex:%" PRId64
            ", node commitIndex:%" PRId64,
            pMsg->snapBeginIndex, pSyncNode->commitIndex);
    TAOS_RETURN(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG);
  }

  int32_t   code = 0;
  SSnapshot snapInfo = {0};

  (void)taosThreadMutexLock(&pSender->pSndBuf->mutex);

  TAOS_CHECK_GOTO(pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo), NULL, _out);

  pSender->snapshotParam.start = pMsg->snapBeginIndex;
  pSender->snapshotParam.end = snapInfo.lastApplyIndex;
  sSInfo(pSender, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64,
         pMsg->snapBeginIndex, snapInfo.lastApplyIndex, snapInfo.lastApplyTerm);
  pSender->snapshot = snapInfo;

  bool carriesSnapInfo = (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY);
  if (carriesSnapInfo) {
    TAOS_CHECK_GOTO(syncSnapSenderExchgSnapInfo(pSyncNode, pSender, pMsg), NULL, _out);
  }

  code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
  if (code == 0) {
    syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapInfo.lastApplyIndex + 1);
    code = snapshotSend(pSender);
  } else {
    sSError(pSender, "prepare snapshot failed since %s", tstrerror(code));
  }

_out:
  (void)taosThreadMutexUnlock(&pSender->pSndBuf->mutex);
  TAOS_RETURN(code);
}
1208

1209
/*
 * Compare the sender's signature (term, startTime) with the one echoed in pMsg.
 *
 * Returns 0 when both match.
 * Returns a positive value when the message is older than the sender: 1 for a smaller
 * term, 2 for the same term with an earlier startTime.
 * Returns a negative value when the message is newer: -1 for a larger term, -2 for the
 * same term with a later startTime.
 *
 * The original kept a `code` that was always 0, so its error-log branch was
 * unreachable. That dead code is removed; the return values are unchanged.
 */
static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  if (pSender->term != pMsg->term) {
    return (pSender->term < pMsg->term) ? -1 : 1;
  }
  if (pSender->startTime != pMsg->startTime) {
    return (pSender->startTime < pMsg->startTime) ? -2 : 2;
  }
  return 0;
}
1220

1221
/*
 * Leader handling of a data ack.
 *
 * Under the send buffer mutex:
 *   1. validate the signature, the sender state and the buffer window;
 *   2. mark the acked block;
 *   3. advance the cursor over the contiguous run of acked blocks and release them;
 *   4. send more blocks while fewer than tsSnapReplMaxWaitN are outstanding;
 *   5. once seq has reached SYNC_SNAPSHOT_SEQ_END and the buffer is empty, call
 *      snapshotSend() one more time (presumably to emit the END message).
 *
 * Returns 0 or the first error encountered.
 */
static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp **ppMsg) {
  int32_t          code = 0;
  SSyncSnapBuffer *pBuf = pSender->pSndBuf;
  SyncSnapshotRsp *pRsp = ppMsg[0];

  (void)taosThreadMutexLock(&pBuf->mutex);

  if (snapshotSenderSignatureCmp(pSender, pRsp) != 0) {
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    sError("failed to send snapshot data, since %s", tstrerror(code));
  } else if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
  } else if (pRsp->ack - pBuf->start >= pBuf->size) {
    code = TSDB_CODE_SYN_BUFFER_FULL;
  } else if (pBuf->start > pBuf->cursor + 1 || pBuf->cursor >= pBuf->end) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (code != 0) goto _out;

  // mark the acked block if it lies in the unacked part of the window
  if (pRsp->ack > pBuf->cursor && pRsp->ack < pBuf->end) {
    SyncSnapBlock *pAcked = pBuf->entries[pRsp->ack % pBuf->size];
    if (pAcked == NULL) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    pAcked->acked = 1;
  }

  // advance the cursor over contiguously acked blocks
  int64_t next = pBuf->cursor + 1;
  while (next < pBuf->end) {
    SyncSnapBlock *pBlk = pBuf->entries[next % pBuf->size];
    if (!pBlk->acked) break;
    pBuf->cursor = next;
    ++next;
  }

  // release every block up to the cursor
  while (pBuf->start <= pBuf->cursor) {
    int64_t slot = pBuf->start % pBuf->size;
    pBuf->entryDeleteCb(pBuf->entries[slot]);
    pBuf->entries[slot] = NULL;
    pBuf->start = pBuf->start + 1;
  }

  // keep up to tsSnapReplMaxWaitN blocks in flight
  while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pBuf->start < tsSnapReplMaxWaitN) {
    code = snapshotSend(pSender);
    if (code != 0) goto _out;
  }

  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pBuf->end <= pBuf->start) {
    code = snapshotSend(pSender);
  }

_out:
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_RETURN(code);
}
1287

1288
int64_t lastSendPrintLog = 0;
1289

1290
int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
1,625✔
1291
  SyncSnapshotRsp **ppMsg = (SyncSnapshotRsp **)&pRpcMsg->pCont;
1,625✔
1292
  SyncSnapshotRsp  *pMsg = ppMsg[0];
1,625✔
1293
  int32_t           code = 0;
1,625✔
1294

1295
  // if already drop replica, do not process
1296
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
1,625!
1297
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "maybe replica already dropped", &pRpcMsg->info.traceId);
×
1298
    TAOS_RETURN(TSDB_CODE_SYN_NOT_IN_RAFT_GROUP);
×
1299
  }
1300

1301
  // get sender
1302
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &pMsg->srcId);
1,625✔
1303
  if (pSender == NULL) {
1,625!
1304
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender is null", &pRpcMsg->info.traceId);
×
1305
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
×
1306
  }
1307

1308
  if (!snapshotSenderIsStart(pSender)) {
1,625!
1309
    sSError(pSender, "snapshot sender stopped. sender startTime:%" PRId64 ", msg startTime:%" PRId64,
×
1310
            pSender->startTime, pMsg->startTime);
1311
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
×
1312
  }
1313

1314
  // check signature
1315
  int32_t order = 0;
1,625✔
1316
  if ((order = snapshotSenderSignatureCmp(pSender, pMsg)) > 0) {
1,625!
1317
    sError("failed to check snapshot rsp signature, ignore a stale snap rsp.");
×
1318
    TAOS_RETURN(TSDB_CODE_SYN_MISMATCHED_SIGNATURE);
×
1319
  } else if (order < 0) {
1,625!
1320
    sError("failed to check snapshot rsp signature, snapshot sender is stale. stop");
×
1321
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
×
1322
    goto _ERROR;
×
1323
  }
1324

1325
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
1,625!
1326
    sSError(pSender, "snapshot sender not leader");
×
1327
    code = TSDB_CODE_SYN_NOT_LEADER;
×
1328
    goto _ERROR;
×
1329
  }
1330

1331
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
1,625✔
1332
  if (pMsg->term != currentTerm) {
1,625!
1333
    sSError(pSender, "snapshot sender term mismatch, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
×
1334
            currentTerm);
1335
    code = TSDB_CODE_SYN_TERM_NOT_MATCH;
×
1336
    goto _ERROR;
×
1337
  }
1338

1339
  if (pMsg->code != 0) {
1,625!
1340
    sSError(pSender, "snapshot sender receive error:%s 0x%x and stop sender", tstrerror(pMsg->code), pMsg->code);
×
1341
    code = pMsg->code;
×
1342
    goto _ERROR;
×
1343
  }
1344

1345
  // send begin
1346
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP) {
1,625✔
1347
    sSInfo(pSender, "snapshot replication progress:3/8:leader:2/4, process prepare rsp, msg:%s, snap ack:%d, ",
94!
1348
           TMSG_INFO(pRpcMsg->msgType), pMsg->ack);
1349
    if ((code = syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg)) != 0) {
94!
1350
      goto _ERROR;
×
1351
    }
1352
  }
1353

1354
  // send msg of data or end
1355
  if (pMsg->ack >= SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->ack < SYNC_SNAPSHOT_SEQ_END) {
1,625✔
1356
    int64_t currentTimestamp = taosGetTimestampMs()/1000;
1,437✔
1357
    if (currentTimestamp > lastSendPrintLog) {
1,437✔
1358
      sSInfo(pSender, "snapshot replication progress:5/8:leader:3/4, send buffer, msg:%s, snap ack:%d",
102!
1359
             TMSG_INFO(pRpcMsg->msgType), pMsg->ack);
1360
    } else {
1361
      sSDebug(pSender, "snapshot replication progress:5/8:leader:3/4, send buffer, msg:%s, snap ack:%d",
1,335!
1362
              TMSG_INFO(pRpcMsg->msgType), pMsg->ack);
1363
    }
1364
    lastSendPrintLog = currentTimestamp;
1,437✔
1365
    if ((code = syncSnapBufferSend(pSender, ppMsg)) != 0) {
1,437!
1366
      sSError(pSender, "failed to replicate snap since %s. seq:%d, pReader:%p, finish:%d", tstrerror(code),
×
1367
              pSender->seq, pSender->pReader, pSender->finish);
1368
      goto _ERROR;
×
1369
    }
1370
  }
1371

1372
  // end
1373
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
1,625✔
1374
    sSInfo(pSender, "snapshot replication progress:8/8:leader:4/4, process end rsp");
94!
1375
    snapshotSenderStop(pSender, true);
94✔
1376
    TAOS_CHECK_GOTO(syncNodeReplicateReset(pSyncNode, &pMsg->srcId), NULL, _ERROR);
94!
1377
  }
1378

1379
  return 0;
1,625✔
1380

1381
_ERROR:
×
1382
  snapshotSenderStop(pSender, false);
×
1383
  if (syncNodeReplicateReset(pSyncNode, &pMsg->srcId) != 0) sError("failed to reset replicate");
×
1384
  TAOS_RETURN(code);
×
1385
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc