• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4506

15 Jul 2025 12:33AM UTC coverage: 62.026% (-0.7%) from 62.706%
#4506

push

travis-ci

web-flow
docs: update stream docs (#31874)

155391 of 320094 branches covered (48.55%)

Branch coverage included in aggregate %.

240721 of 318525 relevant lines covered (75.57%)

6529048.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.64
/source/libs/sync/src/syncSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "syncSnapshot.h"
18
#include "syncIndexMgr.h"
19
#include "syncPipeline.h"
20
#include "syncRaftCfg.h"
21
#include "syncRaftLog.h"
22
#include "syncRaftStore.h"
23
#include "syncReplication.h"
24
#include "syncUtil.h"
25
#include "tglobal.h"
26

27
static SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths);
28

29
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
554,535✔
30
  for (int64_t i = pBuf->start; i < pBuf->end; ++i) {
554,535!
31
    if (pBuf->entryDeleteCb) {
×
32
      pBuf->entryDeleteCb(pBuf->entries[i % pBuf->size]);
×
33
    }
34
    pBuf->entries[i % pBuf->size] = NULL;
×
35
  }
36
  pBuf->start = SYNC_SNAPSHOT_SEQ_BEGIN + 1;
554,535✔
37
  pBuf->end = pBuf->start;
554,535✔
38
  pBuf->cursor = pBuf->start - 1;
554,535✔
39
}
554,535✔
40

41
static void syncSnapBufferDestroy(SSyncSnapBuffer **ppBuf) {
277,211✔
42
  if (ppBuf == NULL || ppBuf[0] == NULL) return;
277,211!
43
  SSyncSnapBuffer *pBuf = ppBuf[0];
277,211✔
44

45
  syncSnapBufferReset(pBuf);
277,211✔
46

47
  (void)taosThreadMutexDestroy(&pBuf->mutex);
277,210✔
48
  taosMemoryFree(ppBuf[0]);
277,210!
49
  ppBuf[0] = NULL;
277,192✔
50
  return;
277,192✔
51
}
52

53
static int32_t syncSnapBufferCreate(SSyncSnapBuffer **ppBuf) {
277,263✔
54
  SSyncSnapBuffer *pBuf = taosMemoryCalloc(1, sizeof(SSyncSnapBuffer));
277,263!
55
  if (pBuf == NULL) {
277,231!
56
    *ppBuf = NULL;
×
57
    TAOS_RETURN(terrno);
×
58
  }
59
  pBuf->size = sizeof(pBuf->entries) / sizeof(void *);
277,231✔
60
  if (pBuf->size != TSDB_SYNC_SNAP_BUFFER_SIZE) return TSDB_CODE_SYN_INTERNAL_ERROR;
277,231!
61
  (void)taosThreadMutexInit(&pBuf->mutex, NULL);
277,231✔
62
  *ppBuf = pBuf;
277,231✔
63
  TAOS_RETURN(0);
277,231✔
64
}
65

66
int32_t snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex, SSyncSnapshotSender **ppSender) {
260,168✔
67
  int32_t code = 0;
260,168✔
68
  *ppSender = NULL;
260,168✔
69
  bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) &&
520,337!
70
                   (pSyncNode->pFsm->FpSnapshotDoRead != NULL);
260,169✔
71
  if (!condition) {
260,168!
72
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
×
73
  }
74

75
  SSyncSnapshotSender *pSender = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender));
260,168!
76
  if (pSender == NULL) {
260,159!
77
    TAOS_RETURN(terrno);
×
78
  }
79

80
  pSender->start = false;
260,159✔
81
  pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID;
260,159✔
82
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
260,159✔
83
  pSender->pReader = NULL;
260,159✔
84
  pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
260,159✔
85
  pSender->pSyncNode = pSyncNode;
260,159✔
86
  pSender->replicaIndex = replicaIndex;
260,159✔
87
  pSender->term = raftStoreGetTerm(pSyncNode);
260,159✔
88
  pSender->startTime = -1;
260,238✔
89
  pSender->finish = false;
260,238✔
90

91
  code = pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
260,238✔
92
  if (code != 0) {
260,238!
93
    taosMemoryFreeClear(pSender);
×
94
    TAOS_RETURN(code);
×
95
  }
96
  SSyncSnapBuffer *pSndBuf = NULL;
260,238✔
97
  code = syncSnapBufferCreate(&pSndBuf);
260,238✔
98
  if (pSndBuf == NULL) {
260,192!
99
    taosMemoryFreeClear(pSender);
×
100
    TAOS_RETURN(code);
×
101
  }
102
  pSndBuf->entryDeleteCb = syncSnapBlockDestroy;
260,192✔
103
  pSender->pSndBuf = pSndBuf;
260,192✔
104

105
  syncSnapBufferReset(pSender->pSndBuf);
260,192✔
106
  *ppSender = pSender;
260,192✔
107
  TAOS_RETURN(code);
260,192✔
108
}
109

110
void syncSnapBlockDestroy(void *ptr) {
958✔
111
  SyncSnapBlock *pBlk = ptr;
958✔
112
  if (pBlk->pBlock != NULL) {
958✔
113
    taosMemoryFree(pBlk->pBlock);
904!
114
    pBlk->pBlock = NULL;
904✔
115
    pBlk->blockLen = 0;
904✔
116
  }
117
  taosMemoryFree(pBlk);
958!
118
}
958✔
119

120
static int32_t snapshotSenderClearInfoData(SSyncSnapshotSender *pSender) {
260,218✔
121
  if (pSender->snapshotParam.data) {
260,218✔
122
    taosMemoryFree(pSender->snapshotParam.data);
54!
123
    pSender->snapshotParam.data = NULL;
54✔
124
  }
125

126
  if (pSender->snapshot.data) {
260,218!
127
    taosMemoryFree(pSender->snapshot.data);
×
128
    pSender->snapshot.data = NULL;
×
129
  }
130
  return 0;
260,218✔
131
}
132

133
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
260,183✔
134
  if (pSender == NULL) return;
260,183!
135

136
  // close reader
137
  if (pSender->pReader != NULL) {
260,183!
138
    pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
×
139
    pSender->pReader = NULL;
×
140
  }
141

142
  // free snap buffer
143
  if (pSender->pSndBuf) {
260,183✔
144
    syncSnapBufferDestroy(&pSender->pSndBuf);
260,176✔
145
  }
146

147
  (void)snapshotSenderClearInfoData(pSender);
260,168✔
148

149
  // free sender
150
  taosMemoryFree(pSender);
260,166!
151
}
152

153
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return atomic_load_8(&pSender->start); }
257,638✔
154

155
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
54✔
156
  int32_t code = 0;
54✔
157

158
  int8_t started = atomic_val_compare_exchange_8(&pSender->start, false, true);
54✔
159
  if (started) return 0;
54!
160

161
  pSender->seq = SYNC_SNAPSHOT_SEQ_PREP;
54✔
162
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
54✔
163
  pSender->pReader = NULL;
54✔
164
  pSender->snapshotParam.start = SYNC_INDEX_INVALID;
54✔
165
  pSender->snapshotParam.end = SYNC_INDEX_INVALID;
54✔
166
  pSender->snapshot.data = NULL;
54✔
167
  pSender->snapshotParam.end = SYNC_INDEX_INVALID;
54✔
168
  pSender->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
54✔
169
  pSender->snapshot.lastApplyTerm = SYNC_TERM_INVALID;
54✔
170
  pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
54✔
171

172
  (void)memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig));
54✔
173
  pSender->sendingMS = 0;
54✔
174
  pSender->term = raftStoreGetTerm(pSender->pSyncNode);
54✔
175
  pSender->startTime = taosGetMonoTimestampMs();
54✔
176
  pSender->lastSendTime = taosGetTimestampMs();
54✔
177
  pSender->finish = false;
54✔
178

179
  // Get snapshot info
180
  SSyncNode *pSyncNode = pSender->pSyncNode;
54✔
181
  SSnapshot  snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT};
54✔
182
  if ((code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo)) != 0) {
54!
183
    sSError(pSender, "snapshot get info failure since %s", tstrerror(code));
×
184
    goto _out;
×
185
  }
186

187
  void   *pData = snapInfo.data;
54✔
188
  int32_t type = (pData) ? snapInfo.type : 0;
54!
189
  int32_t dataLen = 0;
54✔
190
  if (pData) {
54!
191
    SSyncTLV *datHead = pData;
54✔
192
    if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT) {
54!
193
      sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
×
194
      code = TSDB_CODE_INVALID_DATA_FMT;
×
195
      goto _out;
×
196
    }
197
    dataLen = sizeof(SSyncTLV) + datHead->len;
54✔
198
  }
199

200
  if ((code = syncSnapSendMsg(pSender, pSender->seq, pData, dataLen, type)) != 0) {
54!
201
    goto _out;
×
202
  }
203

204
  SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
54✔
205
  sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&destId));
54!
206
_out:
×
207
  if (snapInfo.data) {
54!
208
    taosMemoryFree(snapInfo.data);
54!
209
    snapInfo.data = NULL;
54✔
210
  }
211
  TAOS_RETURN(code);
54✔
212
}
213

214
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
54✔
215
  sSDebug(pSender, "snapshot sender stop, finish:%d reader:%p", finish, pSender->pReader);
54!
216

217
  // update flag
218
  int8_t stopped = !atomic_val_compare_exchange_8(&pSender->start, true, false);
54✔
219
  if (stopped) return;
54!
220
  (void)taosThreadMutexLock(&pSender->pSndBuf->mutex);
54✔
221
  {
222
    pSender->finish = finish;
54✔
223

224
    // close reader
225
    if (pSender->pReader != NULL) {
54!
226
      pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
54✔
227
      pSender->pReader = NULL;
54✔
228
    }
229

230
    syncSnapBufferReset(pSender->pSndBuf);
54✔
231

232
    (void)snapshotSenderClearInfoData(pSender);
54✔
233

234
    SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
54✔
235
    sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);
54!
236
  }
237
  (void)taosThreadMutexUnlock(&pSender->pSndBuf->mutex);
54✔
238
}
239

240
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) {
1,066✔
241
  int32_t code = 0;
1,066✔
242
  SRpcMsg rpcMsg = {0};
1,066✔
243

244
  if ((code = syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId)) != 0) {
1,066!
245
    sSError(pSender, "failed to build snap replication msg since %s", tstrerror(code));
×
246
    goto _OUT;
×
247
  }
248

249
  SyncSnapshotSend *pMsg = rpcMsg.pCont;
1,066✔
250
  pMsg->srcId = pSender->pSyncNode->myRaftId;
1,066✔
251
  pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
1,066✔
252
  pMsg->term = pSender->term;
1,066✔
253
  pMsg->beginIndex = pSender->snapshotParam.start;
1,066✔
254
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
1,066✔
255
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
1,066✔
256
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
1,066✔
257
  pMsg->lastConfig = pSender->lastConfig;
1,066✔
258
  pMsg->startTime = pSender->startTime;
1,066✔
259
  pMsg->seq = seq;
1,066✔
260

261
  if (pBlock != NULL && blockLen > 0) {
1,066!
262
    (void)memcpy(pMsg->data, pBlock, blockLen);
958✔
263
  }
264
  pMsg->payloadType = typ;
1,066✔
265

266
  // send msg
267
  if ((code = syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg)) != 0) {
1,066!
268
    sSError(pSender, "failed to send snap replication msg since %s. seq:%d", tstrerror(code), seq);
×
269
    goto _OUT;
×
270
  }
271

272
_OUT:
1,066✔
273
  TAOS_RETURN(code);
1,066✔
274
}
275

276
// when sender receive ack, call this function to send msg from seq
277
// seq = ack + 1, already updated
278
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
1,066✔
279
  int32_t        code = 0;
1,066✔
280
  SyncSnapBlock *pBlk = NULL;
1,066✔
281

282
  if (pSender->seq < SYNC_SNAPSHOT_SEQ_END) {
1,066✔
283
    pSender->seq++;
1,012✔
284

285
    if (pSender->seq > SYNC_SNAPSHOT_SEQ_BEGIN) {
1,012✔
286
      pBlk = taosMemoryCalloc(1, sizeof(SyncSnapBlock));
958!
287
      if (pBlk == NULL) {
958!
288
        code = terrno;
×
289
        goto _OUT;
×
290
      }
291

292
      pBlk->seq = pSender->seq;
958✔
293

294
      // read data
295
      code = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, &pBlk->pBlock,
958✔
296
                                                        &pBlk->blockLen);
297
      if (code != 0) {
958!
298
        sSError(pSender, "snapshot sender read failed since %s", tstrerror(code));
×
299
        goto _OUT;
×
300
      }
301

302
      if (pBlk->blockLen > 0) {
958✔
303
        // has read data
304
        sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pBlk->blockLen, pBlk->seq);
904!
305
      } else {
306
        // read finish, update seq to end
307
        pSender->seq = SYNC_SNAPSHOT_SEQ_END;
54✔
308
        sSInfo(pSender, "snapshot sender read to the end");
54!
309
        goto _OUT;
54✔
310
      }
311
    }
312
  }
313

314
  if (!(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END)) {
1,012!
315
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
316
    goto _OUT;
×
317
  }
318

319
  // send msg
320
  int32_t blockLen = (pBlk) ? pBlk->blockLen : 0;
1,012✔
321
  void   *pBlock = (pBlk) ? pBlk->pBlock : NULL;
1,012✔
322
  if ((code = syncSnapSendMsg(pSender, pSender->seq, pBlock, blockLen, 0)) != 0) {
1,012!
323
    goto _OUT;
×
324
  }
325

326
  // put in buffer
327
  int64_t nowMs = taosGetTimestampMs();
1,012✔
328
  if (pBlk) {
1,012✔
329
    if (!(pBlk->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pBlk->seq < SYNC_SNAPSHOT_SEQ_END)) {
904!
330
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
331
      goto _OUT;
×
332
    }
333
    pBlk->sendTimeMs = nowMs;
904✔
334
    pSender->pSndBuf->entries[pSender->seq % pSender->pSndBuf->size] = pBlk;
904✔
335
    pBlk = NULL;
904✔
336
    pSender->pSndBuf->end = TMAX(pSender->seq + 1, pSender->pSndBuf->end);
904✔
337
  }
338
  pSender->lastSendTime = nowMs;
1,012✔
339

340
_OUT:;
1,066✔
341
  if (pBlk != NULL) {
1,066✔
342
    syncSnapBlockDestroy(pBlk);
54✔
343
    pBlk = NULL;
54✔
344
  }
345
  TAOS_RETURN(code);
1,066✔
346
}
347

348
// send snapshot data from cache
349
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
×
350
  SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
×
351
  int32_t          code = 0;
×
352
  (void)taosThreadMutexLock(&pSndBuf->mutex);
×
353
  if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) {
×
354
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
355
    goto _out;
×
356
  }
357

358
  for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) {
×
359
    SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size];
×
360
    if (!pBlk) {
×
361
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
362
      goto _out;
×
363
    }
364
    int64_t nowMs = taosGetTimestampMs();
×
365
    if (pBlk->acked || nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) {
×
366
      continue;
×
367
    }
368
    if ((code = syncSnapSendMsg(pSender, pBlk->seq, pBlk->pBlock, pBlk->blockLen, 0)) != 0) {
×
369
      goto _out;
×
370
    }
371
    pBlk->sendTimeMs = nowMs;
×
372
  }
373

374
  if (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
×
375
    if ((code = snapshotSend(pSender)) != 0) {
×
376
      goto _out;
×
377
    }
378
  }
379

380
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
×
381
    if ((code = syncSnapSendMsg(pSender, pSender->seq, NULL, 0, 0)) != 0) {
×
382
      goto _out;
×
383
    }
384
  }
385
_out:;
×
386
  (void)taosThreadMutexUnlock(&pSndBuf->mutex);
×
387
  TAOS_RETURN(code);
×
388
}
389

390
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
54✔
391
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
54✔
392
  if (pSender == NULL) {
54!
393
    sNError(pSyncNode, "snapshot sender start error since get failed");
×
394
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
×
395
  }
396

397
  if (snapshotSenderIsStart(pSender)) {
54!
398
    sSDebug(pSender, "snapshot sender already start, ignore");
×
399
    return 0;
×
400
  }
401

402
  taosMsleep(1);
54✔
403

404
  int32_t code = snapshotSenderStart(pSender);
54✔
405
  if (code != 0) {
54!
406
    sSError(pSender, "snapshot sender start error since %s", tstrerror(code));
×
407
    TAOS_RETURN(code);
×
408
  }
409

410
  return 0;
54✔
411
}
412

413
// receiver
414
int32_t snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId, SSyncSnapshotReceiver **ppReceiver) {
17,042✔
415
  int32_t code = 0;
17,042✔
416
  *ppReceiver = NULL;
17,042✔
417
  bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
34,084!
418
                   (pSyncNode->pFsm->FpSnapshotDoWrite != NULL);
17,042!
419
  if (!condition) {
17,042!
420
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
×
421
  }
422

423
  SSyncSnapshotReceiver *pReceiver = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver));
17,042!
424
  if (pReceiver == NULL) {
17,041!
425
    TAOS_RETURN(terrno);
×
426
  }
427

428
  pReceiver->start = false;
17,041✔
429
  pReceiver->startTime = 0;
17,041✔
430
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
17,041✔
431
  pReceiver->pWriter = NULL;
17,041✔
432
  code = taosThreadMutexInit(&pReceiver->writerMutex, NULL);
17,041✔
433
  if (code != 0) {
17,041!
434
    taosMemoryFree(pReceiver);
×
435
    pReceiver = NULL;
×
436
    TAOS_RETURN(code);
×
437
  }
438
  pReceiver->pSyncNode = pSyncNode;
17,041✔
439
  pReceiver->fromId = fromId;
17,041✔
440
  pReceiver->term = raftStoreGetTerm(pSyncNode);
17,041✔
441
  pReceiver->snapshot.data = NULL;
17,042✔
442
  pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
17,042✔
443
  pReceiver->snapshot.lastApplyTerm = 0;
17,042✔
444
  pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
17,042✔
445

446
  SSyncSnapBuffer *pRcvBuf = NULL;
17,042✔
447
  code = syncSnapBufferCreate(&pRcvBuf);
17,042✔
448
  if (pRcvBuf == NULL) {
17,042!
449
    int32_t ret = taosThreadMutexDestroy(&pReceiver->writerMutex);
×
450
    if (ret != 0) {
×
451
      sError("failed to destroy mutex since %s", tstrerror(ret));
×
452
    }
453
    taosMemoryFree(pReceiver);
×
454
    pReceiver = NULL;
×
455
    TAOS_RETURN(code);
×
456
  }
457
  pRcvBuf->entryDeleteCb = rpcFreeCont;
17,042✔
458
  pReceiver->pRcvBuf = pRcvBuf;
17,042✔
459

460
  syncSnapBufferReset(pReceiver->pRcvBuf);
17,042✔
461
  *ppReceiver = pReceiver;
17,042✔
462
  TAOS_RETURN(code);
17,042✔
463
}
464

465
static int32_t snapshotReceiverClearInfoData(SSyncSnapshotReceiver *pReceiver) {
17,094✔
466
  if (pReceiver->snapshotParam.data) {
17,094✔
467
    taosMemoryFree(pReceiver->snapshotParam.data);
53!
468
    pReceiver->snapshotParam.data = NULL;
53✔
469
  }
470

471
  if (pReceiver->snapshot.data) {
17,094!
472
    taosMemoryFree(pReceiver->snapshot.data);
×
473
    pReceiver->snapshot.data = NULL;
×
474
  }
475
  return 0;
17,094✔
476
}
477

478
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
17,041✔
479
  if (pReceiver == NULL) return;
17,041!
480

481
  (void)taosThreadMutexLock(&pReceiver->writerMutex);
17,041✔
482
  // close writer
483
  if (pReceiver->pWriter != NULL) {
17,040!
484
    int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
×
485
                                                                   false, &pReceiver->snapshot);
486
    if (code != 0) {
×
487
      sError("vgId:%d, snapshot receiver stop failed while destroy since %s", pReceiver->pSyncNode->vgId,
×
488
             tstrerror(code));
489
    }
490
    pReceiver->pWriter = NULL;
×
491
  }
492
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
17,040✔
493

494
  (void)taosThreadMutexDestroy(&pReceiver->writerMutex);
17,041✔
495

496
  // free snap buf
497
  if (pReceiver->pRcvBuf) {
17,040!
498
    syncSnapBufferDestroy(&pReceiver->pRcvBuf);
17,040✔
499
  }
500

501
  (void)snapshotReceiverClearInfoData(pReceiver);
17,041✔
502

503
  // free receiver
504
  taosMemoryFree(pReceiver);
17,040!
505
}
506

507
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
48,326✔
508
  return (pReceiver != NULL ? atomic_load_8(&pReceiver->start) : false);
48,326!
509
}
510

511
static int32_t snapshotReceiverSignatureCmp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
957✔
512
  int32_t code = 0;
957✔
513
  if (pReceiver->term < pMsg->term) code = -1;
957!
514
  if (pReceiver->term > pMsg->term) code = 1;
957!
515
  if (pReceiver->startTime < pMsg->startTime) code = -2;
957!
516
  if (pReceiver->startTime > pMsg->startTime) code = 2;
957!
517
  if (code != 0)
957!
518
    sRError(pReceiver, "receiver signature failed, result:%d, msg signature:(%" PRId64 ", %" PRId64 ")", code,
×
519
            pMsg->term, pMsg->startTime);
520
  return 0;
957✔
521
}
522

523
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
53✔
524
  if (pReceiver->pWriter != NULL) {
53!
525
    sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null", pReceiver->pSyncNode->vgId);
×
526
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
×
527
  }
528

529
  // update ack
530
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
53✔
531

532
  // update snapshot
533
  pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
53✔
534
  pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
53✔
535
  pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
53✔
536
  pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
53✔
537
  pReceiver->snapshotParam.end = pBeginMsg->lastIndex;
53✔
538

539
  // start writer
540
  int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &pReceiver->snapshotParam,
53✔
541
                                                                  &pReceiver->pWriter);
542
  if (code != 0) {
53!
543
    sRError(pReceiver, "snapshot receiver start write failed since %s", tstrerror(code));
×
544
    TAOS_RETURN(code);
×
545
  }
546

547
  // event log
548
  sRInfo(pReceiver, "snapshot receiver start write");
53!
549
  return 0;
53✔
550
}
551

552
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
53✔
553
  if (snapshotReceiverIsStart(pReceiver)) {
53!
554
    sRInfo(pReceiver, "snapshot receiver has started");
×
555
    return;
×
556
  }
557

558
  int8_t started = atomic_val_compare_exchange_8(&pReceiver->start, false, true);
53✔
559
  if (started) return;
53!
560

561
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP;
53✔
562
  pReceiver->term = pPreMsg->term;
53✔
563
  pReceiver->fromId = pPreMsg->srcId;
53✔
564
  pReceiver->startTime = pPreMsg->startTime;
53✔
565

566
  pReceiver->snapshotParam.start = syncNodeGetSnapBeginIndex(pReceiver->pSyncNode);
53✔
567
  pReceiver->snapshotParam.end = -1;
53✔
568

569
  sRInfo(pReceiver, "snapshot receiver start, from dnode:%d.", DID(&pReceiver->fromId));
53!
570
}
571

572
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
53✔
573
  sRDebug(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);
53!
574

575
  int8_t stopped = !atomic_val_compare_exchange_8(&pReceiver->start, true, false);
53✔
576
  if (stopped) return;
53!
577

578
  (void)taosThreadMutexLock(&pReceiver->writerMutex);
53✔
579
  {
580
    if (pReceiver->pWriter != NULL) {
53!
581
      int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
×
582
                                                                     false, &pReceiver->snapshot);
583
      if (code != 0) {
×
584
        sRError(pReceiver, "snapshot receiver stop write failed since %s", tstrerror(code));
×
585
      }
586
      pReceiver->pWriter = NULL;
×
587
    } else {
588
      sRInfo(pReceiver, "snapshot receiver stop, writer is null");
53!
589
    }
590
  }
591
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
53✔
592

593
  (void)taosThreadMutexLock(&pReceiver->pRcvBuf->mutex);
53✔
594
  {
595
    syncSnapBufferReset(pReceiver->pRcvBuf);
53✔
596

597
    (void)snapshotReceiverClearInfoData(pReceiver);
53✔
598
  }
599
  (void)taosThreadMutexUnlock(&pReceiver->pRcvBuf->mutex);
53✔
600
}
601

602
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
53✔
603
  int32_t code = 0;
53✔
604
  if (pReceiver->pWriter != NULL) {
53!
605
    // write data
606
    sRInfo(pReceiver, "snapshot receiver write about to finish, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
53!
607
    if (pMsg->dataLen > 0) {
53!
608
      (void)taosThreadMutexLock(&pReceiver->writerMutex);
×
609
      code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
×
610
                                                           pMsg->dataLen);
×
611
      (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
×
612
      if (code != 0) {
×
613
        sRError(pReceiver, "failed to finish snapshot receiver write since %s", tstrerror(code));
×
614
        TAOS_RETURN(code);
×
615
      }
616
    }
617

618
    // update commit index
619
    if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
53!
620
      pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
53✔
621
    }
622

623
    // maybe update term
624
    if (pReceiver->snapshot.lastApplyTerm > raftStoreGetTerm(pReceiver->pSyncNode)) {
53!
625
      raftStoreSetTerm(pReceiver->pSyncNode, pReceiver->snapshot.lastApplyTerm);
×
626
    }
627

628
    (void)taosThreadMutexLock(&pReceiver->writerMutex);
53✔
629
    if (pReceiver->pWriter != NULL) {
53!
630
      // stop writer, apply data
631
      code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
53✔
632
                                                             &pReceiver->snapshot);
633
      if (code != 0) {
53!
634
        sRError(pReceiver, "snapshot receiver apply failed since %s", tstrerror(code));
×
635
        TAOS_RETURN(code);
×
636
      }
637
      pReceiver->pWriter = NULL;
53✔
638
      sRInfo(pReceiver, "snapshot receiver write stopped");
53!
639
    }
640
    (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
53✔
641

642
    // update progress
643
    pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;
53✔
644

645
    // get fsmState
646
    SSnapshot snapshot = {0};
53✔
647
    code = pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
53✔
648
    if (code != 0) {
53!
649
      sRError(pReceiver, "snapshot receiver get snapshot info failed since %s", tstrerror(code));
×
650
      TAOS_RETURN(code);
×
651
    }
652
    pReceiver->pSyncNode->fsmState = snapshot.state;
53✔
653

654
    // reset wal
655
    code =
656
        pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
53✔
657
    if (code != 0) {
53!
658
      sRError(pReceiver, "failed to snapshot receiver log restore since %s", tstrerror(code));
×
659
      TAOS_RETURN(code);
×
660
    }
661
    sRInfo(pReceiver, "wal log restored from snapshot");
53!
662
  } else {
663
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
664
    sRError(pReceiver, "snapshot receiver finish error since writer is null");
×
665
    TAOS_RETURN(code);
×
666
  }
667

668
  return 0;
53✔
669
}
670

671
static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
851✔
672
  if (pMsg->seq != pReceiver->ack + 1) {
851!
673
    sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq);
×
674
    TAOS_RETURN(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG);
×
675
  }
676

677
  if (pReceiver->pWriter == NULL) {
851!
678
    sRError(pReceiver, "snapshot receiver failed to write data since writer is null");
×
679
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
×
680
  }
681

682
  sRDebug(pReceiver, "snapshot receiver continue to write, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
851!
683

684
  if (pMsg->dataLen > 0) {
851!
685
    // apply data block
686
    int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
851✔
687
                                                                 pMsg->data, pMsg->dataLen);
851✔
688
    if (code != 0) {
851!
689
      sRError(pReceiver, "snapshot receiver continue write failed since %s", tstrerror(code));
×
690
      TAOS_RETURN(code);
×
691
    }
692
  }
693

694
  // update progress
695
  pReceiver->ack = pMsg->seq;
851✔
696

697
  // event log
698
  sRDebug(pReceiver, "snapshot receiver continue to write finish");
851!
699
  return 0;
851✔
700
}
701

702
SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
106✔
703
  SyncIndex snapStart = SYNC_INDEX_INVALID;
106✔
704

705
  if (syncNodeIsMnode(ths)) {
106!
706
    snapStart = SYNC_INDEX_BEGIN;
×
707
    sNInfo(ths, "snapshot begin index is %" PRId64 " since its mnode", snapStart);
×
708
  } else {
709
    SSyncLogStoreData *pData = ths->pLogStore->data;
106✔
710
    SWal              *pWal = pData->pWal;
106✔
711

712
    int64_t walCommitVer = walGetCommittedVer(pWal);
106✔
713
    snapStart = TMAX(ths->commitIndex, walCommitVer) + 1;
106✔
714

715
    sNInfo(ths, "snapshot begin index is %" PRId64, snapStart);
106!
716
  }
717

718
  return snapStart;
106✔
719
}
720

721
static int32_t syncSnapReceiverExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotReceiver *pReceiver,
53✔
722
                                             SyncSnapshotSend *pMsg, SSnapshot *pInfo) {
723
  if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT) return TSDB_CODE_SYN_INTERNAL_ERROR;
53!
724
  int32_t code = 0, lino = 0;
53✔
725

726
  // copy snap info from leader
727
  void *data = taosMemoryCalloc(1, pMsg->dataLen);
53!
728
  if (data == NULL) {
53!
729
    TAOS_CHECK_EXIT(terrno);
×
730
  }
731
  pInfo->data = data;
53✔
732
  data = NULL;
53✔
733
  (void)memcpy(pInfo->data, pMsg->data, pMsg->dataLen);
53✔
734

735
  // exchange snap info
736
  if ((code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pInfo)) != 0) {
53!
737
    sRError(pReceiver, "failed to get snapshot info. type: %d", pMsg->payloadType);
×
738
    goto _exit;
×
739
  }
740
  SSyncTLV *datHead = pInfo->data;
53✔
741
  if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
53!
742
    sRError(pReceiver, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
×
743
    code = TSDB_CODE_INVALID_DATA_FMT;
×
744
    goto _exit;
×
745
  }
746
  int32_t dataLen = sizeof(SSyncTLV) + datHead->len;
53✔
747

748
  // save exchanged snap info
749
  SSnapshotParam *pParam = &pReceiver->snapshotParam;
53✔
750
  data = taosMemoryRealloc(pParam->data, dataLen);
53!
751
  if (data == NULL) {
53!
752
    code = terrno;
×
753
    sError("vgId:%d, failed to realloc memory for snapshot prep due to %s. dataLen:%d", pSyncNode->vgId,
×
754
           tstrerror(code), dataLen);
755
    goto _exit;
×
756
  }
757
  pParam->data = data;
53✔
758
  data = NULL;
53✔
759
  (void)memcpy(pParam->data, pInfo->data, dataLen);
53✔
760

761
_exit:
53✔
762
  TAOS_RETURN(code);
53✔
763
}
764

765
static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
53✔
766
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
53✔
767
  int64_t                timeNow = taosGetTimestampMs();
53✔
768
  int32_t                code = 0;
53✔
769

770
  if (snapshotReceiverIsStart(pReceiver)) {
53!
771
    // already start
772
    int32_t order = 0;
×
773
    if ((order = snapshotReceiverSignatureCmp(pReceiver, pMsg)) < 0) {
×
774
      sInfo("failed to prepare snapshot, received a new snapshot preparation. restart receiver.");
×
775
      goto _START_RECEIVER;
×
776
    } else if (order == 0) {
×
777
      sInfo("prepare snapshot, received a duplicate snapshot preparation. send reply.");
×
778
      goto _SEND_REPLY;
×
779
    } else {
780
      // ignore
781
      sError("failed to prepare snapshot, received a stale snapshot preparation. ignore.");
×
782
      code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
×
783
      goto _SEND_REPLY;
×
784
    }
785
  } else {
786
    // start new
787
    sRInfo(pReceiver, "snapshot receiver not start yet so start new one");
53!
788
    goto _START_RECEIVER;
53✔
789
  }
790

791
_START_RECEIVER:
53✔
792
  if (snapshotReceiverIsStart(pReceiver)) {
53!
793
    sRInfo(pReceiver, "snapshot receiver already start and force stop pre one");
×
794
    snapshotReceiverStop(pReceiver);
×
795
  }
796

797
  snapshotReceiverStart(pReceiver, pMsg);
53✔
798

799
_SEND_REPLY:;
53✔
800

801
  SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT_REPLY};
53✔
802
  int32_t   dataLen = 0;
53✔
803
  if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT) {
53!
804
    if ((code = syncSnapReceiverExchgSnapInfo(pSyncNode, pReceiver, pMsg, &snapInfo)) != 0) {
53!
805
      goto _out;
×
806
    }
807
    SSyncTLV *datHead = snapInfo.data;
53✔
808
    dataLen = sizeof(SSyncTLV) + datHead->len;
53✔
809
  }
810

811
  // send response
812
  int32_t type = (snapInfo.data) ? snapInfo.type : 0;
53!
813
  if ((code = syncSnapSendRsp(pReceiver, pMsg, snapInfo.data, dataLen, type, code)) != 0) {
53!
814
    goto _out;
×
815
  }
816

817
_out:
53✔
818
  if (snapInfo.data) {
53!
819
    taosMemoryFree(snapInfo.data);
53!
820
    snapInfo.data = NULL;
53✔
821
  }
822
  TAOS_RETURN(code);
53✔
823
}
824

825
static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
53✔
826
  // condition 1
827
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
53✔
828
  int32_t                code = TSDB_CODE_SYN_INTERNAL_ERROR;
53✔
829

830
  if (!snapshotReceiverIsStart(pReceiver)) {
53!
831
    sRError(pReceiver, "failed to begin snapshot receiver since not started");
×
832
    goto _SEND_REPLY;
×
833
  }
834

835
  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
53!
836
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
×
837
    sError("failed to begin snapshot, since %s", tstrerror(code));
×
838
    goto _SEND_REPLY;
×
839
  }
840

841
  // start writer
842
  if ((code = snapshotReceiverStartWriter(pReceiver, pMsg)) != 0) {
53!
843
    sRError(pReceiver, "failed to start snapshot writer since %s", tstrerror(code));
×
844
    goto _SEND_REPLY;
×
845
  }
846

847
  SyncIndex beginIndex = syncNodeGetSnapBeginIndex(pSyncNode);
53✔
848
  if (pReceiver->snapshotParam.start != beginIndex) {
53!
849
    sRError(pReceiver, "snapshot begin index is changed unexpectedly. sver:%" PRId64 ", beginIndex:%" PRId64,
×
850
            pReceiver->snapshotParam.start, beginIndex);
851
    goto _SEND_REPLY;
×
852
  }
853

854
  code = 0;
53✔
855
_SEND_REPLY:
53✔
856

857
  // send response
858
  TAOS_CHECK_RETURN(syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code));
53!
859

860
  TAOS_RETURN(code);
53✔
861
}
862

863
int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, void *pBlock, int32_t blockLen,
957✔
864
                        int32_t type, int32_t rspCode) {
865
  int32_t    code = 0;
957✔
866
  SSyncNode *pSyncNode = pReceiver->pSyncNode;
957✔
867
  // build msg
868
  SRpcMsg rpcMsg = {0};
957✔
869
  if ((code = syncBuildSnapshotSendRsp(&rpcMsg, blockLen, pSyncNode->vgId)) != 0) {
957!
870
    sRError(pReceiver, "failed to build snapshot receiver resp since %s", tstrerror(code));
×
871
    TAOS_RETURN(code);
×
872
  }
873

874
  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
957✔
875
  pRspMsg->srcId = pSyncNode->myRaftId;
957✔
876
  pRspMsg->destId = pMsg->srcId;
957✔
877
  pRspMsg->term = pMsg->term;
957✔
878
  pRspMsg->lastIndex = pMsg->lastIndex;
957✔
879
  pRspMsg->lastTerm = pMsg->lastTerm;
957✔
880
  pRspMsg->startTime = pMsg->startTime;
957✔
881
  pRspMsg->ack = pMsg->seq;
957✔
882
  pRspMsg->code = rspCode;
957✔
883
  pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
957✔
884
  pRspMsg->payloadType = type;
957✔
885

886
  if (pBlock != NULL && blockLen > 0) {
957!
887
    (void)memcpy(pRspMsg->data, pBlock, blockLen);
53✔
888
  }
889

890
  // send msg
891
  if ((code = syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg)) != 0) {
957!
892
    sRError(pReceiver, "failed to send snapshot receiver resp since %s", tstrerror(code));
×
893
    TAOS_RETURN(code);
×
894
  }
895
  return 0;
957✔
896
}
897

898
static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend **ppMsg) {
851✔
899
  int32_t           code = 0;
851✔
900
  SSyncSnapBuffer  *pRcvBuf = pReceiver->pRcvBuf;
851✔
901
  SyncSnapshotSend *pMsg = ppMsg[0];
851✔
902

903
  (void)taosThreadMutexLock(&pRcvBuf->mutex);
851✔
904

905
  if (pMsg->seq - pRcvBuf->start >= pRcvBuf->size) {
851!
906
    code = TSDB_CODE_SYN_BUFFER_FULL;
×
907
    goto _out;
×
908
  }
909

910
  if (!(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end)) return TSDB_CODE_SYN_INTERNAL_ERROR;
851!
911

912
  if (pMsg->seq > pRcvBuf->cursor) {
851!
913
    if (pRcvBuf->entries[pMsg->seq % pRcvBuf->size]) {
851!
914
      pRcvBuf->entryDeleteCb(pRcvBuf->entries[pMsg->seq % pRcvBuf->size]);
×
915
    }
916
    pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg;
851✔
917
    ppMsg[0] = NULL;
851✔
918
    pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
851✔
919
  } else if (pMsg->seq < pRcvBuf->start) {
×
920
    code = syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code);
×
921
    goto _out;
×
922
  }
923

924
  for (int64_t seq = pRcvBuf->cursor + 1; seq < pRcvBuf->end; ++seq) {
1,702✔
925
    if (pRcvBuf->entries[seq % pRcvBuf->size]) {
1,131✔
926
      pRcvBuf->cursor = seq;
851✔
927
    } else {
928
      break;
280✔
929
    }
930
  }
931

932
  for (int64_t seq = pRcvBuf->start; seq <= pRcvBuf->cursor; ++seq) {
1,702✔
933
    if ((code = snapshotReceiverGotData(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size])) != 0) {
851!
934
      if (code >= SYNC_SNAPSHOT_SEQ_INVALID) {
×
935
        code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
936
      }
937
    }
938
    pRcvBuf->start = seq + 1;
851✔
939
    if (syncSnapSendRsp(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size], NULL, 0, 0, code) != 0) {
851!
940
      sError("failed to send snap rsp");
×
941
    }
942
    pRcvBuf->entryDeleteCb(pRcvBuf->entries[seq % pRcvBuf->size]);
851✔
943
    pRcvBuf->entries[seq % pRcvBuf->size] = NULL;
851✔
944
    if (code) goto _out;
851!
945
  }
946

947
_out:
851✔
948
  (void)taosThreadMutexUnlock(&pRcvBuf->mutex);
851✔
949
  TAOS_RETURN(code);
851✔
950
}
951

952
static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend **ppMsg) {
851✔
953
  // condition 4
954
  // transfering
955
  SyncSnapshotSend *pMsg = ppMsg[0];
851✔
956
  if (!pMsg) return TSDB_CODE_SYN_INTERNAL_ERROR;
851!
957
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
851✔
958
  int64_t                timeNow = taosGetTimestampMs();
851✔
959
  int32_t                code = 0;
851✔
960

961
  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
851!
962
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
×
963
    sError("failed to receive snapshot data, since %s", tstrerror(code));
×
964
    return syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code);
×
965
  }
966

967
  return syncSnapBufferRecv(pReceiver, ppMsg);
851✔
968
}
969

970
static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
53✔
971
  // condition 2
972
  // end, finish FSM
973
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
53✔
974
  int64_t                timeNow = taosGetTimestampMs();
53✔
975
  int32_t                code = 0;
53✔
976

977
  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
53!
978
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
×
979
    sError("failed to end snapshot, since %s", tstrerror(code));
×
980
    goto _SEND_REPLY;
×
981
  }
982

983
  code = snapshotReceiverFinish(pReceiver, pMsg);
53✔
984
  if (code == 0) {
53!
985
    snapshotReceiverStop(pReceiver);
53✔
986
  }
987

988
_SEND_REPLY:;
×
989

990
  // build msg
991
  SRpcMsg rpcMsg = {0};
53✔
992
  if ((code = syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId)) != 0) {
53!
993
    sRError(pReceiver, "snapshot receiver build rsp failed since %s", tstrerror(code));
×
994
    TAOS_RETURN(code);
×
995
  }
996

997
  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
53✔
998
  pRspMsg->srcId = pSyncNode->myRaftId;
53✔
999
  pRspMsg->destId = pMsg->srcId;
53✔
1000
  pRspMsg->term = raftStoreGetTerm(pSyncNode);
53✔
1001
  pRspMsg->lastIndex = pMsg->lastIndex;
53✔
1002
  pRspMsg->lastTerm = pMsg->lastTerm;
53✔
1003
  pRspMsg->startTime = pMsg->startTime;
53✔
1004
  pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
53✔
1005
  pRspMsg->code = code;
53✔
1006
  pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
53✔
1007

1008
  // send msg
1009
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver end", &rpcMsg.info.traceId);
53✔
1010
  if ((code = syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg)) != 0) {
53!
1011
    sRError(pReceiver, "snapshot receiver send rsp failed since %s", tstrerror(code));
×
1012
    TAOS_RETURN(code);
×
1013
  }
1014

1015
  TAOS_RETURN(code);
53✔
1016
}
1017

1018
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
1,010✔
1019
  SyncSnapshotSend     **ppMsg = (SyncSnapshotSend **)&pRpcMsg->pCont;
1,010✔
1020
  SyncSnapshotSend      *pMsg = ppMsg[0];
1,010✔
1021
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
1,010✔
1022
  int32_t                code = 0;
1,010✔
1023

1024
  // if already drop replica, do not process
1025
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
1,010!
1026
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config", &pRpcMsg->info.traceId);
×
1027
    code = TSDB_CODE_SYN_NOT_IN_RAFT_GROUP;
×
1028
    TAOS_RETURN(code);
×
1029
  }
1030

1031
  if (pMsg->term < raftStoreGetTerm(pSyncNode)) {
1,010!
1032
    sRError(pReceiver, "reject snap replication with smaller term. msg term:%" PRId64 ", seq:%d", pMsg->term,
×
1033
            pMsg->seq);
1034
    code = TSDB_CODE_SYN_TERM_NOT_MATCH;
×
1035
    if (syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code) != 0) sError("failed to send snap rsp");
×
1036
    TAOS_RETURN(code);
×
1037
  }
1038

1039
  if (pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER) {
1,010✔
1040
    if (pMsg->term > raftStoreGetTerm(pSyncNode)) {
482!
1041
      syncNodeStepDown(pSyncNode, pMsg->term, pMsg->srcId);
×
1042
    }
1043
  } else {
1044
    syncNodeUpdateTermWithoutStepDown(pSyncNode, pMsg->term);
528✔
1045
  }
1046

1047
  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER && pSyncNode->state != TAOS_SYNC_STATE_LEARNER) {
1,010!
1048
    sRError(pReceiver, "snapshot receiver not a follower or learner");
×
1049
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1050
    TAOS_RETURN(code);
×
1051
  }
1052

1053
  if (pMsg->seq < SYNC_SNAPSHOT_SEQ_PREP || pMsg->seq > SYNC_SNAPSHOT_SEQ_END) {
1,010!
1054
    sRError(pReceiver, "snap replication msg with invalid seq:%d", pMsg->seq);
×
1055
    code = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG;
×
1056
    TAOS_RETURN(code);
×
1057
  }
1058

1059
  // prepare
1060
  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP) {
1,010✔
1061
    sInfo("vgId:%d, prepare snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term,
53!
1062
          pMsg->startTime);
1063
    code = syncNodeOnSnapshotPrep(pSyncNode, pMsg);
53✔
1064
    goto _out;
53✔
1065
  }
1066

1067
  // begin
1068
  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
957✔
1069
    sInfo("vgId:%d, begin snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term,
53!
1070
          pMsg->startTime);
1071
    code = syncNodeOnSnapshotBegin(pSyncNode, pMsg);
53✔
1072
    goto _out;
53✔
1073
  }
1074

1075
  // data
1076
  if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
904!
1077
    sDebug("vgId:%d, begin snapshot receive. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term,
851!
1078
          pMsg->startTime);
1079
    code = syncNodeOnSnapshotReceive(pSyncNode, ppMsg);
851✔
1080
    goto _out;
851✔
1081
  }
1082

1083
  // end
1084
  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
53!
1085
    sInfo("vgId:%d, end snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term,
53!
1086
          pMsg->startTime);
1087
    code = syncNodeOnSnapshotEnd(pSyncNode, pMsg);
53✔
1088
    if (code != 0) {
53!
1089
      sRError(pReceiver, "failed to end snapshot.");
×
1090
      goto _out;
×
1091
    }
1092

1093
    code = syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode);
53✔
1094
    if (code != 0) {
53!
1095
      sRError(pReceiver, "failed to reinit log buffer since %s", tstrerror(code));
×
1096
    }
1097
    goto _out;
53✔
1098
  }
1099

1100
_out:;
×
1101
  syncNodeResetElectTimer(pSyncNode);
1,010✔
1102
  TAOS_RETURN(code);
1,010✔
1103
}
1104

1105
static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
54✔
1106
  if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT_REPLY) return TSDB_CODE_SYN_INTERNAL_ERROR;
54!
1107

1108
  SSyncTLV *datHead = (void *)pMsg->data;
54✔
1109
  if (datHead->typ != pMsg->payloadType) {
54!
1110
    sSError(pSender, "unexpected data type in data of SyncSnapshotRsp. typ: %d", datHead->typ);
×
1111
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
×
1112
  }
1113
  int32_t dataLen = sizeof(SSyncTLV) + datHead->len;
54✔
1114

1115
  SSnapshotParam *pParam = &pSender->snapshotParam;
54✔
1116
  void           *data = taosMemoryRealloc(pParam->data, dataLen);
54!
1117
  if (data == NULL) {
54!
1118
    TAOS_RETURN(terrno);
×
1119
  }
1120
  (void)memcpy(data, pMsg->data, dataLen);
54✔
1121

1122
  pParam->data = data;
54✔
1123
  data = NULL;
54✔
1124
  sSInfo(pSender, "data of snapshot param. len: %d", datHead->len);
54!
1125
  return 0;
54✔
1126
}
1127

1128
// sender
1129
static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
54✔
1130
  int32_t   code = 0;
54✔
1131
  SSnapshot snapshot = {0};
54✔
1132

1133
  if (pMsg->snapBeginIndex > pSyncNode->commitIndex) {
54!
1134
    sSError(pSender,
×
1135
            "snapshot begin index is greater than commit index. snapBeginIndex:%" PRId64 ", commitIndex:%" PRId64,
1136
            pMsg->snapBeginIndex, pSyncNode->commitIndex);
1137
    TAOS_RETURN(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG);
×
1138
  }
1139

1140
  (void)taosThreadMutexLock(&pSender->pSndBuf->mutex);
54✔
1141
  TAOS_CHECK_GOTO(pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot), NULL, _out);
54!
1142

1143
  // prepare <begin, end>
1144
  pSender->snapshotParam.start = pMsg->snapBeginIndex;
54✔
1145
  pSender->snapshotParam.end = snapshot.lastApplyIndex;
54✔
1146

1147
  sSInfo(pSender, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64,
54!
1148
         pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
1149

1150
  // update sender
1151
  pSender->snapshot = snapshot;
54✔
1152

1153
  // start reader
1154
  if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
54!
1155
    TAOS_CHECK_GOTO(syncSnapSenderExchgSnapInfo(pSyncNode, pSender, pMsg), NULL, _out);
54!
1156
  }
1157

1158
  code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
54✔
1159
  if (code != 0) {
54!
1160
    sSError(pSender, "prepare snapshot failed since %s", tstrerror(code));
×
1161
    goto _out;
×
1162
  }
1163

1164
  // update next index
1165
  syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapshot.lastApplyIndex + 1);
54✔
1166

1167
  code = snapshotSend(pSender);
54✔
1168

1169
_out:
54✔
1170
  (void)taosThreadMutexUnlock(&pSender->pSndBuf->mutex);
54✔
1171
  TAOS_RETURN(code);
54✔
1172
}
1173

1174
static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
2,024✔
1175
  int32_t code = 0;
2,024✔
1176
  if (pSender->term < pMsg->term) return -1;
2,024!
1177
  if (pSender->term > pMsg->term) return 1;
2,024!
1178
  if (pSender->startTime < pMsg->startTime) return -2;
2,024!
1179
  if (pSender->startTime > pMsg->startTime) return 2;
2,024!
1180
  if (code != 0)
2,024!
1181
    sSError(pSender, "sender signature failed, result:%d, msg signature:(%" PRId64 ", %" PRId64 ")", code, pMsg->term,
×
1182
            pMsg->startTime);
1183
  return 0;
2,024✔
1184
}
1185

1186
static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp **ppMsg) {
958✔
1187
  int32_t          code = 0;
958✔
1188
  SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
958✔
1189
  SyncSnapshotRsp *pMsg = ppMsg[0];
958✔
1190

1191
  (void)taosThreadMutexLock(&pSndBuf->mutex);
958✔
1192
  if (snapshotSenderSignatureCmp(pSender, pMsg) != 0) {
958!
1193
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
×
1194
    sError("failed to send snapshot data, since %s", tstrerror(code));
×
1195
    goto _out;
×
1196
  }
1197

1198
  if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) {
958!
1199
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1200
    goto _out;
×
1201
  }
1202

1203
  if (pMsg->ack - pSndBuf->start >= pSndBuf->size) {
958!
1204
    code = TSDB_CODE_SYN_BUFFER_FULL;
×
1205
    goto _out;
×
1206
  }
1207

1208
  if (!(pSndBuf->start <= pSndBuf->cursor + 1 && pSndBuf->cursor < pSndBuf->end)) {
958!
1209
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1210
    goto _out;
×
1211
  }
1212

1213
  if (pMsg->ack > pSndBuf->cursor && pMsg->ack < pSndBuf->end) {
958!
1214
    SyncSnapBlock *pBlk = pSndBuf->entries[pMsg->ack % pSndBuf->size];
904✔
1215
    if (!pBlk) {
904!
1216
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1217
      goto _out;
×
1218
    }
1219
    pBlk->acked = 1;
904✔
1220
  }
1221

1222
  for (int64_t ack = pSndBuf->cursor + 1; ack < pSndBuf->end; ++ack) {
1,862✔
1223
    SyncSnapBlock *pBlk = pSndBuf->entries[ack % pSndBuf->size];
1,754✔
1224
    if (pBlk->acked) {
1,754✔
1225
      pSndBuf->cursor = ack;
904✔
1226
    } else {
1227
      break;
850✔
1228
    }
1229
  }
1230

1231
  for (int64_t ack = pSndBuf->start; ack <= pSndBuf->cursor; ++ack) {
1,862✔
1232
    pSndBuf->entryDeleteCb(pSndBuf->entries[ack % pSndBuf->size]);
904✔
1233
    pSndBuf->entries[ack % pSndBuf->size] = NULL;
904✔
1234
    pSndBuf->start = ack + 1;
904✔
1235
  }
1236

1237
  while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pSndBuf->start < tsSnapReplMaxWaitN) {
1,916!
1238
    if ((code = snapshotSend(pSender)) != 0) {
958!
1239
      goto _out;
×
1240
    }
1241
  }
1242

1243
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
958!
1244
    if ((code = snapshotSend(pSender)) != 0) {
54!
1245
      goto _out;
×
1246
    }
1247
  }
1248
_out:
958✔
1249
  (void)taosThreadMutexUnlock(&pSndBuf->mutex);
958✔
1250
  TAOS_RETURN(code);
958✔
1251
}
1252

1253
int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
1,066✔
1254
  SyncSnapshotRsp **ppMsg = (SyncSnapshotRsp **)&pRpcMsg->pCont;
1,066✔
1255
  SyncSnapshotRsp  *pMsg = ppMsg[0];
1,066✔
1256
  int32_t           code = 0;
1,066✔
1257

1258
  // if already drop replica, do not process
1259
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
1,066!
1260
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "maybe replica already dropped", &pRpcMsg->info.traceId);
×
1261
    TAOS_RETURN(TSDB_CODE_SYN_NOT_IN_RAFT_GROUP);
×
1262
  }
1263

1264
  // get sender
1265
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &pMsg->srcId);
1,066✔
1266
  if (pSender == NULL) {
1,066!
1267
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender is null", &pRpcMsg->info.traceId);
×
1268
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
×
1269
  }
1270

1271
  if (!snapshotSenderIsStart(pSender)) {
1,066!
1272
    sSError(pSender, "snapshot sender stopped. sender startTime:%" PRId64 ", msg startTime:%" PRId64,
×
1273
            pSender->startTime, pMsg->startTime);
1274
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
×
1275
  }
1276

1277
  // check signature
1278
  int32_t order = 0;
1,066✔
1279
  if ((order = snapshotSenderSignatureCmp(pSender, pMsg)) > 0) {
1,066!
1280
    sError("failed to check snapshot rsp signature, ignore a stale snap rsp.");
×
1281
    TAOS_RETURN(TSDB_CODE_SYN_MISMATCHED_SIGNATURE);
×
1282
  } else if (order < 0) {
1,066!
1283
    sError("failed to check snapshot rsp signature, snapshot sender is stale. stop");
×
1284
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
×
1285
    goto _ERROR;
×
1286
  }
1287

1288
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
1,066!
1289
    sSError(pSender, "snapshot sender not leader");
×
1290
    code = TSDB_CODE_SYN_NOT_LEADER;
×
1291
    goto _ERROR;
×
1292
  }
1293

1294
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
1,066✔
1295
  if (pMsg->term != currentTerm) {
1,066!
1296
    sSError(pSender, "snapshot sender term mismatch, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
×
1297
            currentTerm);
1298
    code = TSDB_CODE_SYN_TERM_NOT_MATCH;
×
1299
    goto _ERROR;
×
1300
  }
1301

1302
  if (pMsg->code != 0) {
1,066!
1303
    sSError(pSender, "snapshot sender receive error:%s 0x%x and stop sender", tstrerror(pMsg->code), pMsg->code);
×
1304
    code = pMsg->code;
×
1305
    goto _ERROR;
×
1306
  }
1307

1308
  // send begin
1309
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP) {
1,066✔
1310
    sSInfo(pSender, "process prepare rsp");
54!
1311
    if ((code = syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg)) != 0) {
54!
1312
      goto _ERROR;
×
1313
    }
1314
  }
1315

1316
  // send msg of data or end
1317
  if (pMsg->ack >= SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->ack < SYNC_SNAPSHOT_SEQ_END) {
1,066✔
1318
    if ((code = syncSnapBufferSend(pSender, ppMsg)) != 0) {
958!
1319
      sSError(pSender, "failed to replicate snap since %s. seq:%d, pReader:%p, finish:%d", tstrerror(code),
×
1320
              pSender->seq, pSender->pReader, pSender->finish);
1321
      goto _ERROR;
×
1322
    }
1323
  }
1324

1325
  // end
1326
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
1,066✔
1327
    sSInfo(pSender, "process end rsp");
54!
1328
    snapshotSenderStop(pSender, true);
54✔
1329
    TAOS_CHECK_GOTO(syncNodeReplicateReset(pSyncNode, &pMsg->srcId), NULL, _ERROR);
54!
1330
  }
1331

1332
  return 0;
1,066✔
1333

1334
_ERROR:
×
1335
  snapshotSenderStop(pSender, false);
×
1336
  if (syncNodeReplicateReset(pSyncNode, &pMsg->srcId) != 0) sError("failed to reset replicate");
×
1337
  TAOS_RETURN(code);
×
1338
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc