• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3543

29 Nov 2024 02:58AM UTC coverage: 60.842% (+0.02%) from 60.819%
#3543

push

travis-ci

web-flow
Merge pull request #28973 from taosdata/merge/mainto3.0

merge: from main to 3.0

120460 of 253224 branches covered (47.57%)

Branch coverage included in aggregate %.

706 of 908 new or added lines in 18 files covered. (77.75%)

2401 existing lines in 137 files now uncovered.

201633 of 276172 relevant lines covered (73.01%)

19045673.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.24
/source/libs/sync/src/syncSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "syncSnapshot.h"
18
#include "syncIndexMgr.h"
19
#include "syncPipeline.h"
20
#include "syncRaftCfg.h"
21
#include "syncRaftLog.h"
22
#include "syncRaftStore.h"
23
#include "syncReplication.h"
24
#include "syncUtil.h"
25
#include "tglobal.h"
26

27
static SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths);
28

29
// Release every entry held in the window [start, end) through the buffer's
// delete callback (when one is installed), clear the slots, and rewind the
// window so that it is empty again.
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
  int64_t seq = pBuf->start;
  while (seq < pBuf->end) {
    if (pBuf->entryDeleteCb != NULL) {
      pBuf->entryDeleteCb(pBuf->entries[seq % pBuf->size]);
    }
    pBuf->entries[seq % pBuf->size] = NULL;
    ++seq;
  }

  // An empty window begins one past SYNC_SNAPSHOT_SEQ_BEGIN; the cursor trails start by one.
  pBuf->start = SYNC_SNAPSHOT_SEQ_BEGIN + 1;
  pBuf->cursor = pBuf->start - 1;
  pBuf->end = pBuf->start;
}
516,560✔
40

41
// Tear down a snapshot buffer: release its entries, destroy its mutex, free
// the buffer itself and set the caller's pointer to NULL. A NULL `ppBuf` or a
// NULL buffer is ignored.
static void syncSnapBufferDestroy(SSyncSnapBuffer **ppBuf) {
  SSyncSnapBuffer *pOwned = (ppBuf != NULL) ? *ppBuf : NULL;
  if (pOwned == NULL) {
    return;
  }

  syncSnapBufferReset(pOwned);

  (void)taosThreadMutexDestroy(&pOwned->mutex);
  taosMemoryFree(pOwned);
  *ppBuf = NULL;
}
52

53
// Allocate a zero-initialised snapshot buffer and initialise its mutex.
//
// On success *ppBuf receives the new buffer and 0 is returned. On every
// failure path *ppBuf is set to NULL, nothing allocated here is left behind,
// and a non-zero error code is returned.
static int32_t syncSnapBufferCreate(SSyncSnapBuffer **ppBuf) {
  *ppBuf = NULL;

  SSyncSnapBuffer *pBuf = taosMemoryCalloc(1, sizeof(SSyncSnapBuffer));
  if (pBuf == NULL) {
    TAOS_RETURN(terrno);
  }

  // The slot count is derived from the entries array and must agree with the configured size.
  pBuf->size = sizeof(pBuf->entries) / sizeof(void *);
  if (pBuf->size != TSDB_SYNC_SNAP_BUFFER_SIZE) {
    taosMemoryFree(pBuf);  // do not leak the buffer on the sanity-check failure
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int32_t code = taosThreadMutexInit(&pBuf->mutex, NULL);
  if (code != 0) {
    taosMemoryFree(pBuf);
    TAOS_RETURN(code);
  }

  *ppBuf = pBuf;
  TAOS_RETURN(0);
}
65

66
// Allocate and initialise a snapshot sender for the replica at `replicaIndex`.
// On success *ppSender receives the new sender; on any failure it stays NULL
// and an error code is returned.
int32_t snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex, SSyncSnapshotSender **ppSender) {
  *ppSender = NULL;

  // The FSM must provide the complete set of snapshot read callbacks.
  if (pSyncNode->pFsm->FpSnapshotStartRead == NULL || pSyncNode->pFsm->FpSnapshotStopRead == NULL ||
      pSyncNode->pFsm->FpSnapshotDoRead == NULL) {
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  SSyncSnapshotSender *pNew = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender));
  if (pNew == NULL) {
    TAOS_RETURN(terrno);
  }

  pNew->pSyncNode = pSyncNode;
  pNew->replicaIndex = replicaIndex;
  pNew->start = false;
  pNew->finish = false;
  pNew->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pNew->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pNew->pReader = NULL;
  pNew->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pNew->startTime = -1;
  pNew->term = raftStoreGetTerm(pSyncNode);

  int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &pNew->snapshot);
  if (code != 0) {
    taosMemoryFreeClear(pNew);
    TAOS_RETURN(code);
  }

  // Send buffer; its entries are released through syncSnapBlockDestroy.
  SSyncSnapBuffer *pBuf = NULL;
  code = syncSnapBufferCreate(&pBuf);
  if (pBuf == NULL) {
    taosMemoryFreeClear(pNew);
    TAOS_RETURN(code);
  }
  pBuf->entryDeleteCb = syncSnapBlockDestroy;
  pNew->pSndBuf = pBuf;
  syncSnapBufferReset(pNew->pSndBuf);

  *ppSender = pNew;
  TAOS_RETURN(code);
}
109

110
void syncSnapBlockDestroy(void *ptr) {
85,434✔
111
  SyncSnapBlock *pBlk = ptr;
85,434✔
112
  if (pBlk->pBlock != NULL) {
85,434✔
113
    taosMemoryFree(pBlk->pBlock);
85,328✔
114
    pBlk->pBlock = NULL;
85,328✔
115
    pBlk->blockLen = 0;
85,328✔
116
  }
117
  taosMemoryFree(pBlk);
85,434✔
118
}
85,434✔
119

120
// Free the data buffers attached to the sender's snapshot and snapshot
// parameter, leaving both pointers NULL. Always returns 0.
static int32_t snapshotSenderClearInfoData(SSyncSnapshotSender *pSender) {
  if (pSender->snapshot.data != NULL) {
    taosMemoryFree(pSender->snapshot.data);
    pSender->snapshot.data = NULL;
  }

  if (pSender->snapshotParam.data != NULL) {
    taosMemoryFree(pSender->snapshotParam.data);
    pSender->snapshotParam.data = NULL;
  }

  return 0;
}
132

133
// Destroy a snapshot sender: stop any open reader, release the send buffer
// and the attached info data, then free the sender. NULL is ignored.
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender == NULL) {
    return;
  }

  // An open reader is handed back to the FSM before the sender goes away.
  if (pSender->pReader != NULL) {
    pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    pSender->pReader = NULL;
  }

  // syncSnapBufferDestroy returns immediately when the buffer pointer is NULL.
  syncSnapBufferDestroy(&pSender->pSndBuf);

  (void)snapshotSenderClearInfoData(pSender);

  taosMemoryFree(pSender);
}
152

153
// Report whether the sender's `start` flag is currently set (atomic read).
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  int8_t flag = atomic_load_8(&pSender->start);
  return flag != 0;
}
409,724✔
154

155
// Start a snapshot transfer: mark the sender as started, reset its progress
// state, fetch the snapshot preparation info from the FSM and send it to the
// peer as the SYNC_SNAPSHOT_SEQ_PREP message.
//
// Returns 0 when the sender was already started (nothing is done) or when the
// preparation message was sent; otherwise the error code of the failing step.
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
  int32_t code = 0;
  void   *pData = NULL;
  int32_t type = 0;
  int32_t dataLen = 0;

  // Only the caller that flips `start` from false to true runs the start sequence.
  int8_t started = atomic_val_compare_exchange_8(&pSender->start, false, true);
  if (started) return 0;

  pSender->seq = SYNC_SNAPSHOT_SEQ_PREP;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->pReader = NULL;
  pSender->snapshotParam.start = SYNC_INDEX_INVALID;
  pSender->snapshotParam.end = SYNC_INDEX_INVALID;
  pSender->snapshot.data = NULL;
  pSender->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pSender->snapshot.lastApplyTerm = SYNC_TERM_INVALID;
  pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  (void)memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig));
  pSender->sendingMS = 0;
  pSender->term = raftStoreGetTerm(pSender->pSyncNode);
  pSender->startTime = taosGetMonoTimestampMs();
  pSender->lastSendTime = taosGetTimestampMs();
  pSender->finish = false;

  // Get snapshot info
  SSyncNode *pSyncNode = pSender->pSyncNode;
  SSnapshot  snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT};
  if ((code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo)) != 0) {
    sSError(pSender, "snapshot get info failure since %s", tstrerror(code));
    goto _out;
  }

  // When the FSM returned a payload it must be a TLV of the preparation type;
  // its total length is the TLV header plus the length recorded in the header.
  pData = snapInfo.data;
  type = (pData) ? snapInfo.type : 0;
  if (pData) {
    SSyncTLV *datHead = pData;
    if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT) {
      sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
      code = TSDB_CODE_INVALID_DATA_FMT;
      goto _out;
    }
    dataLen = sizeof(SSyncTLV) + datHead->len;
  }

  if ((code = syncSnapSendMsg(pSender, pSender->seq, pData, dataLen, type)) != 0) {
    goto _out;
  }

  SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&destId));

_out:
  // The info payload is only needed for the message above; release it on every path.
  if (snapInfo.data) {
    taosMemoryFree(snapInfo.data);
    snapInfo.data = NULL;
  }
  TAOS_RETURN(code);
}
213

214
// Stop a started sender. Under the send-buffer mutex this records `finish`,
// stops the reader if one is open, resets the send buffer and frees the
// attached info data. Does nothing beyond logging when the sender was not started.
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
  sSDebug(pSender, "snapshot sender stop, finish:%d reader:%p", finish, pSender->pReader);

  // Flip `start` from true to false; if it was not set there is nothing to stop.
  int8_t wasStarted = atomic_val_compare_exchange_8(&pSender->start, true, false);
  if (!wasStarted) return;

  (void)taosThreadMutexLock(&pSender->pSndBuf->mutex);

  pSender->finish = finish;

  if (pSender->pReader != NULL) {
    pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    pSender->pReader = NULL;
  }

  syncSnapBufferReset(pSender->pSndBuf);

  (void)snapshotSenderClearInfoData(pSender);

  SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);

  (void)taosThreadMutexUnlock(&pSender->pSndBuf->mutex);
}
239

240
// Build one snapshot-send message carrying sequence number `seq`, copy
// `blockLen` bytes from `pBlock` into it (when a non-empty block is given),
// tag it with payload type `typ` and send it to the sender's replica.
// Returns 0 on success or the error code of the build/send step.
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) {
  SSyncNode *pNode = pSender->pSyncNode;
  SRpcMsg    rpcMsg = {0};

  int32_t code = syncBuildSnapshotSend(&rpcMsg, blockLen, pNode->vgId);
  if (code != 0) {
    sSError(pSender, "failed to build snap replication msg since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  // Header: routing, term and the snapshot signature known to the sender.
  SyncSnapshotSend *pMsg = rpcMsg.pCont;
  pMsg->srcId = pNode->myRaftId;
  pMsg->destId = pNode->replicasId[pSender->replicaIndex];
  pMsg->term = pSender->term;
  pMsg->startTime = pSender->startTime;
  pMsg->seq = seq;
  pMsg->beginIndex = pSender->snapshotParam.start;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;

  // Payload
  if (pBlock != NULL && blockLen > 0) {
    (void)memcpy(pMsg->data, pBlock, blockLen);
  }
  pMsg->payloadType = typ;

  code = syncNodeSendMsgById(&pMsg->destId, pNode, &rpcMsg);
  if (code != 0) {
    sSError(pSender, "failed to send snap replication msg since %s. seq:%d", tstrerror(code), seq);
  }

  TAOS_RETURN(code);
}
275

276
// when sender receive ack, call this function to send msg from seq
277
// seq = ack + 1, already updated
278
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
85,646✔
279
  int32_t        code = 0;
85,646✔
280
  SyncSnapBlock *pBlk = NULL;
85,646✔
281

282
  if (pSender->seq < SYNC_SNAPSHOT_SEQ_END) {
85,646✔
283
    pSender->seq++;
85,540✔
284

285
    if (pSender->seq > SYNC_SNAPSHOT_SEQ_BEGIN) {
85,540✔
286
      pBlk = taosMemoryCalloc(1, sizeof(SyncSnapBlock));
85,434✔
287
      if (pBlk == NULL) {
85,434!
288
        code = terrno;
×
289
        goto _OUT;
×
290
      }
291

292
      pBlk->seq = pSender->seq;
85,434✔
293

294
      // read data
295
      code = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, &pBlk->pBlock,
85,434✔
296
                                                        &pBlk->blockLen);
297
      if (code != 0) {
85,434!
298
        sSError(pSender, "snapshot sender read failed since %s", tstrerror(code));
×
299
        goto _OUT;
×
300
      }
301

302
      if (pBlk->blockLen > 0) {
85,434✔
303
        // has read data
304
        sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pBlk->blockLen, pBlk->seq);
85,328!
305
      } else {
306
        // read finish, update seq to end
307
        pSender->seq = SYNC_SNAPSHOT_SEQ_END;
106✔
308
        sSInfo(pSender, "snapshot sender read to the end");
106!
309
        goto _OUT;
106✔
310
      }
311
    }
312
  }
313

314
  if (!(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END)) {
85,540!
315
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
316
    goto _OUT;
×
317
  }
318

319
  // send msg
320
  int32_t blockLen = (pBlk) ? pBlk->blockLen : 0;
85,540✔
321
  void   *pBlock = (pBlk) ? pBlk->pBlock : NULL;
85,540✔
322
  if ((code = syncSnapSendMsg(pSender, pSender->seq, pBlock, blockLen, 0)) != 0) {
85,540!
323
    goto _OUT;
×
324
  }
325

326
  // put in buffer
327
  int64_t nowMs = taosGetTimestampMs();
85,540✔
328
  if (pBlk) {
85,540✔
329
    if (!(pBlk->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pBlk->seq < SYNC_SNAPSHOT_SEQ_END)) {
85,328!
330
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
331
      goto _OUT;
×
332
    }
333
    pBlk->sendTimeMs = nowMs;
85,328✔
334
    pSender->pSndBuf->entries[pSender->seq % pSender->pSndBuf->size] = pBlk;
85,328✔
335
    pBlk = NULL;
85,328✔
336
    pSender->pSndBuf->end = TMAX(pSender->seq + 1, pSender->pSndBuf->end);
85,328✔
337
  }
338
  pSender->lastSendTime = nowMs;
85,540✔
339

340
_OUT:;
85,646✔
341
  if (pBlk != NULL) {
85,646✔
342
    syncSnapBlockDestroy(pBlk);
106✔
343
    pBlk = NULL;
106✔
344
  }
345
  TAOS_RETURN(code);
85,646✔
346
}
347

348
// send snapshot data from cache
349
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
×
350
  SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
×
351
  int32_t          code = 0;
×
352
  (void)taosThreadMutexLock(&pSndBuf->mutex);
×
353
  if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) {
×
354
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
355
    goto _out;
×
356
  }
357

358
  for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) {
×
359
    SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size];
×
360
    if (!pBlk) {
×
361
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
362
      goto _out;
×
363
    }
364
    int64_t nowMs = taosGetTimestampMs();
×
365
    if (pBlk->acked || nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) {
×
366
      continue;
×
367
    }
368
    if ((code = syncSnapSendMsg(pSender, pBlk->seq, pBlk->pBlock, pBlk->blockLen, 0)) != 0) {
×
369
      goto _out;
×
370
    }
371
    pBlk->sendTimeMs = nowMs;
×
372
  }
373

374
  if (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
×
375
    if ((code = snapshotSend(pSender)) != 0) {
×
376
      goto _out;
×
377
    }
378
  }
379

380
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
×
381
    if ((code = syncSnapSendMsg(pSender, pSender->seq, NULL, 0, 0)) != 0) {
×
382
      goto _out;
×
383
    }
384
  }
385
_out:;
×
386
  (void)taosThreadMutexUnlock(&pSndBuf->mutex);
×
387
  TAOS_RETURN(code);
×
388
}
389

390
// Start the snapshot sender associated with `pDestId`.
// Returns 0 when the sender was started or is already running, and an error
// code when no sender exists for the destination or starting it fails.
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
  if (pSender == NULL) {
    sNError(pSyncNode, "snapshot sender start error since get failed");
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  if (snapshotSenderIsStart(pSender)) {
    sSDebug(pSender, "snapshot sender already start, ignore");
    return 0;
  }

  taosMsleep(1);

  int32_t startCode = snapshotSenderStart(pSender);
  if (startCode == 0) {
    return 0;
  }

  sSError(pSender, "snapshot sender start error since %s", tstrerror(startCode));
  TAOS_RETURN(startCode);
}
412

413
// receiver
414
// Allocate and initialise a snapshot receiver for messages coming from `fromId`.
// On success *ppReceiver receives the new receiver; on any failure it stays
// NULL and an error code is returned.
int32_t snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId, SSyncSnapshotReceiver **ppReceiver) {
  *ppReceiver = NULL;

  // The FSM must provide the complete set of snapshot write callbacks.
  if (pSyncNode->pFsm->FpSnapshotStartWrite == NULL || pSyncNode->pFsm->FpSnapshotStopWrite == NULL ||
      pSyncNode->pFsm->FpSnapshotDoWrite == NULL) {
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  SSyncSnapshotReceiver *pNew = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver));
  if (pNew == NULL) {
    TAOS_RETURN(terrno);
  }

  pNew->start = false;
  pNew->startTime = 0;
  pNew->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pNew->pWriter = NULL;

  int32_t code = taosThreadMutexInit(&pNew->writerMutex, NULL);
  if (code != 0) {
    taosMemoryFree(pNew);
    TAOS_RETURN(code);
  }

  pNew->pSyncNode = pSyncNode;
  pNew->fromId = fromId;
  pNew->term = raftStoreGetTerm(pSyncNode);
  pNew->snapshot.data = NULL;
  pNew->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pNew->snapshot.lastApplyTerm = 0;
  pNew->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  // Receive buffer; its entries are released through rpcFreeCont.
  SSyncSnapBuffer *pBuf = NULL;
  code = syncSnapBufferCreate(&pBuf);
  if (pBuf == NULL) {
    int32_t ret = taosThreadMutexDestroy(&pNew->writerMutex);
    if (ret != 0) {
      sError("failed to destroy mutex since %s", tstrerror(ret));
    }
    taosMemoryFree(pNew);
    TAOS_RETURN(code);
  }
  pBuf->entryDeleteCb = rpcFreeCont;
  pNew->pRcvBuf = pBuf;
  syncSnapBufferReset(pNew->pRcvBuf);

  *ppReceiver = pNew;
  TAOS_RETURN(code);
}
464

465
// Free the data buffers attached to the receiver's snapshot and snapshot
// parameter, leaving both pointers NULL. Always returns 0.
static int32_t snapshotReceiverClearInfoData(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver->snapshot.data != NULL) {
    taosMemoryFree(pReceiver->snapshot.data);
    pReceiver->snapshot.data = NULL;
  }

  if (pReceiver->snapshotParam.data != NULL) {
    taosMemoryFree(pReceiver->snapshotParam.data);
    pReceiver->snapshotParam.data = NULL;
  }

  return 0;
}
477

478
// Destroy a snapshot receiver: stop an open writer without applying its data,
// destroy the writer mutex, release the receive buffer and the attached info
// data, then free the receiver. NULL is ignored.
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }

  // close writer (passing `false` as the apply argument)
  (void)taosThreadMutexLock(&pReceiver->writerMutex);
  if (pReceiver->pWriter != NULL) {
    int32_t stopCode = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm,
                                                                       pReceiver->pWriter, false,
                                                                       &pReceiver->snapshot);
    if (stopCode != 0) {
      sError("vgId:%d, snapshot receiver stop failed while destroy since %s", pReceiver->pSyncNode->vgId,
             tstrerror(stopCode));
    }
    pReceiver->pWriter = NULL;
  }
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);

  (void)taosThreadMutexDestroy(&pReceiver->writerMutex);

  // syncSnapBufferDestroy returns immediately when the buffer pointer is NULL.
  syncSnapBufferDestroy(&pReceiver->pRcvBuf);

  (void)snapshotReceiverClearInfoData(pReceiver);

  taosMemoryFree(pReceiver);
}
506

507
// Report whether the receiver's `start` flag is set (atomic read).
// A NULL receiver counts as not started.
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return false;
  }
  return atomic_load_8(&pReceiver->start);
}
510

511
// Compare the receiver's (term, startTime) signature with the message's.
// Term is compared first and startTime breaks ties. Returns -1 when the
// receiver's signature is smaller, 1 when it is larger, 0 when they are equal.
static int32_t snapshotReceiverSignatureCmp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  if (pReceiver->term != pMsg->term) {
    return (pReceiver->term < pMsg->term) ? -1 : 1;
  }
  if (pReceiver->startTime != pMsg->startTime) {
    return (pReceiver->startTime < pMsg->startTime) ? -1 : 1;
  }
  return 0;
}
518

519
// Open the FSM snapshot writer for the transfer described by `pBeginMsg`.
// Resets ack to SYNC_SNAPSHOT_SEQ_BEGIN and copies the snapshot indexes/terms
// from the message before asking the FSM to start writing.
// Fails with TSDB_CODE_SYN_INTERNAL_ERROR when a writer is already open.
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
  SSyncNode *pNode = pReceiver->pSyncNode;

  if (pReceiver->pWriter != NULL) {
    sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null", pNode->vgId);
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  // update ack
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;

  // update snapshot and the range handed to the writer
  pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
  pReceiver->snapshotParam.end = pBeginMsg->lastIndex;
  pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
  pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
  pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;

  // start writer
  int32_t code = pNode->pFsm->FpSnapshotStartWrite(pNode->pFsm, &pReceiver->snapshotParam, &pReceiver->pWriter);
  if (code != 0) {
    sRError(pReceiver, "snapshot receiver start write failed since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  // event log
  sRInfo(pReceiver, "snapshot receiver start write");
  return 0;
}
547

548
// Mark the receiver as started and adopt the signature (term, source,
// start time) of the preparation message `pPreMsg`. The snapshot begin index
// is computed locally; the end index is left at -1.
// Does nothing when the receiver is already started.
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
  if (snapshotReceiverIsStart(pReceiver)) {
    sRInfo(pReceiver, "snapshot receiver has started");
    return;
  }

  // Only the caller that flips `start` from false to true continues.
  int8_t alreadyStarted = atomic_val_compare_exchange_8(&pReceiver->start, false, true);
  if (alreadyStarted) {
    return;
  }

  pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP;
  pReceiver->term = pPreMsg->term;
  pReceiver->fromId = pPreMsg->srcId;
  pReceiver->startTime = pPreMsg->startTime;

  pReceiver->snapshotParam.start = syncNodeGetSnapBeginIndex(pReceiver->pSyncNode);
  pReceiver->snapshotParam.end = -1;

  sRInfo(pReceiver, "snapshot receiver start, from dnode:%d.", DID(&pReceiver->fromId));
}
567

568
// Stop a started receiver without applying the received data: close the
// writer (if open) under the writer mutex, then reset the receive buffer and
// free the attached info data under the buffer mutex.
// Does nothing beyond logging when the receiver was not started.
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
  sRDebug(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);

  // Flip `start` from true to false; if it was not set there is nothing to stop.
  int8_t wasStarted = atomic_val_compare_exchange_8(&pReceiver->start, true, false);
  if (!wasStarted) return;

  (void)taosThreadMutexLock(&pReceiver->writerMutex);
  if (pReceiver->pWriter == NULL) {
    sRInfo(pReceiver, "snapshot receiver stop, writer is null");
  } else {
    int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
                                                                   false, &pReceiver->snapshot);
    if (code != 0) {
      sRError(pReceiver, "snapshot receiver stop write failed since %s", tstrerror(code));
    }
    pReceiver->pWriter = NULL;
  }
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);

  (void)taosThreadMutexLock(&pReceiver->pRcvBuf->mutex);
  syncSnapBufferReset(pReceiver->pRcvBuf);
  (void)snapshotReceiverClearInfoData(pReceiver);
  (void)taosThreadMutexUnlock(&pReceiver->pRcvBuf->mutex);
}
597

598
// Finish receiving a snapshot: write any data carried by the final message,
// raise the node's commit index and term to the snapshot's values when they
// are larger, stop the writer with the apply flag set, refresh the node's FSM
// state and restore the log store from the snapshot's last index.
//
// Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR when no writer is open,
// or the error code of the failing step. The writer mutex is released on
// every path, including when stopping the writer fails.
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  int32_t    code = 0;
  SSyncNode *pSyncNode = pReceiver->pSyncNode;

  if (pReceiver->pWriter == NULL) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sRError(pReceiver, "snapshot receiver finish error since writer is null");
    TAOS_RETURN(code);
  }

  // write data
  sRInfo(pReceiver, "snapshot receiver write about to finish, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
  if (pMsg->dataLen > 0) {
    code = pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
    if (code != 0) {
      sRError(pReceiver, "failed to finish snapshot receiver write since %s", tstrerror(code));
      TAOS_RETURN(code);
    }
  }

  // update commit index
  if (pReceiver->snapshot.lastApplyIndex > pSyncNode->commitIndex) {
    pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
  }

  // maybe update term
  if (pReceiver->snapshot.lastApplyTerm > raftStoreGetTerm(pSyncNode)) {
    raftStoreSetTerm(pSyncNode, pReceiver->snapshot.lastApplyTerm);
  }

  (void)taosThreadMutexLock(&pReceiver->writerMutex);
  if (pReceiver->pWriter != NULL) {
    // stop writer, apply data
    code = pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true, &pReceiver->snapshot);
    if (code != 0) {
      // Release the mutex before bailing out so later lock attempts do not block forever.
      (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
      sRError(pReceiver, "snapshot receiver apply failed since %s", tstrerror(code));
      TAOS_RETURN(code);
    }
    pReceiver->pWriter = NULL;
    sRInfo(pReceiver, "snapshot receiver write stopped");
  }
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);

  // update progress
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;

  // get fsmState
  SSnapshot snapshot = {0};
  code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  if (code != 0) {
    sRError(pReceiver, "snapshot receiver get snapshot info failed since %s", tstrerror(code));
    TAOS_RETURN(code);
  }
  pSyncNode->fsmState = snapshot.state;

  // reset wal
  code = pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pSyncNode->pLogStore, pMsg->lastIndex);
  if (code != 0) {
    sRError(pReceiver, "failed to snapshot receiver log restore since %s", tstrerror(code));
    TAOS_RETURN(code);
  }
  sRInfo(pReceiver, "wal log restored from snapshot");

  return 0;
}
664

665
// Apply one in-order data message to the open snapshot writer.
// The message must carry seq == ack + 1 and a writer must be open; on success
// ack advances to the message's seq. A message with dataLen <= 0 is
// acknowledged without writing anything.
static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  SSyncNode *pNode = pReceiver->pSyncNode;

  if (pMsg->seq != pReceiver->ack + 1) {
    sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq);
    TAOS_RETURN(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG);
  }

  if (pReceiver->pWriter == NULL) {
    sRError(pReceiver, "snapshot receiver failed to write data since writer is null");
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  sRDebug(pReceiver, "snapshot receiver continue to write, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);

  // apply data block
  if (pMsg->dataLen > 0) {
    int32_t writeCode = pNode->pFsm->FpSnapshotDoWrite(pNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
    if (writeCode != 0) {
      sRError(pReceiver, "snapshot receiver continue write failed since %s", tstrerror(writeCode));
      TAOS_RETURN(writeCode);
    }
  }

  // update progress
  pReceiver->ack = pMsg->seq;

  // event log
  sRDebug(pReceiver, "snapshot receiver continue to write finish");
  return 0;
}
695

696
// Compute the index at which a snapshot for this node begins.
// For an mnode this is SYNC_INDEX_BEGIN; otherwise it is one past the larger
// of the node's commit index and the WAL's committed version.
SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
  if (syncNodeIsMnode(ths)) {
    SyncIndex mnodeStart = SYNC_INDEX_BEGIN;
    sNInfo(ths, "snapshot begin index is %" PRId64 " since its mnode", mnodeStart);
    return mnodeStart;
  }

  SSyncLogStoreData *pLogData = ths->pLogStore->data;
  SWal              *pWal = pLogData->pWal;
  int64_t            walCommitVer = walGetCommittedVer(pWal);

  SyncIndex snapStart = TMAX(ths->commitIndex, walCommitVer) + 1;
  sNInfo(ths, "snapshot begin index is %" PRId64, snapStart);
  return snapStart;
}
714

715
static int32_t syncSnapReceiverExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotReceiver *pReceiver,
105✔
716
                                             SyncSnapshotSend *pMsg, SSnapshot *pInfo) {
717
  if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT) return TSDB_CODE_SYN_INTERNAL_ERROR;
105!
718
  int32_t code = 0, lino = 0;
105✔
719

720
  // copy snap info from leader
721
  void *data = taosMemoryCalloc(1, pMsg->dataLen);
105✔
722
  if (data == NULL) {
105!
723
    TAOS_CHECK_EXIT(terrno);
×
724
  }
725
  pInfo->data = data;
105✔
726
  data = NULL;
105✔
727
  (void)memcpy(pInfo->data, pMsg->data, pMsg->dataLen);
105✔
728

729
  // exchange snap info
730
  if ((code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pInfo)) != 0) {
105!
731
    sRError(pReceiver, "failed to get snapshot info. type: %d", pMsg->payloadType);
×
732
    goto _exit;
×
733
  }
734
  SSyncTLV *datHead = pInfo->data;
105✔
735
  if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
105!
736
    sRError(pReceiver, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
×
737
    code = TSDB_CODE_INVALID_DATA_FMT;
×
738
    goto _exit;
×
739
  }
740
  int32_t dataLen = sizeof(SSyncTLV) + datHead->len;
105✔
741

742
  // save exchanged snap info
743
  SSnapshotParam *pParam = &pReceiver->snapshotParam;
105✔
744
  data = taosMemoryRealloc(pParam->data, dataLen);
105✔
745
  if (data == NULL) {
105!
746
    code = terrno;
×
747
    sError("vgId:%d, failed to realloc memory for snapshot prep due to %s. dataLen:%d", pSyncNode->vgId,
×
748
           tstrerror(code), dataLen);
749
    goto _exit;
×
750
  }
751
  pParam->data = data;
105✔
752
  data = NULL;
105✔
753
  (void)memcpy(pParam->data, pInfo->data, dataLen);
105✔
754

755
_exit:
105✔
756
  TAOS_RETURN(code);
105✔
757
}
758

759
/*
 * Handle a snapshot preparation message (seq == SYNC_SNAPSHOT_SEQ_PREP).
 *
 * If the receiver is already started, the message signature is compared with
 * the receiver's:
 *   - newer     : the receiver is stopped and restarted for this message;
 *   - identical : the reply is simply sent again;
 *   - stale     : the reply carries TSDB_CODE_SYN_MISMATCHED_SIGNATURE.
 * If the receiver is not started, it is started.
 *
 * For an accepted preparation whose payload type is TDMT_SYNC_PREP_SNAPSHOT,
 * the snap info is exchanged with the FSM and attached to the reply.
 *
 * Returns 0 when the reply was sent, otherwise the error of the exchange or
 * of sending the reply.
 */
static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                code = 0;

  if (snapshotReceiverIsStart(pReceiver)) {
    // already start
    int32_t order = 0;
    if ((order = snapshotReceiverSignatureCmp(pReceiver, pMsg)) < 0) {
      sRInfo(pReceiver,
             "received a new snapshot preparation. restart receiver."
             " msg signature:(%" PRId64 ", %" PRId64 ")",
             pMsg->term, pMsg->startTime);
      goto _START_RECEIVER;
    } else if (order == 0) {
      sRInfo(pReceiver,
             "received a duplicate snapshot preparation. send reply."
             " msg signature:(%" PRId64 ", %" PRId64 ")",
             pMsg->term, pMsg->startTime);
      goto _SEND_REPLY;
    } else {
      // ignore
      sRError(pReceiver,
              "received a stale snapshot preparation. ignore."
              " msg signature:(%" PRId64 ", %" PRId64 ")",
              pMsg->term, pMsg->startTime);
      code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
      goto _SEND_REPLY;
    }
  } else {
    // start new
    sRInfo(pReceiver, "snapshot receiver not start yet so start new one");
    goto _START_RECEIVER;
  }

_START_RECEIVER:
  if (snapshotReceiverIsStart(pReceiver)) {
    sRInfo(pReceiver, "snapshot receiver already start and force stop pre one");
    snapshotReceiverStop(pReceiver);
  }

  snapshotReceiverStart(pReceiver, pMsg);

_SEND_REPLY:;

  SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT_REPLY};
  int32_t   dataLen = 0;
  // Only exchange snap info for an accepted preparation. A stale one arrives
  // here with code already set; running the exchange would overwrite that
  // error and replace the receiver's saved snapshot param with stale data.
  if (code == 0 && pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT) {
    if ((code = syncSnapReceiverExchgSnapInfo(pSyncNode, pReceiver, pMsg, &snapInfo)) != 0) {
      goto _out;
    }
    SSyncTLV *datHead = snapInfo.data;
    dataLen = sizeof(SSyncTLV) + datHead->len;
  }

  // send response; the payload type is only meaningful when a payload exists
  int32_t type = (snapInfo.data) ? snapInfo.type : 0;
  if ((code = syncSnapSendRsp(pReceiver, pMsg, snapInfo.data, dataLen, type, code)) != 0) {
    goto _out;
  }

_out:
  // snapInfo.data is allocated by the exchange and owned by this function
  if (snapInfo.data) {
    taosMemoryFree(snapInfo.data);
    snapInfo.data = NULL;
  }
  TAOS_RETURN(code);
}
827

828
/*
 * Handle a snapshot begin message (seq == SYNC_SNAPSHOT_SEQ_BEGIN).
 *
 * Requires a started receiver whose signature matches the message, starts the
 * snapshot writer, and checks that the begin index recorded in the receiver
 * still equals the one computed from the node. The outcome is sent back to
 * the sender in the reply's code field.
 *
 * Returns the error of sending the reply if that fails, otherwise the outcome
 * reported in the reply (0 on success).
 */
static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  // condition 1
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                code = TSDB_CODE_SYN_INTERNAL_ERROR;

  if (!snapshotReceiverIsStart(pReceiver)) {
    sRError(pReceiver, "failed to begin snapshot receiver since not started");
    goto _SEND_REPLY;
  }

  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    sRError(pReceiver, "failed to begin snapshot receiver since %s", tstrerror(code));
    goto _SEND_REPLY;
  }

  // start writer
  if ((code = snapshotReceiverStartWriter(pReceiver, pMsg)) != 0) {
    sRError(pReceiver, "failed to start snapshot writer since %s", tstrerror(code));
    goto _SEND_REPLY;
  }

  SyncIndex beginIndex = syncNodeGetSnapBeginIndex(pSyncNode);
  if (pReceiver->snapshotParam.start != beginIndex) {
    sRError(pReceiver, "snapshot begin index is changed unexpectedly. sver:%" PRId64 ", beginIndex:%" PRId64,
            pReceiver->snapshotParam.start, beginIndex);
    // code is 0 here because the writer started successfully; restore the
    // error so this failure is not reported as success
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _SEND_REPLY;
  }

  code = 0;
_SEND_REPLY:

  // send response
  TAOS_CHECK_RETURN(syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code));

  TAOS_RETURN(code);
}
865

866
/*
 * Build a SyncSnapshotRsp answering pMsg and send it to pMsg->srcId.
 *
 * The reply echoes term, lastIndex, lastTerm and startTime of pMsg, uses
 * pMsg->seq as the ack, and reports the receiver's snapshotParam.start as the
 * snapshot begin index.
 *
 * pBlock/blockLen : optional payload copied into the reply (skipped when
 *                   pBlock is NULL or blockLen <= 0)
 * type            : payload type stored in the reply
 * rspCode         : result code stored in the reply
 *
 * Returns 0 on success, otherwise the error of building or sending the reply.
 */
int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, void *pBlock, int32_t blockLen,
                        int32_t type, int32_t rspCode) {
  SSyncNode *pNode = pReceiver->pSyncNode;
  SRpcMsg    rsp = {0};

  int32_t ret = syncBuildSnapshotSendRsp(&rsp, blockLen, pNode->vgId);
  if (ret != 0) {
    sRError(pReceiver, "failed to build snapshot receiver resp since %s", tstrerror(ret));
    TAOS_RETURN(ret);
  }

  SyncSnapshotRsp *pRsp = rsp.pCont;

  // routing
  pRsp->srcId = pNode->myRaftId;
  pRsp->destId = pMsg->srcId;

  // fields echoed from the request
  pRsp->term = pMsg->term;
  pRsp->lastIndex = pMsg->lastIndex;
  pRsp->lastTerm = pMsg->lastTerm;
  pRsp->startTime = pMsg->startTime;
  pRsp->ack = pMsg->seq;

  // receiver-side result
  pRsp->code = rspCode;
  pRsp->snapBeginIndex = pReceiver->snapshotParam.start;
  pRsp->payloadType = type;

  if (blockLen > 0 && pBlock != NULL) {
    (void)memcpy(pRsp->data, pBlock, blockLen);
  }

  ret = syncNodeSendMsgById(&pRsp->destId, pNode, &rsp);
  if (ret != 0) {
    sRError(pReceiver, "failed to send snapshot receiver resp since %s", tstrerror(ret));
    TAOS_RETURN(ret);
  }
  return 0;
}
900

901
/*
 * Put a received snapshot data message into the receive buffer and consume
 * every message that is now contiguous.
 *
 * The whole operation runs under pRcvBuf->mutex. A message with seq above the
 * cursor is stored in its slot (replacing any previous occupant) and
 * ppMsg[0] is set to NULL, i.e. the buffer takes ownership. A message with
 * seq below the buffer start is only acknowledged again. The cursor is then
 * advanced over the filled slots, and each slot from start to cursor is passed
 * to snapshotReceiverGotData, acknowledged, deleted and cleared.
 *
 * Returns 0 on success, TSDB_CODE_SYN_BUFFER_FULL when seq lies beyond the
 * buffer window, TSDB_CODE_SYN_INTERNAL_ERROR when the buffer indexes are
 * inconsistent, otherwise the error of processing a message.
 */
static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend **ppMsg) {
  int32_t           code = 0;
  SSyncSnapBuffer  *pRcvBuf = pReceiver->pRcvBuf;
  SyncSnapshotSend *pMsg = ppMsg[0];

  (void)taosThreadMutexLock(&pRcvBuf->mutex);

  if (pMsg->seq - pRcvBuf->start >= pRcvBuf->size) {
    code = TSDB_CODE_SYN_BUFFER_FULL;
    goto _out;
  }

  // Leave through _out so the mutex is released; returning directly from here
  // would keep the buffer locked forever.
  if (!(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  if (pMsg->seq > pRcvBuf->cursor) {
    if (pRcvBuf->entries[pMsg->seq % pRcvBuf->size]) {
      pRcvBuf->entryDeleteCb(pRcvBuf->entries[pMsg->seq % pRcvBuf->size]);
    }
    pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg;
    ppMsg[0] = NULL;  // the buffer owns the message from now on
    pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
  } else if (pMsg->seq < pRcvBuf->start) {
    // already consumed: just acknowledge it again
    code = syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code);
    goto _out;
  }

  // advance the cursor over the contiguous run of filled slots
  for (int64_t seq = pRcvBuf->cursor + 1; seq < pRcvBuf->end; ++seq) {
    if (pRcvBuf->entries[seq % pRcvBuf->size]) {
      pRcvBuf->cursor = seq;
    } else {
      break;
    }
  }

  // consume, acknowledge and release everything up to the cursor
  for (int64_t seq = pRcvBuf->start; seq <= pRcvBuf->cursor; ++seq) {
    if ((code = snapshotReceiverGotData(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size])) != 0) {
      if (code >= SYNC_SNAPSHOT_SEQ_INVALID) {
        code = TSDB_CODE_SYN_INTERNAL_ERROR;
      }
    }
    pRcvBuf->start = seq + 1;
    if (syncSnapSendRsp(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size], NULL, 0, 0, code) != 0) {
      sError("failed to send snap rsp");
    }
    pRcvBuf->entryDeleteCb(pRcvBuf->entries[seq % pRcvBuf->size]);
    pRcvBuf->entries[seq % pRcvBuf->size] = NULL;
    if (code) goto _out;
  }

_out:
  (void)taosThreadMutexUnlock(&pRcvBuf->mutex);
  TAOS_RETURN(code);
}
954

955
/*
 * Handle a snapshot data message (SYNC_SNAPSHOT_SEQ_BEGIN < seq <
 * SYNC_SNAPSHOT_SEQ_END).
 *
 * A message whose signature does not match the receiver is answered with
 * TSDB_CODE_SYN_MISMATCHED_SIGNATURE; otherwise it is handed to the receive
 * buffer, which may take ownership of it by setting ppMsg[0] to NULL.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR when ppMsg[0] is NULL, otherwise the
 * result of sending the rejection or of syncSnapBufferRecv.
 */
static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend **ppMsg) {
  // condition 4
  // transferring
  SyncSnapshotSend *pMsg = ppMsg[0];
  if (!pMsg) return TSDB_CODE_SYN_INTERNAL_ERROR;
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
    int32_t code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    sRError(pReceiver, "failed to receive snapshot data since %s.", tstrerror(code));
    return syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code);
  }

  return syncSnapBufferRecv(pReceiver, ppMsg);
}
972

973
/*
 * Handle the snapshot end message (seq == SYNC_SNAPSHOT_SEQ_END).
 *
 * If the message signature matches the receiver, the snapshot is finished and,
 * on success, the receiver is stopped. The outcome (0, the finish error, or
 * TSDB_CODE_SYN_MISMATCHED_SIGNATURE) is sent to the sender in the reply's
 * code field.
 *
 * Returns the error of building or sending the reply if either fails,
 * otherwise the outcome that was reported in the reply.
 */
static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  // condition 2
  // end, finish FSM
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                code = 0;  // outcome of ending the snapshot, reported to the sender
  int32_t                ret = 0;   // outcome of building / sending the reply

  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
    sRError(pReceiver, "snapshot end failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
            pReceiver->startTime, pMsg->startTime);
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    goto _SEND_REPLY;
  }

  code = snapshotReceiverFinish(pReceiver, pMsg);
  if (code == 0) {
    snapshotReceiverStop(pReceiver);
  }

_SEND_REPLY:;

  // build msg; keep the result apart from code so the outcome above is not
  // overwritten before it is copied into the reply
  SRpcMsg rpcMsg = {0};
  if ((ret = syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId)) != 0) {
    sRError(pReceiver, "snapshot receiver build rsp failed since %s", tstrerror(ret));
    TAOS_RETURN(ret);
  }

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
  pRspMsg->term = raftStoreGetTerm(pSyncNode);
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pMsg->startTime;
  pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
  pRspMsg->code = code;
  pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;

  // send msg
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver end");
  if ((ret = syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg)) != 0) {
    sRError(pReceiver, "snapshot receiver send rsp failed since %s", tstrerror(ret));
    TAOS_RETURN(ret);
  }

  TAOS_RETURN(code);
}
1021

1022
/*
 * Entry point for a SyncSnapshotSend message carried in pRpcMsg->pCont.
 *
 * The message is dropped when its source is not in the raft group, when its
 * term is smaller than the local term (a rejection is sent back), when this
 * node is neither follower nor learner, or when its seq is out of range.
 * Otherwise the term is adopted and the message is dispatched by seq to the
 * prepare / begin / data / end handler. After a successful end, the log buffer
 * is re-initialized. The election timer is reset after every dispatched
 * message.
 *
 * The data handler receives a pointer to pRpcMsg->pCont and may set it to NULL
 * when it keeps the message.
 */
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
  SyncSnapshotSend     **ppMsg = (SyncSnapshotSend **)&pRpcMsg->pCont;
  SyncSnapshotSend      *pMsg = ppMsg[0];
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                code = 0;

  // sender is no longer a member of the group
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config");
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    TAOS_RETURN(code);
  }

  // sender is behind the local term
  if (pMsg->term < raftStoreGetTerm(pSyncNode)) {
    sRError(pReceiver, "reject snap replication with smaller term. msg term:%" PRId64 ", seq:%d", pMsg->term,
            pMsg->seq);
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    if (syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code) != 0) sError("failed to send snap rsp");
    TAOS_RETURN(code);
  }

  // adopt the sender's term; a learner never steps down
  if (pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
    syncNodeUpdateTermWithoutStepDown(pSyncNode, pMsg->term);
  } else if (pMsg->term > raftStoreGetTerm(pSyncNode)) {
    syncNodeStepDown(pSyncNode, pMsg->term);
  }

  if (!(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER || pSyncNode->state == TAOS_SYNC_STATE_LEARNER)) {
    sRError(pReceiver, "snapshot receiver not a follower or learner");
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    TAOS_RETURN(code);
  }

  if (pMsg->seq < SYNC_SNAPSHOT_SEQ_PREP || pMsg->seq > SYNC_SNAPSHOT_SEQ_END) {
    sRError(pReceiver, "snap replication msg with invalid seq:%d", pMsg->seq);
    code = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG;
    TAOS_RETURN(code);
  }

  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP) {
    // prepare
    sInfo("vgId:%d, prepare snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term,
          pMsg->startTime);
    code = syncNodeOnSnapshotPrep(pSyncNode, pMsg);
  } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
    // begin
    sInfo("vgId:%d, begin snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term,
          pMsg->startTime);
    code = syncNodeOnSnapshotBegin(pSyncNode, pMsg);
  } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
    // data; pMsg must not be touched afterwards, the handler may keep it
    code = syncNodeOnSnapshotReceive(pSyncNode, ppMsg);
  } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
    // end
    sInfo("vgId:%d, end snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term,
          pMsg->startTime);
    code = syncNodeOnSnapshotEnd(pSyncNode, pMsg);
    if (code != 0) {
      sRError(pReceiver, "failed to end snapshot.");
    } else {
      code = syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode);
      if (code != 0) {
        sRError(pReceiver, "failed to reinit log buffer since %s", tstrerror(code));
      }
    }
  }

  syncNodeResetElectTimer(pSyncNode);
  TAOS_RETURN(code);
}
1106

1107
/*
 * Sender side of the snapshot-info exchange: store the TLV carried by a
 * preparation reply in pSender->snapshotParam.data.
 *
 * Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR when the reply's payload
 * type is not TDMT_SYNC_PREP_SNAPSHOT_REPLY, TSDB_CODE_INVALID_DATA_FMT when
 * the TLV type disagrees with the payload type, or terrno when the buffer
 * cannot be resized.
 */
static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SSyncTLV *pHead = (void *)pMsg->data;
  if (pHead->typ != pMsg->payloadType) {
    sSError(pSender, "unexpected data type in data of SyncSnapshotRsp. typ: %d", pHead->typ);
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
  }

  // header plus body
  int32_t tlvLen = sizeof(SSyncTLV) + pHead->len;

  SSnapshotParam *pParam = &pSender->snapshotParam;
  void           *pBuf = taosMemoryRealloc(pParam->data, tlvLen);
  if (pBuf == NULL) {
    TAOS_RETURN(terrno);
  }

  (void)memcpy(pBuf, pMsg->data, tlvLen);
  pParam->data = pBuf;

  sSInfo(pSender, "data of snapshot param. len: %d", pHead->len);
  return 0;
}
1129

1130
// sender
1131
/*
 * Sender-side handling of the reply to a snapshot preparation.
 *
 * Rejects a reply whose begin index is beyond the local commit index. Then,
 * under pSender->pSndBuf->mutex: queries the FSM for the current snapshot
 * info, fixes the <start, end> range of the transfer, stores any exchanged
 * snap info from the reply, opens the snapshot reader, moves the peer's next
 * index right after the snapshot and sends the next snapshot message.
 *
 * Returns 0 on success, TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG for a bad begin
 * index, otherwise the error of the failing step.
 */
static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  SSnapshot snapshot = {0};
  int32_t   code = 0;

  if (pMsg->snapBeginIndex > pSyncNode->commitIndex) {
    sSError(pSender,
            "snapshot begin index is greater than commit index. snapBeginIndex:%" PRId64 ", commitIndex:%" PRId64,
            pMsg->snapBeginIndex, pSyncNode->commitIndex);
    TAOS_RETURN(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG);
  }

  (void)taosThreadMutexLock(&pSender->pSndBuf->mutex);
  TAOS_CHECK_GOTO(pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot), NULL, _out);

  // range to transfer: from the receiver's begin index to the last applied index
  pSender->snapshotParam.start = pMsg->snapBeginIndex;
  pSender->snapshotParam.end = snapshot.lastApplyIndex;

  sSInfo(pSender, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64,
         pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm);

  pSender->snapshot = snapshot;

  // keep the snap info returned by the receiver, if any
  if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
    TAOS_CHECK_GOTO(syncSnapSenderExchgSnapInfo(pSyncNode, pSender, pMsg), NULL, _out);
  }

  // open the reader, then start sending
  code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
  if (code == 0) {
    // replication resumes right after the snapshot
    syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapshot.lastApplyIndex + 1);
    code = snapshotSend(pSender);
  } else {
    sSError(pSender, "prepare snapshot failed since %s", tstrerror(code));
  }

_out:
  (void)taosThreadMutexUnlock(&pSender->pSndBuf->mutex);
  TAOS_RETURN(code);
}
1175

1176
/*
 * Order the sender's signature (term, startTime) against the one carried by
 * pMsg: term is compared first, startTime breaks ties.
 *
 * Returns -1 when the sender's signature is smaller, 1 when it is larger and
 * 0 when both are equal.
 */
static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  if (pSender->term != pMsg->term) {
    return (pSender->term < pMsg->term) ? -1 : 1;
  }
  if (pSender->startTime != pMsg->startTime) {
    return (pSender->startTime < pMsg->startTime) ? -1 : 1;
  }
  return 0;
}
1183

1184
/*
 * Process an acknowledgement of a snapshot block and keep the send window
 * filled.
 *
 * Runs under pSndBuf->mutex. The acknowledged block is marked, the cursor is
 * advanced over the contiguous run of acknowledged blocks, every block up to
 * the cursor is deleted, and new snapshot messages are sent while fewer than
 * tsSnapReplMaxWaitN are in flight. Once the sender has reached
 * SYNC_SNAPSHOT_SEQ_END and the buffer is drained, one more message is sent.
 *
 * Returns 0 on success, TSDB_CODE_SYN_MISMATCHED_SIGNATURE for a reply with a
 * foreign signature, TSDB_CODE_SYN_BUFFER_FULL when the ack lies beyond the
 * window, TSDB_CODE_SYN_INTERNAL_ERROR for an inconsistent sender or buffer
 * state, otherwise the error of snapshotSend.
 */
static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp **ppMsg) {
  int32_t          code = 0;
  SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
  SyncSnapshotRsp *pMsg = ppMsg[0];

  (void)taosThreadMutexLock(&pSndBuf->mutex);
  if (snapshotSenderSignatureCmp(pSender, pMsg) != 0) {
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    goto _out;
  }

  if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  if (pMsg->ack - pSndBuf->start >= pSndBuf->size) {
    code = TSDB_CODE_SYN_BUFFER_FULL;
    goto _out;
  }

  if (!(pSndBuf->start <= pSndBuf->cursor + 1 && pSndBuf->cursor < pSndBuf->end)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  // mark the acknowledged block
  if (pMsg->ack > pSndBuf->cursor && pMsg->ack < pSndBuf->end) {
    SyncSnapBlock *pBlk = pSndBuf->entries[pMsg->ack % pSndBuf->size];
    if (!pBlk) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    pBlk->acked = 1;
  }

  // advance the cursor over the contiguous run of acknowledged blocks
  for (int64_t ack = pSndBuf->cursor + 1; ack < pSndBuf->end; ++ack) {
    SyncSnapBlock *pBlk = pSndBuf->entries[ack % pSndBuf->size];
    // same slot range as the check above: an empty slot is an inconsistency,
    // not something to dereference
    if (!pBlk) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pBlk->acked) {
      pSndBuf->cursor = ack;
    } else {
      break;
    }
  }

  // release everything up to the cursor
  for (int64_t ack = pSndBuf->start; ack <= pSndBuf->cursor; ++ack) {
    pSndBuf->entryDeleteCb(pSndBuf->entries[ack % pSndBuf->size]);
    pSndBuf->entries[ack % pSndBuf->size] = NULL;
    pSndBuf->start = ack + 1;
  }

  // refill the window
  while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pSndBuf->start < tsSnapReplMaxWaitN) {
    if ((code = snapshotSend(pSender)) != 0) {
      goto _out;
    }
  }

  // all blocks acknowledged: send once more after the end seq was reached
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
    if ((code = snapshotSend(pSender)) != 0) {
      goto _out;
    }
  }
_out:
  (void)taosThreadMutexUnlock(&pSndBuf->mutex);
  TAOS_RETURN(code);
}
1249

1250
/*
 * Entry point for a SyncSnapshotRsp message carried in pRpcMsg->pCont.
 *
 * Replies from a source outside the raft group, for a missing or stopped
 * sender, or with a signature older than the sender's are dropped without
 * touching the sender. Any later failure (sender older than the reply, node
 * not leader, term mismatch, error reported by the receiver, failure while
 * continuing the transfer) stops the sender and resets replication to the
 * peer.
 *
 * A valid reply is dispatched by its ack: the preparation ack starts the
 * transfer, acks in [SYNC_SNAPSHOT_SEQ_BEGIN, SYNC_SNAPSHOT_SEQ_END) drive the
 * send buffer, and the end ack stops the sender and resets replication.
 *
 * Returns 0 on success, otherwise the error that was detected.
 */
int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
  SyncSnapshotRsp **ppMsg = (SyncSnapshotRsp **)&pRpcMsg->pCont;
  SyncSnapshotRsp  *pMsg = ppMsg[0];
  int32_t           code = 0;

  // replica may have been dropped meanwhile
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "maybe replica already dropped");
    TAOS_RETURN(TSDB_CODE_SYN_MISMATCHED_SIGNATURE);
  }

  // locate the sender serving this peer
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &pMsg->srcId);
  if (pSender == NULL) {
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender is null");
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  if (!snapshotSenderIsStart(pSender)) {
    sSError(pSender, "snapshot sender stopped. sender startTime:%" PRId64 ", msg startTime:%" PRId64,
            pSender->startTime, pMsg->startTime);
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  // signature: > 0 means the reply is older than the sender, < 0 the opposite
  int32_t order = snapshotSenderSignatureCmp(pSender, pMsg);
  if (order > 0) {
    sSWarn(pSender, "ignore a stale snap rsp, msg signature:(%" PRId64 ", %" PRId64 ").", pMsg->term, pMsg->startTime);
    TAOS_RETURN(TSDB_CODE_SYN_MISMATCHED_SIGNATURE);
  }
  if (order < 0) {
    sSError(pSender, "snapshot sender is stale. stop");
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    goto _ERROR;
  }

  if (!(pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    sSError(pSender, "snapshot sender not leader");
    code = TSDB_CODE_SYN_NOT_LEADER;
    goto _ERROR;
  }

  SyncTerm localTerm = raftStoreGetTerm(pSyncNode);
  if (pMsg->term != localTerm) {
    sSError(pSender, "snapshot sender term mismatch, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
            localTerm);
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    goto _ERROR;
  }

  // the receiver reported a failure
  if (pMsg->code != 0) {
    sSError(pSender, "snapshot sender receive error:%s 0x%x and stop sender", tstrerror(pMsg->code), pMsg->code);
    code = pMsg->code;
    goto _ERROR;
  }

  // preparation acknowledged: start the transfer
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP) {
    sSInfo(pSender, "process prepare rsp");
    code = syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg);
    if (code != 0) {
      goto _ERROR;
    }
  }

  // begin or data acknowledged: keep sending data or the end message
  if (pMsg->ack >= SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->ack < SYNC_SNAPSHOT_SEQ_END) {
    code = syncSnapBufferSend(pSender, ppMsg);
    if (code != 0) {
      sSError(pSender, "failed to replicate snap since %s. seq:%d, pReader:%p, finish:%d", tstrerror(code),
              pSender->seq, pSender->pReader, pSender->finish);
      goto _ERROR;
    }
  }

  // end acknowledged: the transfer is complete
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
    sSInfo(pSender, "process end rsp");
    snapshotSenderStop(pSender, true);
    TAOS_CHECK_GOTO(syncNodeReplicateReset(pSyncNode, &pMsg->srcId), NULL, _ERROR);
  }

  return 0;

_ERROR:
  snapshotSenderStop(pSender, false);
  if (syncNodeReplicateReset(pSyncNode, &pMsg->srcId) != 0) sError("failed to reset replicate");
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc