• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3531

19 Nov 2024 10:42AM UTC coverage: 60.213% (-0.006%) from 60.219%
#3531

push

travis-ci

web-flow
Merge pull request #28777 from taosdata/fix/3.0/TD-32366

fix:TD-32366/stmt add geometry datatype check

118529 of 252344 branches covered (46.97%)

Branch coverage included in aggregate %.

7 of 48 new or added lines in 3 files covered. (14.58%)

2282 existing lines in 115 files now uncovered.

199096 of 275161 relevant lines covered (72.36%)

6067577.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.17
/source/libs/sync/src/syncSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "syncSnapshot.h"
18
#include "syncIndexMgr.h"
19
#include "syncPipeline.h"
20
#include "syncRaftCfg.h"
21
#include "syncRaftLog.h"
22
#include "syncRaftStore.h"
23
#include "syncReplication.h"
24
#include "syncUtil.h"
25
#include "tglobal.h"
26

27
static SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths);
28

29
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
462,426✔
30
  for (int64_t i = pBuf->start; i < pBuf->end; ++i) {
462,426!
31
    if (pBuf->entryDeleteCb) {
×
32
      pBuf->entryDeleteCb(pBuf->entries[i % pBuf->size]);
×
33
    }
34
    pBuf->entries[i % pBuf->size] = NULL;
×
35
  }
36
  pBuf->start = SYNC_SNAPSHOT_SEQ_BEGIN + 1;
462,426✔
37
  pBuf->end = pBuf->start;
462,426✔
38
  pBuf->cursor = pBuf->start - 1;
462,426✔
39
}
462,426✔
40

41
static void syncSnapBufferDestroy(SSyncSnapBuffer **ppBuf) {
231,145✔
42
  if (ppBuf == NULL || ppBuf[0] == NULL) return;
231,145!
43
  SSyncSnapBuffer *pBuf = ppBuf[0];
231,146✔
44

45
  syncSnapBufferReset(pBuf);
231,146✔
46

47
  (void)taosThreadMutexDestroy(&pBuf->mutex);
231,129✔
48
  taosMemoryFree(ppBuf[0]);
231,123✔
49
  ppBuf[0] = NULL;
231,092✔
50
  return;
231,092✔
51
}
52

53
static int32_t syncSnapBufferCreate(SSyncSnapBuffer **ppBuf) {
231,166✔
54
  SSyncSnapBuffer *pBuf = taosMemoryCalloc(1, sizeof(SSyncSnapBuffer));
231,166✔
55
  if (pBuf == NULL) {
231,139!
UNCOV
56
    *ppBuf = NULL;
×
UNCOV
57
    TAOS_RETURN(terrno);
×
58
  }
59
  pBuf->size = sizeof(pBuf->entries) / sizeof(void *);
231,143✔
60
  if (pBuf->size != TSDB_SYNC_SNAP_BUFFER_SIZE) return TSDB_CODE_SYN_INTERNAL_ERROR;
231,143!
61
  (void)taosThreadMutexInit(&pBuf->mutex, NULL);
231,143✔
62
  *ppBuf = pBuf;
231,128✔
63
  TAOS_RETURN(0);
231,128✔
64
}
65

66
int32_t snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex, SSyncSnapshotSender **ppSender) {
216,853✔
67
  int32_t code = 0;
216,853✔
68
  *ppSender = NULL;
216,853✔
69
  bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) &&
433,719!
70
                   (pSyncNode->pFsm->FpSnapshotDoRead != NULL);
216,866!
71
  if (!condition) {
216,853!
72
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
×
73
  }
74

75
  SSyncSnapshotSender *pSender = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender));
216,853✔
76
  if (pSender == NULL) {
216,851!
77
    TAOS_RETURN(terrno);
×
78
  }
79

80
  pSender->start = false;
216,851✔
81
  pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID;
216,851✔
82
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
216,851✔
83
  pSender->pReader = NULL;
216,851✔
84
  pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
216,851✔
85
  pSender->pSyncNode = pSyncNode;
216,851✔
86
  pSender->replicaIndex = replicaIndex;
216,851✔
87
  pSender->term = raftStoreGetTerm(pSyncNode);
216,851✔
88
  pSender->startTime = -1;
216,971✔
89
  pSender->finish = false;
216,971✔
90

91
  code = pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
216,971✔
92
  if (code != 0) {
216,958!
93
    taosMemoryFreeClear(pSender);
×
94
    TAOS_RETURN(code);
×
95
  }
96
  SSyncSnapBuffer *pSndBuf = NULL;
216,958✔
97
  code = syncSnapBufferCreate(&pSndBuf);
216,958✔
98
  if (pSndBuf == NULL) {
216,908!
99
    taosMemoryFreeClear(pSender);
×
100
    TAOS_RETURN(code);
×
101
  }
102
  pSndBuf->entryDeleteCb = syncSnapBlockDestroy;
216,908✔
103
  pSender->pSndBuf = pSndBuf;
216,908✔
104

105
  syncSnapBufferReset(pSender->pSndBuf);
216,908✔
106
  *ppSender = pSender;
216,905✔
107
  TAOS_RETURN(code);
216,905✔
108
}
109

110
void syncSnapBlockDestroy(void *ptr) {
2,328✔
111
  SyncSnapBlock *pBlk = ptr;
2,328✔
112
  if (pBlk->pBlock != NULL) {
2,328✔
113
    taosMemoryFree(pBlk->pBlock);
2,243✔
114
    pBlk->pBlock = NULL;
2,243✔
115
    pBlk->blockLen = 0;
2,243✔
116
  }
117
  taosMemoryFree(pBlk);
2,328✔
118
}
2,328✔
119

120
static int32_t snapshotSenderClearInfoData(SSyncSnapshotSender *pSender) {
216,964✔
121
  if (pSender->snapshotParam.data) {
216,964✔
122
    taosMemoryFree(pSender->snapshotParam.data);
85✔
123
    pSender->snapshotParam.data = NULL;
85✔
124
  }
125

126
  if (pSender->snapshot.data) {
216,964!
127
    taosMemoryFree(pSender->snapshot.data);
×
128
    pSender->snapshot.data = NULL;
×
129
  }
130
  return 0;
216,964✔
131
}
132

133
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
216,914✔
134
  if (pSender == NULL) return;
216,914!
135

136
  // close reader
137
  if (pSender->pReader != NULL) {
216,914!
138
    pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
×
139
    pSender->pReader = NULL;
×
140
  }
141

142
  // free snap buffer
143
  if (pSender->pSndBuf) {
216,914!
144
    syncSnapBufferDestroy(&pSender->pSndBuf);
216,919✔
145
  }
146

147
  (void)snapshotSenderClearInfoData(pSender);
216,874✔
148

149
  // free sender
150
  taosMemoryFree(pSender);
216,880✔
151
}
152

153
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return atomic_load_8(&pSender->start); }
218,238✔
154

155
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
85✔
156
  int32_t code = 0;
85✔
157

158
  int8_t started = atomic_val_compare_exchange_8(&pSender->start, false, true);
85✔
159
  if (started) return 0;
85!
160

161
  pSender->seq = SYNC_SNAPSHOT_SEQ_PREP;
85✔
162
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
85✔
163
  pSender->pReader = NULL;
85✔
164
  pSender->snapshotParam.start = SYNC_INDEX_INVALID;
85✔
165
  pSender->snapshotParam.end = SYNC_INDEX_INVALID;
85✔
166
  pSender->snapshot.data = NULL;
85✔
167
  pSender->snapshotParam.end = SYNC_INDEX_INVALID;
85✔
168
  pSender->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
85✔
169
  pSender->snapshot.lastApplyTerm = SYNC_TERM_INVALID;
85✔
170
  pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
85✔
171

172
  (void)memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig));
85✔
173
  pSender->sendingMS = 0;
85✔
174
  pSender->term = raftStoreGetTerm(pSender->pSyncNode);
85✔
175
  pSender->startTime = taosGetMonoTimestampMs();
85✔
176
  pSender->lastSendTime = taosGetTimestampMs();
85✔
177
  pSender->finish = false;
85✔
178

179
  // Get snapshot info
180
  SSyncNode *pSyncNode = pSender->pSyncNode;
85✔
181
  SSnapshot  snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT};
85✔
182
  if ((code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo)) != 0) {
85!
183
    sSError(pSender, "snapshot get info failure since %s", tstrerror(code));
×
184
    goto _out;
×
185
  }
186

187
  void   *pData = snapInfo.data;
85✔
188
  int32_t type = (pData) ? snapInfo.type : 0;
85!
189
  int32_t dataLen = 0;
85✔
190
  if (pData) {
85!
191
    SSyncTLV *datHead = pData;
85✔
192
    if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT) {
85!
193
      sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
×
194
      code = TSDB_CODE_INVALID_DATA_FMT;
×
195
      goto _out;
×
196
    }
197
    dataLen = sizeof(SSyncTLV) + datHead->len;
85✔
198
  }
199

200
  if ((code = syncSnapSendMsg(pSender, pSender->seq, pData, dataLen, type)) != 0) {
85!
201
    goto _out;
×
202
  }
203

204
  SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
85✔
205
  sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&destId));
85!
206
_out:
×
207
  if (snapInfo.data) {
85!
208
    taosMemoryFree(snapInfo.data);
85✔
209
    snapInfo.data = NULL;
85✔
210
  }
211
  TAOS_RETURN(code);
85✔
212
}
213

214
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
85✔
215
  sSDebug(pSender, "snapshot sender stop, finish:%d reader:%p", finish, pSender->pReader);
85!
216

217
  // update flag
218
  int8_t stopped = !atomic_val_compare_exchange_8(&pSender->start, true, false);
85✔
219
  if (stopped) return;
85!
220
  (void)taosThreadMutexLock(&pSender->pSndBuf->mutex);
85✔
221
  {
222
    pSender->finish = finish;
85✔
223

224
    // close reader
225
    if (pSender->pReader != NULL) {
85!
226
      pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
85✔
227
      pSender->pReader = NULL;
85✔
228
    }
229

230
    syncSnapBufferReset(pSender->pSndBuf);
85✔
231

232
    (void)snapshotSenderClearInfoData(pSender);
85✔
233

234
    SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
85✔
235
    sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);
85!
236
  }
237
  (void)taosThreadMutexUnlock(&pSender->pSndBuf->mutex);
85✔
238
}
239

240
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) {
2,498✔
241
  int32_t code = 0;
2,498✔
242
  SRpcMsg rpcMsg = {0};
2,498✔
243

244
  if ((code = syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId)) != 0) {
2,498!
245
    sSError(pSender, "failed to build snap replication msg since %s", tstrerror(code));
×
246
    goto _OUT;
×
247
  }
248

249
  SyncSnapshotSend *pMsg = rpcMsg.pCont;
2,498✔
250
  pMsg->srcId = pSender->pSyncNode->myRaftId;
2,498✔
251
  pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
2,498✔
252
  pMsg->term = pSender->term;
2,498✔
253
  pMsg->beginIndex = pSender->snapshotParam.start;
2,498✔
254
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
2,498✔
255
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
2,498✔
256
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
2,498✔
257
  pMsg->lastConfig = pSender->lastConfig;
2,498✔
258
  pMsg->startTime = pSender->startTime;
2,498✔
259
  pMsg->seq = seq;
2,498✔
260

261
  if (pBlock != NULL && blockLen > 0) {
2,498!
262
    (void)memcpy(pMsg->data, pBlock, blockLen);
2,328✔
263
  }
264
  pMsg->payloadType = typ;
2,498✔
265

266
  // send msg
267
  if ((code = syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg)) != 0) {
2,498!
268
    sSError(pSender, "failed to send snap replication msg since %s. seq:%d", tstrerror(code), seq);
×
269
    goto _OUT;
×
270
  }
271

272
_OUT:
2,498✔
273
  TAOS_RETURN(code);
2,498✔
274
}
275

276
// when sender receive ack, call this function to send msg from seq
277
// seq = ack + 1, already updated
278
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
2,498✔
279
  int32_t        code = 0;
2,498✔
280
  SyncSnapBlock *pBlk = NULL;
2,498✔
281

282
  if (pSender->seq < SYNC_SNAPSHOT_SEQ_END) {
2,498✔
283
    pSender->seq++;
2,413✔
284

285
    if (pSender->seq > SYNC_SNAPSHOT_SEQ_BEGIN) {
2,413✔
286
      pBlk = taosMemoryCalloc(1, sizeof(SyncSnapBlock));
2,328✔
287
      if (pBlk == NULL) {
2,328!
288
        code = terrno;
×
289
        goto _OUT;
×
290
      }
291

292
      pBlk->seq = pSender->seq;
2,328✔
293

294
      // read data
295
      code = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, &pBlk->pBlock,
2,328✔
296
                                                        &pBlk->blockLen);
297
      if (code != 0) {
2,328!
298
        sSError(pSender, "snapshot sender read failed since %s", tstrerror(code));
×
299
        goto _OUT;
×
300
      }
301

302
      if (pBlk->blockLen > 0) {
2,328✔
303
        // has read data
304
        sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pBlk->blockLen, pBlk->seq);
2,243!
305
      } else {
306
        // read finish, update seq to end
307
        pSender->seq = SYNC_SNAPSHOT_SEQ_END;
85✔
308
        sSInfo(pSender, "snapshot sender read to the end");
85!
309
        goto _OUT;
85✔
310
      }
311
    }
312
  }
313

314
  if (!(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END)) {
2,413!
315
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
316
    goto _OUT;
×
317
  }
318

319
  // send msg
320
  int32_t blockLen = (pBlk) ? pBlk->blockLen : 0;
2,413✔
321
  void   *pBlock = (pBlk) ? pBlk->pBlock : NULL;
2,413✔
322
  if ((code = syncSnapSendMsg(pSender, pSender->seq, pBlock, blockLen, 0)) != 0) {
2,413!
323
    goto _OUT;
×
324
  }
325

326
  // put in buffer
327
  int64_t nowMs = taosGetTimestampMs();
2,413✔
328
  if (pBlk) {
2,413✔
329
    if (!(pBlk->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pBlk->seq < SYNC_SNAPSHOT_SEQ_END)) {
2,243!
330
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
331
      goto _OUT;
×
332
    }
333
    pBlk->sendTimeMs = nowMs;
2,243✔
334
    pSender->pSndBuf->entries[pSender->seq % pSender->pSndBuf->size] = pBlk;
2,243✔
335
    pBlk = NULL;
2,243✔
336
    pSender->pSndBuf->end = TMAX(pSender->seq + 1, pSender->pSndBuf->end);
2,243✔
337
  }
338
  pSender->lastSendTime = nowMs;
2,413✔
339

340
_OUT:;
2,498✔
341
  if (pBlk != NULL) {
2,498✔
342
    syncSnapBlockDestroy(pBlk);
85✔
343
    pBlk = NULL;
85✔
344
  }
345
  TAOS_RETURN(code);
2,498✔
346
}
347

348
// send snapshot data from cache
349
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
×
350
  SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
×
351
  int32_t          code = 0;
×
352
  (void)taosThreadMutexLock(&pSndBuf->mutex);
×
353
  if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) {
×
354
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
355
    goto _out;
×
356
  }
357

358
  for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) {
×
359
    SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size];
×
360
    if (!pBlk) {
×
361
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
362
      goto _out;
×
363
    }
364
    int64_t nowMs = taosGetTimestampMs();
×
365
    if (pBlk->acked || nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) {
×
366
      continue;
×
367
    }
368
    if ((code = syncSnapSendMsg(pSender, pBlk->seq, pBlk->pBlock, pBlk->blockLen, 0)) != 0) {
×
369
      goto _out;
×
370
    }
371
    pBlk->sendTimeMs = nowMs;
×
372
  }
373

374
  if (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
×
375
    if ((code = snapshotSend(pSender)) != 0) {
×
376
      goto _out;
×
377
    }
378
  }
379

380
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
×
381
    if ((code = syncSnapSendMsg(pSender, pSender->seq, NULL, 0, 0)) != 0) {
×
382
      goto _out;
×
383
    }
384
  }
385
_out:;
×
386
  (void)taosThreadMutexUnlock(&pSndBuf->mutex);
×
387
  TAOS_RETURN(code);
×
388
}
389

390
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
85✔
391
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
85✔
392
  if (pSender == NULL) {
85!
393
    sNError(pSyncNode, "snapshot sender start error since get failed");
×
394
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
×
395
  }
396

397
  if (snapshotSenderIsStart(pSender)) {
85!
398
    sSDebug(pSender, "snapshot sender already start, ignore");
×
399
    return 0;
×
400
  }
401

402
  taosMsleep(1);
85✔
403

404
  int32_t code = snapshotSenderStart(pSender);
85✔
405
  if (code != 0) {
85!
406
    sSError(pSender, "snapshot sender start error since %s", tstrerror(code));
×
407
    TAOS_RETURN(code);
×
408
  }
409

410
  return 0;
85✔
411
}
412

413
// receiver
414
int32_t snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId, SSyncSnapshotReceiver **ppReceiver) {
14,212✔
415
  int32_t code = 0;
14,212✔
416
  *ppReceiver = NULL;
14,212✔
417
  bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
28,437!
418
                   (pSyncNode->pFsm->FpSnapshotDoWrite != NULL);
14,225!
419
  if (!condition) {
14,212!
420
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
×
421
  }
422

423
  SSyncSnapshotReceiver *pReceiver = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver));
14,212✔
424
  if (pReceiver == NULL) {
14,225!
425
    TAOS_RETURN(terrno);
×
426
  }
427

428
  pReceiver->start = false;
14,225✔
429
  pReceiver->startTime = 0;
14,225✔
430
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
14,225✔
431
  pReceiver->pWriter = NULL;
14,225✔
432
  code = taosThreadMutexInit(&pReceiver->writerMutex, NULL);
14,225✔
433
  if (code != 0) {
14,225!
434
    taosMemoryFree(pReceiver);
×
435
    pReceiver = NULL;
×
436
    TAOS_RETURN(code);
×
437
  }
438
  pReceiver->pSyncNode = pSyncNode;
14,225✔
439
  pReceiver->fromId = fromId;
14,225✔
440
  pReceiver->term = raftStoreGetTerm(pSyncNode);
14,225✔
441
  pReceiver->snapshot.data = NULL;
14,227✔
442
  pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
14,227✔
443
  pReceiver->snapshot.lastApplyTerm = 0;
14,227✔
444
  pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
14,227✔
445

446
  SSyncSnapBuffer *pRcvBuf = NULL;
14,227✔
447
  code = syncSnapBufferCreate(&pRcvBuf);
14,227✔
448
  if (pRcvBuf == NULL) {
14,226!
449
    int32_t ret = taosThreadMutexDestroy(&pReceiver->writerMutex);
×
450
    if (ret != 0) {
×
451
      sError("failed to destroy mutex since %s", tstrerror(ret));
×
452
    }
453
    taosMemoryFree(pReceiver);
×
454
    pReceiver = NULL;
×
455
    TAOS_RETURN(code);
×
456
  }
457
  pRcvBuf->entryDeleteCb = rpcFreeCont;
14,226✔
458
  pReceiver->pRcvBuf = pRcvBuf;
14,226✔
459

460
  syncSnapBufferReset(pReceiver->pRcvBuf);
14,226✔
461
  *ppReceiver = pReceiver;
14,226✔
462
  TAOS_RETURN(code);
14,226✔
463
}
464

465
static int32_t snapshotReceiverClearInfoData(SSyncSnapshotReceiver *pReceiver) {
14,310✔
466
  if (pReceiver->snapshotParam.data) {
14,310✔
467
    taosMemoryFree(pReceiver->snapshotParam.data);
84✔
468
    pReceiver->snapshotParam.data = NULL;
84✔
469
  }
470

471
  if (pReceiver->snapshot.data) {
14,310!
472
    taosMemoryFree(pReceiver->snapshot.data);
×
473
    pReceiver->snapshot.data = NULL;
×
474
  }
475
  return 0;
14,310✔
476
}
477

478
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
14,225✔
479
  if (pReceiver == NULL) return;
14,225!
480

481
  (void)taosThreadMutexLock(&pReceiver->writerMutex);
14,225✔
482
  // close writer
483
  if (pReceiver->pWriter != NULL) {
14,226!
484
    int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
×
485
                                                                   false, &pReceiver->snapshot);
486
    if (code != 0) {
×
487
      sError("vgId:%d, snapshot receiver stop failed while destroy since %s", pReceiver->pSyncNode->vgId,
×
488
             tstrerror(code));
489
    }
490
    pReceiver->pWriter = NULL;
×
491
  }
492
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
14,226✔
493

494
  (void)taosThreadMutexDestroy(&pReceiver->writerMutex);
14,225✔
495

496
  // free snap buf
497
  if (pReceiver->pRcvBuf) {
14,225!
498
    syncSnapBufferDestroy(&pReceiver->pRcvBuf);
14,226✔
499
  }
500

501
  (void)snapshotReceiverClearInfoData(pReceiver);
14,225✔
502

503
  // free receiver
504
  taosMemoryFree(pReceiver);
14,226✔
505
}
506

507
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
40,728✔
508
  return (pReceiver != NULL ? atomic_load_8(&pReceiver->start) : false);
40,728!
509
}
510

511
static int32_t snapshotReceiverSignatureCmp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
2,361✔
512
  if (pReceiver->term < pMsg->term) return -1;
2,361!
513
  if (pReceiver->term > pMsg->term) return 1;
2,361!
514
  if (pReceiver->startTime < pMsg->startTime) return -1;
2,361!
515
  if (pReceiver->startTime > pMsg->startTime) return 1;
2,361!
516
  return 0;
2,361✔
517
}
518

519
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
84✔
520
  if (pReceiver->pWriter != NULL) {
84!
521
    sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null", pReceiver->pSyncNode->vgId);
×
522
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
×
523
  }
524

525
  // update ack
526
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
84✔
527

528
  // update snapshot
529
  pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
84✔
530
  pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
84✔
531
  pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
84✔
532
  pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
84✔
533
  pReceiver->snapshotParam.end = pBeginMsg->lastIndex;
84✔
534

535
  // start writer
536
  int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &pReceiver->snapshotParam,
84✔
537
                                                                  &pReceiver->pWriter);
538
  if (code != 0) {
84!
539
    sRError(pReceiver, "snapshot receiver start write failed since %s", tstrerror(code));
×
540
    TAOS_RETURN(code);
×
541
  }
542

543
  // event log
544
  sRInfo(pReceiver, "snapshot receiver start write");
84!
545
  return 0;
84✔
546
}
547

548
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
84✔
549
  if (snapshotReceiverIsStart(pReceiver)) {
84!
550
    sRInfo(pReceiver, "snapshot receiver has started");
×
551
    return;
×
552
  }
553

554
  int8_t started = atomic_val_compare_exchange_8(&pReceiver->start, false, true);
84✔
555
  if (started) return;
84!
556

557
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP;
84✔
558
  pReceiver->term = pPreMsg->term;
84✔
559
  pReceiver->fromId = pPreMsg->srcId;
84✔
560
  pReceiver->startTime = pPreMsg->startTime;
84✔
561

562
  pReceiver->snapshotParam.start = syncNodeGetSnapBeginIndex(pReceiver->pSyncNode);
84✔
563
  pReceiver->snapshotParam.end = -1;
84✔
564

565
  sRInfo(pReceiver, "snapshot receiver start, from dnode:%d.", DID(&pReceiver->fromId));
84!
566
}
567

568
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
84✔
569
  sRDebug(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);
84!
570

571
  int8_t stopped = !atomic_val_compare_exchange_8(&pReceiver->start, true, false);
84✔
572
  if (stopped) return;
84!
573

574
  (void)taosThreadMutexLock(&pReceiver->writerMutex);
84✔
575
  {
576
    if (pReceiver->pWriter != NULL) {
84!
577
      int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
×
578
                                                                     false, &pReceiver->snapshot);
579
      if (code != 0) {
×
580
        sRError(pReceiver, "snapshot receiver stop write failed since %s", tstrerror(code));
×
581
      }
582
      pReceiver->pWriter = NULL;
×
583
    } else {
584
      sRInfo(pReceiver, "snapshot receiver stop, writer is null");
84!
585
    }
586
  }
587
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
84✔
588

589
  (void)taosThreadMutexLock(&pReceiver->pRcvBuf->mutex);
84✔
590
  {
591
    syncSnapBufferReset(pReceiver->pRcvBuf);
84✔
592

593
    (void)snapshotReceiverClearInfoData(pReceiver);
84✔
594
  }
595
  (void)taosThreadMutexUnlock(&pReceiver->pRcvBuf->mutex);
84✔
596
}
597

598
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
84✔
599
  int32_t code = 0;
84✔
600
  if (pReceiver->pWriter != NULL) {
84!
601
    // write data
602
    sRInfo(pReceiver, "snapshot receiver write about to finish, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
84!
603
    if (pMsg->dataLen > 0) {
84!
604
      code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
×
605
                                                           pMsg->dataLen);
×
606
      if (code != 0) {
×
607
        sRError(pReceiver, "failed to finish snapshot receiver write since %s", tstrerror(code));
×
608
        TAOS_RETURN(code);
×
609
      }
610
    }
611

612
    // update commit index
613
    if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
84!
614
      pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
84✔
615
    }
616

617
    // maybe update term
618
    if (pReceiver->snapshot.lastApplyTerm > raftStoreGetTerm(pReceiver->pSyncNode)) {
84!
619
      raftStoreSetTerm(pReceiver->pSyncNode, pReceiver->snapshot.lastApplyTerm);
×
620
    }
621

622
    (void)taosThreadMutexLock(&pReceiver->writerMutex);
84✔
623
    if (pReceiver->pWriter != NULL) {
84!
624
      // stop writer, apply data
625
      code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
84✔
626
                                                             &pReceiver->snapshot);
627
      if (code != 0) {
84!
628
        sRError(pReceiver, "snapshot receiver apply failed since %s", tstrerror(code));
×
629
        TAOS_RETURN(code);
×
630
      }
631
      pReceiver->pWriter = NULL;
84✔
632
      sRInfo(pReceiver, "snapshot receiver write stopped");
84!
633
    }
634
    (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
84✔
635

636
    // update progress
637
    pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;
84✔
638

639
    // get fsmState
640
    SSnapshot snapshot = {0};
84✔
641
    code = pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
84✔
642
    if (code != 0) {
84!
643
      sRError(pReceiver, "snapshot receiver get snapshot info failed since %s", tstrerror(code));
×
644
      TAOS_RETURN(code);
×
645
    }
646
    pReceiver->pSyncNode->fsmState = snapshot.state;
84✔
647

648
    // reset wal
649
    code =
650
        pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
84✔
651
    if (code != 0) {
84!
652
      sRError(pReceiver, "failed to snapshot receiver log restore since %s", tstrerror(code));
×
653
      TAOS_RETURN(code);
×
654
    }
655
    sRInfo(pReceiver, "wal log restored from snapshot");
84!
656
  } else {
657
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
658
    sRError(pReceiver, "snapshot receiver finish error since writer is null");
×
659
    TAOS_RETURN(code);
×
660
  }
661

662
  return 0;
84✔
663
}
664

665
static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
2,193✔
666
  if (pMsg->seq != pReceiver->ack + 1) {
2,193!
667
    sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq);
×
668
    TAOS_RETURN(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG);
×
669
  }
670

671
  if (pReceiver->pWriter == NULL) {
2,193!
672
    sRError(pReceiver, "snapshot receiver failed to write data since writer is null");
×
673
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
×
674
  }
675

676
  sRDebug(pReceiver, "snapshot receiver continue to write, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
2,193!
677

678
  if (pMsg->dataLen > 0) {
2,193!
679
    // apply data block
680
    int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
2,193✔
681
                                                                 pMsg->data, pMsg->dataLen);
2,193✔
682
    if (code != 0) {
2,193!
683
      sRError(pReceiver, "snapshot receiver continue write failed since %s", tstrerror(code));
×
684
      TAOS_RETURN(code);
×
685
    }
686
  }
687

688
  // update progress
689
  pReceiver->ack = pMsg->seq;
2,193✔
690

691
  // event log
692
  sRDebug(pReceiver, "snapshot receiver continue to write finish");
2,193!
693
  return 0;
2,193✔
694
}
695

696
SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
168✔
697
  SyncIndex snapStart = SYNC_INDEX_INVALID;
168✔
698

699
  if (syncNodeIsMnode(ths)) {
168!
700
    snapStart = SYNC_INDEX_BEGIN;
×
701
    sNInfo(ths, "snapshot begin index is %" PRId64 " since its mnode", snapStart);
×
702
  } else {
703
    SSyncLogStoreData *pData = ths->pLogStore->data;
168✔
704
    SWal              *pWal = pData->pWal;
168✔
705

706
    int64_t walCommitVer = walGetCommittedVer(pWal);
168✔
707
    snapStart = TMAX(ths->commitIndex, walCommitVer) + 1;
168✔
708

709
    sNInfo(ths, "snapshot begin index is %" PRId64, snapStart);
168!
710
  }
711

712
  return snapStart;
168✔
713
}
714

715
static int32_t syncSnapReceiverExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotReceiver *pReceiver,
84✔
716
                                             SyncSnapshotSend *pMsg, SSnapshot *pInfo) {
717
  if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT) return TSDB_CODE_SYN_INTERNAL_ERROR;
84!
718
  int32_t code = 0, lino = 0;
84✔
719

720
  // copy snap info from leader
721
  void *data = taosMemoryCalloc(1, pMsg->dataLen);
84✔
722
  if (data == NULL) {
84!
723
    TAOS_CHECK_EXIT(terrno);
×
724
  }
725
  pInfo->data = data;
84✔
726
  data = NULL;
84✔
727
  (void)memcpy(pInfo->data, pMsg->data, pMsg->dataLen);
84✔
728

729
  // exchange snap info
730
  if ((code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pInfo)) != 0) {
84!
731
    sRError(pReceiver, "failed to get snapshot info. type: %d", pMsg->payloadType);
×
732
    goto _exit;
×
733
  }
734
  SSyncTLV *datHead = pInfo->data;
84✔
735
  if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
84!
736
    sRError(pReceiver, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
×
737
    code = TSDB_CODE_INVALID_DATA_FMT;
×
738
    goto _exit;
×
739
  }
740
  int32_t dataLen = sizeof(SSyncTLV) + datHead->len;
84✔
741

742
  // save exchanged snap info
743
  SSnapshotParam *pParam = &pReceiver->snapshotParam;
84✔
744
  data = taosMemoryRealloc(pParam->data, dataLen);
84✔
745
  if (data == NULL) {
84!
746
    code = terrno;
×
747
    sError("vgId:%d, failed to realloc memory for snapshot prep due to %s. dataLen:%d", pSyncNode->vgId,
×
748
           tstrerror(code), dataLen);
749
    goto _exit;
×
750
  }
751
  pParam->data = data;
84✔
752
  data = NULL;
84✔
753
  (void)memcpy(pParam->data, pInfo->data, dataLen);
84✔
754

755
_exit:
84✔
756
  TAOS_RETURN(code);
84✔
757
}
758

759
static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
84✔
760
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
84✔
761
  int64_t                timeNow = taosGetTimestampMs();
84✔
762
  int32_t                code = 0;
84✔
763

764
  if (snapshotReceiverIsStart(pReceiver)) {
84!
765
    // already start
766
    int32_t order = 0;
×
767
    if ((order = snapshotReceiverSignatureCmp(pReceiver, pMsg)) < 0) {
×
768
      sRInfo(pReceiver,
×
769
             "received a new snapshot preparation. restart receiver."
770
             " msg signature:(%" PRId64 ", %" PRId64 ")",
771
             pMsg->term, pMsg->startTime);
772
      goto _START_RECEIVER;
×
773
    } else if (order == 0) {
×
774
      sRInfo(pReceiver,
×
775
             "received a duplicate snapshot preparation. send reply."
776
             " msg signature:(%" PRId64 ", %" PRId64 ")",
777
             pMsg->term, pMsg->startTime);
778
      goto _SEND_REPLY;
×
779
    } else {
780
      // ignore
781
      sRError(pReceiver,
×
782
              "received a stale snapshot preparation. ignore."
783
              " msg signature:(%" PRId64 ", %" PRId64 ")",
784
              pMsg->term, pMsg->startTime);
785
      code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
×
786
      goto _SEND_REPLY;
×
787
    }
788
  } else {
789
    // start new
790
    sRInfo(pReceiver, "snapshot receiver not start yet so start new one");
84!
791
    goto _START_RECEIVER;
84✔
792
  }
793

794
_START_RECEIVER:
84✔
795
  if (snapshotReceiverIsStart(pReceiver)) {
84!
796
    sRInfo(pReceiver, "snapshot receiver already start and force stop pre one");
×
797
    snapshotReceiverStop(pReceiver);
×
798
  }
799

800
  snapshotReceiverStart(pReceiver, pMsg);
84✔
801

802
_SEND_REPLY:;
84✔
803

804
  SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT_REPLY};
84✔
805
  int32_t   dataLen = 0;
84✔
806
  if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT) {
84!
807
    if ((code = syncSnapReceiverExchgSnapInfo(pSyncNode, pReceiver, pMsg, &snapInfo)) != 0) {
84!
808
      goto _out;
×
809
    }
810
    SSyncTLV *datHead = snapInfo.data;
84✔
811
    dataLen = sizeof(SSyncTLV) + datHead->len;
84✔
812
  }
813

814
  // send response
815
  int32_t type = (snapInfo.data) ? snapInfo.type : 0;
84!
816
  if ((code = syncSnapSendRsp(pReceiver, pMsg, snapInfo.data, dataLen, type, code)) != 0) {
84!
817
    goto _out;
×
818
  }
819

820
_out:
84✔
821
  if (snapInfo.data) {
84!
822
    taosMemoryFree(snapInfo.data);
84✔
823
    snapInfo.data = NULL;
84✔
824
  }
825
  TAOS_RETURN(code);
84✔
826
}
827

828
static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
84✔
829
  // condition 1
830
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
84✔
831
  int32_t                code = TSDB_CODE_SYN_INTERNAL_ERROR;
84✔
832

833
  if (!snapshotReceiverIsStart(pReceiver)) {
84!
834
    sRError(pReceiver, "failed to begin snapshot receiver since not started");
×
835
    goto _SEND_REPLY;
×
836
  }
837

838
  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
84!
839
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
×
840
    sRError(pReceiver, "failed to begin snapshot receiver since %s", tstrerror(code));
×
841
    goto _SEND_REPLY;
×
842
  }
843

844
  // start writer
845
  if ((code = snapshotReceiverStartWriter(pReceiver, pMsg)) != 0) {
84!
846
    sRError(pReceiver, "failed to start snapshot writer since %s", tstrerror(code));
×
847
    goto _SEND_REPLY;
×
848
  }
849

850
  SyncIndex beginIndex = syncNodeGetSnapBeginIndex(pSyncNode);
84✔
851
  if (pReceiver->snapshotParam.start != beginIndex) {
84!
852
    sRError(pReceiver, "snapshot begin index is changed unexpectedly. sver:%" PRId64 ", beginIndex:%" PRId64,
×
853
            pReceiver->snapshotParam.start, beginIndex);
854
    goto _SEND_REPLY;
×
855
  }
856

857
  code = 0;
84✔
858
_SEND_REPLY:
84✔
859

860
  // send response
861
  TAOS_CHECK_RETURN(syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code));
84!
862

863
  TAOS_RETURN(code);
84✔
864
}
865

866
int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, void *pBlock, int32_t blockLen,
2,361✔
867
                        int32_t type, int32_t rspCode) {
868
  int32_t    code = 0;
2,361✔
869
  SSyncNode *pSyncNode = pReceiver->pSyncNode;
2,361✔
870
  // build msg
871
  SRpcMsg rpcMsg = {0};
2,361✔
872
  if ((code = syncBuildSnapshotSendRsp(&rpcMsg, blockLen, pSyncNode->vgId)) != 0) {
2,361!
873
    sRError(pReceiver, "failed to build snapshot receiver resp since %s", tstrerror(code));
×
874
    TAOS_RETURN(code);
×
875
  }
876

877
  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
2,361✔
878
  pRspMsg->srcId = pSyncNode->myRaftId;
2,361✔
879
  pRspMsg->destId = pMsg->srcId;
2,361✔
880
  pRspMsg->term = pMsg->term;
2,361✔
881
  pRspMsg->lastIndex = pMsg->lastIndex;
2,361✔
882
  pRspMsg->lastTerm = pMsg->lastTerm;
2,361✔
883
  pRspMsg->startTime = pMsg->startTime;
2,361✔
884
  pRspMsg->ack = pMsg->seq;
2,361✔
885
  pRspMsg->code = rspCode;
2,361✔
886
  pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
2,361✔
887
  pRspMsg->payloadType = type;
2,361✔
888

889
  if (pBlock != NULL && blockLen > 0) {
2,361!
890
    (void)memcpy(pRspMsg->data, pBlock, blockLen);
84✔
891
  }
892

893
  // send msg
894
  if ((code = syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg)) != 0) {
2,361!
895
    sRError(pReceiver, "failed to send snapshot receiver resp since %s", tstrerror(code));
×
896
    TAOS_RETURN(code);
×
897
  }
898
  return 0;
2,361✔
899
}
900

901
static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend **ppMsg) {
2,193✔
902
  int32_t           code = 0;
2,193✔
903
  SSyncSnapBuffer  *pRcvBuf = pReceiver->pRcvBuf;
2,193✔
904
  SyncSnapshotSend *pMsg = ppMsg[0];
2,193✔
905

906
  (void)taosThreadMutexLock(&pRcvBuf->mutex);
2,193✔
907

908
  if (pMsg->seq - pRcvBuf->start >= pRcvBuf->size) {
2,193!
909
    code = TSDB_CODE_SYN_BUFFER_FULL;
×
910
    goto _out;
×
911
  }
912

913
  if (!(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end)) return TSDB_CODE_SYN_INTERNAL_ERROR;
2,193!
914

915
  if (pMsg->seq > pRcvBuf->cursor) {
2,193!
916
    if (pRcvBuf->entries[pMsg->seq % pRcvBuf->size]) {
2,193!
917
      pRcvBuf->entryDeleteCb(pRcvBuf->entries[pMsg->seq % pRcvBuf->size]);
×
918
    }
919
    pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg;
2,193✔
920
    ppMsg[0] = NULL;
2,193✔
921
    pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
2,193✔
922
  } else if (pMsg->seq < pRcvBuf->start) {
×
923
    code = syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code);
×
924
    goto _out;
×
925
  }
926

927
  for (int64_t seq = pRcvBuf->cursor + 1; seq < pRcvBuf->end; ++seq) {
4,386✔
928
    if (pRcvBuf->entries[seq % pRcvBuf->size]) {
2,847✔
929
      pRcvBuf->cursor = seq;
2,193✔
930
    } else {
931
      break;
654✔
932
    }
933
  }
934

935
  for (int64_t seq = pRcvBuf->start; seq <= pRcvBuf->cursor; ++seq) {
4,386✔
936
    if ((code = snapshotReceiverGotData(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size])) != 0) {
2,193!
937
      if (code >= SYNC_SNAPSHOT_SEQ_INVALID) {
×
938
        code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
939
      }
940
    }
941
    pRcvBuf->start = seq + 1;
2,193✔
942
    if (syncSnapSendRsp(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size], NULL, 0, 0, code) != 0) {
2,193!
943
      sError("failed to send snap rsp");
×
944
    }
945
    pRcvBuf->entryDeleteCb(pRcvBuf->entries[seq % pRcvBuf->size]);
2,193✔
946
    pRcvBuf->entries[seq % pRcvBuf->size] = NULL;
2,193✔
947
    if (code) goto _out;
2,193!
948
  }
949

950
_out:
2,193✔
951
  (void)taosThreadMutexUnlock(&pRcvBuf->mutex);
2,193✔
952
  TAOS_RETURN(code);
2,193✔
953
}
954

955
static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend **ppMsg) {
2,193✔
956
  // condition 4
957
  // transfering
958
  SyncSnapshotSend *pMsg = ppMsg[0];
2,193✔
959
  if (!pMsg) return TSDB_CODE_SYN_INTERNAL_ERROR;
2,193!
960
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
2,193✔
961
  int64_t                timeNow = taosGetTimestampMs();
2,193✔
962
  int32_t                code = 0;
2,193✔
963

964
  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
2,193!
965
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
×
966
    sRError(pReceiver, "failed to receive snapshot data since %s.", tstrerror(code));
×
967
    return syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code);
×
968
  }
969

970
  return syncSnapBufferRecv(pReceiver, ppMsg);
2,193✔
971
}
972

973
static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
84✔
974
  // condition 2
975
  // end, finish FSM
976
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
84✔
977
  int64_t                timeNow = taosGetTimestampMs();
84✔
978
  int32_t                code = 0;
84✔
979

980
  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
84!
981
    sRError(pReceiver, "snapshot end failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
×
982
            pReceiver->startTime, pMsg->startTime);
983
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
×
984
    goto _SEND_REPLY;
×
985
  }
986

987
  code = snapshotReceiverFinish(pReceiver, pMsg);
84✔
988
  if (code == 0) {
84!
989
    snapshotReceiverStop(pReceiver);
84✔
990
  }
991

992
_SEND_REPLY:;
×
993

994
  // build msg
995
  SRpcMsg rpcMsg = {0};
84✔
996
  if ((code = syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId)) != 0) {
84!
997
    sRError(pReceiver, "snapshot receiver build rsp failed since %s", tstrerror(code));
×
998
    TAOS_RETURN(code);
×
999
  }
1000

1001
  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
84✔
1002
  pRspMsg->srcId = pSyncNode->myRaftId;
84✔
1003
  pRspMsg->destId = pMsg->srcId;
84✔
1004
  pRspMsg->term = raftStoreGetTerm(pSyncNode);
84✔
1005
  pRspMsg->lastIndex = pMsg->lastIndex;
84✔
1006
  pRspMsg->lastTerm = pMsg->lastTerm;
84✔
1007
  pRspMsg->startTime = pMsg->startTime;
84✔
1008
  pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
84✔
1009
  pRspMsg->code = code;
84✔
1010
  pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
84✔
1011

1012
  // send msg
1013
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver end");
84✔
1014
  if ((code = syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg)) != 0) {
84!
1015
    sRError(pReceiver, "snapshot receiver send rsp failed since %s", tstrerror(code));
×
1016
    TAOS_RETURN(code);
×
1017
  }
1018

1019
  TAOS_RETURN(code);
84✔
1020
}
1021

1022
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
2,445✔
1023
  SyncSnapshotSend     **ppMsg = (SyncSnapshotSend **)&pRpcMsg->pCont;
2,445✔
1024
  SyncSnapshotSend      *pMsg = ppMsg[0];
2,445✔
1025
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
2,445✔
1026
  int32_t                code = 0;
2,445✔
1027

1028
  // if already drop replica, do not process
1029
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
2,445!
1030
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config");
×
1031
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
×
1032
    TAOS_RETURN(code);
×
1033
  }
1034

1035
  if (pMsg->term < raftStoreGetTerm(pSyncNode)) {
2,445!
1036
    sRError(pReceiver, "reject snap replication with smaller term. msg term:%" PRId64 ", seq:%d", pMsg->term,
×
1037
            pMsg->seq);
1038
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
×
1039
    if (syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code) != 0) sError("failed to send snap rsp");
×
1040
    TAOS_RETURN(code);
×
1041
  }
1042

1043
  if (pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER) {
2,445✔
1044
    if (pMsg->term > raftStoreGetTerm(pSyncNode)) {
622!
1045
      syncNodeStepDown(pSyncNode, pMsg->term);
×
1046
    }
1047
  } else {
1048
    syncNodeUpdateTermWithoutStepDown(pSyncNode, pMsg->term);
1,823✔
1049
  }
1050

1051
  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER && pSyncNode->state != TAOS_SYNC_STATE_LEARNER) {
2,445!
1052
    sRError(pReceiver, "snapshot receiver not a follower or learner");
×
1053
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1054
    TAOS_RETURN(code);
×
1055
  }
1056

1057
  if (pMsg->seq < SYNC_SNAPSHOT_SEQ_PREP || pMsg->seq > SYNC_SNAPSHOT_SEQ_END) {
2,445!
1058
    sRError(pReceiver, "snap replication msg with invalid seq:%d", pMsg->seq);
×
1059
    code = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG;
×
1060
    TAOS_RETURN(code);
×
1061
  }
1062

1063
  // prepare
1064
  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP) {
2,445✔
1065
    sInfo("vgId:%d, prepare snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term,
84!
1066
          pMsg->startTime);
1067
    code = syncNodeOnSnapshotPrep(pSyncNode, pMsg);
84✔
1068
    goto _out;
84✔
1069
  }
1070

1071
  // begin
1072
  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
2,361✔
1073
    sInfo("vgId:%d, begin snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term,
84!
1074
          pMsg->startTime);
1075
    code = syncNodeOnSnapshotBegin(pSyncNode, pMsg);
84✔
1076
    goto _out;
84✔
1077
  }
1078

1079
  // data
1080
  if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
2,277!
1081
    code = syncNodeOnSnapshotReceive(pSyncNode, ppMsg);
2,193✔
1082
    goto _out;
2,193✔
1083
  }
1084

1085
  // end
1086
  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
84!
1087
    sInfo("vgId:%d, end snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term,
84!
1088
          pMsg->startTime);
1089
    code = syncNodeOnSnapshotEnd(pSyncNode, pMsg);
84✔
1090
    if (code != 0) {
84!
1091
      sRError(pReceiver, "failed to end snapshot.");
×
1092
      goto _out;
×
1093
    }
1094

1095
    code = syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode);
84✔
1096
    if (code != 0) {
84!
1097
      sRError(pReceiver, "failed to reinit log buffer since %s", tstrerror(code));
×
1098
    }
1099
    goto _out;
84✔
1100
  }
1101

1102
_out:;
×
1103
  syncNodeResetElectTimer(pSyncNode);
2,445✔
1104
  TAOS_RETURN(code);
2,445✔
1105
}
1106

1107
static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
85✔
1108
  if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT_REPLY) return TSDB_CODE_SYN_INTERNAL_ERROR;
85!
1109

1110
  SSyncTLV *datHead = (void *)pMsg->data;
85✔
1111
  if (datHead->typ != pMsg->payloadType) {
85!
1112
    sSError(pSender, "unexpected data type in data of SyncSnapshotRsp. typ: %d", datHead->typ);
×
1113
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
×
1114
  }
1115
  int32_t dataLen = sizeof(SSyncTLV) + datHead->len;
85✔
1116

1117
  SSnapshotParam *pParam = &pSender->snapshotParam;
85✔
1118
  void           *data = taosMemoryRealloc(pParam->data, dataLen);
85✔
1119
  if (data == NULL) {
85!
1120
    TAOS_RETURN(terrno);
×
1121
  }
1122
  (void)memcpy(data, pMsg->data, dataLen);
85✔
1123

1124
  pParam->data = data;
85✔
1125
  data = NULL;
85✔
1126
  sSInfo(pSender, "data of snapshot param. len: %d", datHead->len);
85!
1127
  return 0;
85✔
1128
}
1129

1130
// sender
1131
static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
85✔
1132
  int32_t   code = 0;
85✔
1133
  SSnapshot snapshot = {0};
85✔
1134

1135
  if (pMsg->snapBeginIndex > pSyncNode->commitIndex) {
85!
1136
    sSError(pSender,
×
1137
            "snapshot begin index is greater than commit index. snapBeginIndex:%" PRId64 ", commitIndex:%" PRId64,
1138
            pMsg->snapBeginIndex, pSyncNode->commitIndex);
1139
    TAOS_RETURN(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG);
×
1140
  }
1141

1142
  (void)taosThreadMutexLock(&pSender->pSndBuf->mutex);
85✔
1143
  TAOS_CHECK_GOTO(pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot), NULL, _out);
85!
1144

1145
  // prepare <begin, end>
1146
  pSender->snapshotParam.start = pMsg->snapBeginIndex;
85✔
1147
  pSender->snapshotParam.end = snapshot.lastApplyIndex;
85✔
1148

1149
  sSInfo(pSender, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64,
85!
1150
         pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
1151

1152
  // update sender
1153
  pSender->snapshot = snapshot;
85✔
1154

1155
  // start reader
1156
  if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
85!
1157
    TAOS_CHECK_GOTO(syncSnapSenderExchgSnapInfo(pSyncNode, pSender, pMsg), NULL, _out);
85!
1158
  }
1159

1160
  code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
85✔
1161
  if (code != 0) {
85!
1162
    sSError(pSender, "prepare snapshot failed since %s", tstrerror(code));
×
1163
    goto _out;
×
1164
  }
1165

1166
  // update next index
1167
  syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapshot.lastApplyIndex + 1);
85✔
1168

1169
  code = snapshotSend(pSender);
85✔
1170

1171
_out:
85✔
1172
  (void)taosThreadMutexUnlock(&pSender->pSndBuf->mutex);
85✔
1173
  TAOS_RETURN(code);
85✔
1174
}
1175

1176
static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
4,826✔
1177
  if (pSender->term < pMsg->term) return -1;
4,826!
1178
  if (pSender->term > pMsg->term) return 1;
4,826!
1179
  if (pSender->startTime < pMsg->startTime) return -1;
4,826!
1180
  if (pSender->startTime > pMsg->startTime) return 1;
4,826!
1181
  return 0;
4,826✔
1182
}
1183

1184
static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp **ppMsg) {
2,328✔
1185
  int32_t          code = 0;
2,328✔
1186
  SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
2,328✔
1187
  SyncSnapshotRsp *pMsg = ppMsg[0];
2,328✔
1188

1189
  (void)taosThreadMutexLock(&pSndBuf->mutex);
2,328✔
1190
  if (snapshotSenderSignatureCmp(pSender, pMsg) != 0) {
2,328!
1191
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
×
1192
    goto _out;
×
1193
  }
1194

1195
  if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) {
2,328!
1196
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1197
    goto _out;
×
1198
  }
1199

1200
  if (pMsg->ack - pSndBuf->start >= pSndBuf->size) {
2,328!
1201
    code = TSDB_CODE_SYN_BUFFER_FULL;
×
1202
    goto _out;
×
1203
  }
1204

1205
  if (!(pSndBuf->start <= pSndBuf->cursor + 1 && pSndBuf->cursor < pSndBuf->end)) {
2,328!
1206
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1207
    goto _out;
×
1208
  }
1209

1210
  if (pMsg->ack > pSndBuf->cursor && pMsg->ack < pSndBuf->end) {
2,328!
1211
    SyncSnapBlock *pBlk = pSndBuf->entries[pMsg->ack % pSndBuf->size];
2,243✔
1212
    if (!pBlk) {
2,243!
1213
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1214
      goto _out;
×
1215
    }
1216
    pBlk->acked = 1;
2,243✔
1217
  }
1218

1219
  for (int64_t ack = pSndBuf->cursor + 1; ack < pSndBuf->end; ++ack) {
4,571✔
1220
    SyncSnapBlock *pBlk = pSndBuf->entries[ack % pSndBuf->size];
4,401✔
1221
    if (pBlk->acked) {
4,401✔
1222
      pSndBuf->cursor = ack;
2,243✔
1223
    } else {
1224
      break;
2,158✔
1225
    }
1226
  }
1227

1228
  for (int64_t ack = pSndBuf->start; ack <= pSndBuf->cursor; ++ack) {
4,571✔
1229
    pSndBuf->entryDeleteCb(pSndBuf->entries[ack % pSndBuf->size]);
2,243✔
1230
    pSndBuf->entries[ack % pSndBuf->size] = NULL;
2,243✔
1231
    pSndBuf->start = ack + 1;
2,243✔
1232
  }
1233

1234
  while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pSndBuf->start < tsSnapReplMaxWaitN) {
4,656✔
1235
    if ((code = snapshotSend(pSender)) != 0) {
2,328!
1236
      goto _out;
×
1237
    }
1238
  }
1239

1240
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
2,328✔
1241
    if ((code = snapshotSend(pSender)) != 0) {
85!
1242
      goto _out;
×
1243
    }
1244
  }
1245
_out:
2,328✔
1246
  (void)taosThreadMutexUnlock(&pSndBuf->mutex);
2,328✔
1247
  TAOS_RETURN(code);
2,328✔
1248
}
1249

1250
int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
2,498✔
1251
  SyncSnapshotRsp **ppMsg = (SyncSnapshotRsp **)&pRpcMsg->pCont;
2,498✔
1252
  SyncSnapshotRsp  *pMsg = ppMsg[0];
2,498✔
1253
  int32_t           code = 0;
2,498✔
1254

1255
  // if already drop replica, do not process
1256
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
2,498!
1257
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "maybe replica already dropped");
×
1258
    TAOS_RETURN(TSDB_CODE_SYN_MISMATCHED_SIGNATURE);
×
1259
  }
1260

1261
  // get sender
1262
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &pMsg->srcId);
2,498✔
1263
  if (pSender == NULL) {
2,498!
1264
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender is null");
×
1265
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
×
1266
  }
1267

1268
  if (!snapshotSenderIsStart(pSender)) {
2,498!
1269
    sSError(pSender, "snapshot sender stopped. sender startTime:%" PRId64 ", msg startTime:%" PRId64,
×
1270
            pSender->startTime, pMsg->startTime);
1271
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
×
1272
  }
1273

1274
  // check signature
1275
  int32_t order = 0;
2,498✔
1276
  if ((order = snapshotSenderSignatureCmp(pSender, pMsg)) > 0) {
2,498!
1277
    sSWarn(pSender, "ignore a stale snap rsp, msg signature:(%" PRId64 ", %" PRId64 ").", pMsg->term, pMsg->startTime);
×
1278
    TAOS_RETURN(TSDB_CODE_SYN_MISMATCHED_SIGNATURE);
×
1279
  } else if (order < 0) {
2,498!
1280
    sSError(pSender, "snapshot sender is stale. stop");
×
1281
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
×
1282
    goto _ERROR;
×
1283
  }
1284

1285
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
2,498!
1286
    sSError(pSender, "snapshot sender not leader");
×
1287
    code = TSDB_CODE_SYN_NOT_LEADER;
×
1288
    goto _ERROR;
×
1289
  }
1290

1291
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
2,498✔
1292
  if (pMsg->term != currentTerm) {
2,498!
1293
    sSError(pSender, "snapshot sender term mismatch, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
×
1294
            currentTerm);
1295
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
×
1296
    goto _ERROR;
×
1297
  }
1298

1299
  if (pMsg->code != 0) {
2,498!
1300
    sSError(pSender, "snapshot sender receive error:%s 0x%x and stop sender", tstrerror(pMsg->code), pMsg->code);
×
1301
    code = pMsg->code;
×
1302
    goto _ERROR;
×
1303
  }
1304

1305
  // send begin
1306
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP) {
2,498✔
1307
    sSInfo(pSender, "process prepare rsp");
85!
1308
    if ((code = syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg)) != 0) {
85!
1309
      goto _ERROR;
×
1310
    }
1311
  }
1312

1313
  // send msg of data or end
1314
  if (pMsg->ack >= SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->ack < SYNC_SNAPSHOT_SEQ_END) {
2,498✔
1315
    if ((code = syncSnapBufferSend(pSender, ppMsg)) != 0) {
2,328!
1316
      sSError(pSender, "failed to replicate snap since %s. seq:%d, pReader:%p, finish:%d", tstrerror(code),
×
1317
              pSender->seq, pSender->pReader, pSender->finish);
1318
      goto _ERROR;
×
1319
    }
1320
  }
1321

1322
  // end
1323
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
2,498✔
1324
    sSInfo(pSender, "process end rsp");
85!
1325
    snapshotSenderStop(pSender, true);
85✔
1326
    TAOS_CHECK_GOTO(syncNodeReplicateReset(pSyncNode, &pMsg->srcId), NULL, _ERROR);
85!
1327
  }
1328

1329
  return 0;
2,498✔
1330

1331
_ERROR:
×
1332
  snapshotSenderStop(pSender, false);
×
1333
  if (syncNodeReplicateReset(pSyncNode, &pMsg->srcId) != 0) sError("failed to reset replicate");
×
1334
  TAOS_RETURN(code);
×
1335
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc