• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5056

17 May 2026 01:15AM UTC coverage: 73.384% (+0.03%) from 73.355%
#5056

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281643 of 383795 relevant lines covered (73.38%)

135942701.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.94
/source/libs/sync/src/syncSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "syncSnapshot.h"
18
#include "syncIndexMgr.h"
19
#include "syncPipeline.h"
20
#include "syncRaftCfg.h"
21
#include "syncRaftLog.h"
22
#include "syncRaftStore.h"
23
#include "syncReplication.h"
24
#include "syncUtil.h"
25
#include "tglobal.h"
26

27
static SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths);
28

29
// Release every entry held in [start, end) and rewind the buffer to its initial window.
// Each slot is passed to entryDeleteCb (when one is installed) and then cleared.
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
  int64_t seq = pBuf->start;
  while (seq < pBuf->end) {
    if (pBuf->entryDeleteCb != NULL) {
      pBuf->entryDeleteCb(pBuf->entries[seq % pBuf->size]);
    }
    pBuf->entries[seq % pBuf->size] = NULL;
    ++seq;
  }

  // empty window: start == end, cursor sits one position before start
  pBuf->start = SYNC_SNAPSHOT_SEQ_BEGIN + 1;
  pBuf->end = pBuf->start;
  pBuf->cursor = pBuf->start - 1;
}
178,437,579✔
40

41
// Tear down a snapshot buffer: drop its entries, destroy its mutex, free it,
// and clear the caller's pointer. A NULL handle or NULL buffer is a no-op.
static void syncSnapBufferDestroy(SSyncSnapBuffer **ppBuf) {
  if (ppBuf == NULL) return;

  SSyncSnapBuffer *pBuf = *ppBuf;
  if (pBuf == NULL) return;

  syncSnapBufferReset(pBuf);
  (void)taosThreadMutexDestroy(&pBuf->mutex);

  taosMemoryFree(*ppBuf);
  *ppBuf = NULL;
}
52

53
// Allocate a zero-initialised snapshot buffer and initialise its mutex.
//
// On success *ppBuf receives the new buffer and 0 is returned.
// On any failure *ppBuf is set to NULL, the partially built buffer is freed,
// and a non-zero error code is returned.
static int32_t syncSnapBufferCreate(SSyncSnapBuffer **ppBuf) {
  *ppBuf = NULL;

  SSyncSnapBuffer *pBuf = taosMemoryCalloc(1, sizeof(SSyncSnapBuffer));
  if (pBuf == NULL) {
    TAOS_RETURN(terrno);
  }

  // the entries array is expected to hold exactly TSDB_SYNC_SNAP_BUFFER_SIZE slots
  pBuf->size = sizeof(pBuf->entries) / sizeof(void *);
  if (pBuf->size != TSDB_SYNC_SNAP_BUFFER_SIZE) {
    taosMemoryFree(pBuf);  // do not leak the buffer on the sanity-check failure
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int32_t code = taosThreadMutexInit(&pBuf->mutex, NULL);
  if (code != 0) {
    taosMemoryFree(pBuf);
    TAOS_RETURN(code);
  }

  *ppBuf = pBuf;
  TAOS_RETURN(0);
}
65

66
// Build a snapshot sender for the replica at replicaIndex.
//
// Requires the FSM to provide the three snapshot-read callbacks; otherwise
// TSDB_CODE_SYN_INTERNAL_ERROR is returned. On success *ppSender owns the new
// sender (with an empty send buffer); on failure *ppSender stays NULL.
int32_t snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex, SSyncSnapshotSender **ppSender) {
  int32_t code = 0;
  *ppSender = NULL;

  if (pSyncNode->pFsm->FpSnapshotStartRead == NULL || pSyncNode->pFsm->FpSnapshotStopRead == NULL ||
      pSyncNode->pFsm->FpSnapshotDoRead == NULL) {
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  SSyncSnapshotSender *pSender = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender));
  if (pSender == NULL) {
    TAOS_RETURN(terrno);
  }

  // identity
  pSender->pSyncNode = pSyncNode;
  pSender->replicaIndex = replicaIndex;

  // initial (idle) state
  pSender->start = false;
  pSender->finish = false;
  pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->pReader = NULL;
  pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
  pSender->senderStartTime = -1;
  pSender->term = raftStoreGetTerm(pSyncNode);

  code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &pSender->snapshot);
  if (code != 0) {
    taosMemoryFreeClear(pSender);
    TAOS_RETURN(code);
  }

  // send buffer: entries are SyncSnapBlock objects released by syncSnapBlockDestroy
  SSyncSnapBuffer *pSndBuf = NULL;
  code = syncSnapBufferCreate(&pSndBuf);
  if (pSndBuf == NULL) {
    taosMemoryFreeClear(pSender);
    TAOS_RETURN(code);
  }
  pSndBuf->entryDeleteCb = syncSnapBlockDestroy;
  pSender->pSndBuf = pSndBuf;
  syncSnapBufferReset(pSndBuf);

  *ppSender = pSender;
  TAOS_RETURN(code);
}
109

110
void syncSnapBlockDestroy(void *ptr) {
446,501✔
111
  SyncSnapBlock *pBlk = ptr;
446,501✔
112
  if (pBlk->pBlock != NULL) {
446,501✔
113
    taosMemoryFree(pBlk->pBlock);
413,130✔
114
    pBlk->pBlock = NULL;
413,130✔
115
    pBlk->blockLen = 0;
413,130✔
116
  }
117
  taosMemoryFree(pBlk);
446,501✔
118
}
446,501✔
119

120
// Free the data payloads attached to the sender's snapshotParam and snapshot,
// leaving both data pointers NULL. Always returns 0.
static int32_t snapshotSenderClearInfoData(SSyncSnapshotSender *pSender) {
  if (pSender->snapshotParam.data != NULL) {
    taosMemoryFree(pSender->snapshotParam.data);
    pSender->snapshotParam.data = NULL;
  }

  if (pSender->snapshot.data != NULL) {
    taosMemoryFree(pSender->snapshot.data);
    pSender->snapshot.data = NULL;
  }

  return 0;
}
132

133
// Destroy a snapshot sender: stop an open reader, release the send buffer and
// info payloads, then free the sender itself. NULL is accepted and ignored.
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
  if (pSender == NULL) {
    return;
  }

  // an unfinished read session still holds a reader that must be closed
  if (pSender->pReader != NULL) {
    pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    pSender->pReader = NULL;
  }

  if (pSender->pSndBuf != NULL) {
    syncSnapBufferDestroy(&pSender->pSndBuf);
  }

  (void)snapshotSenderClearInfoData(pSender);

  taosMemoryFree(pSender);
}
152

153
// Atomically read the sender's start flag.
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) {
  return atomic_load_8(&pSender->start);
}
83,788,058✔
154

155
// Start a snapshot transfer on this sender.
//
// Atomically flips the start flag (returning 0 immediately if it was already
// set), resets the per-transfer state, asks the FSM for snapshot info of type
// TDMT_SYNC_PREP_SNAPSHOT and sends the preparation message (seq
// SYNC_SNAPSHOT_SEQ_PREP) carrying that info, if any.
//
// Returns 0 on success or the error code of the failing step.
// NOTE(review): on failure the start flag is left set; confirm that callers
// stop the sender in that case.
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
  int32_t code = 0;

  int8_t started = atomic_val_compare_exchange_8(&pSender->start, false, true);
  if (started) return 0;

  // reset per-transfer state
  pSender->seq = SYNC_SNAPSHOT_SEQ_PREP;
  pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
  pSender->pReader = NULL;
  pSender->snapshotParam.start = SYNC_INDEX_INVALID;
  pSender->snapshotParam.end = SYNC_INDEX_INVALID;
  pSender->snapshot.data = NULL;
  pSender->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pSender->snapshot.lastApplyTerm = SYNC_TERM_INVALID;
  pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  (void)memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig));
  pSender->sendingMS = 0;
  pSender->term = raftStoreGetTerm(pSender->pSyncNode);
  pSender->senderStartTime = taosGetMonoTimestampMs();
  pSender->lastSendTime = taosGetTimestampMs();
  pSender->finish = false;

  // Get snapshot info
  SSyncNode *pSyncNode = pSender->pSyncNode;
  SSnapshot  snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT};
  if ((code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo)) != 0) {
    sSError(pSender, "snapshot get info failure since %s", tstrerror(code));
    goto _out;
  }

  // the optional payload is a TLV whose header must carry the PREP type
  void   *pData = snapInfo.data;
  int32_t type = (pData) ? snapInfo.type : 0;
  int32_t dataLen = 0;
  if (pData) {
    SSyncTLV *datHead = pData;
    if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT) {
      sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
      code = TSDB_CODE_INVALID_DATA_FMT;
      goto _out;
    }
    dataLen = sizeof(SSyncTLV) + datHead->len;
  }

  sInfo("vgId:%d, send msg:%s", pSyncNode->vgId, TMSG_INFO(type));
  if ((code = syncSnapSendMsg(pSender, pSender->seq, pData, dataLen, type)) != 0) {
    goto _out;
  }

  SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&destId));

_out:
  // the info payload was copied into the message (or never used); release it
  if (snapInfo.data) {
    taosMemoryFree(snapInfo.data);
    snapInfo.data = NULL;
  }
  TAOS_RETURN(code);
}
214

215
// Stop a running sender. If the start flag was not set this is a no-op;
// otherwise, under the send-buffer mutex, record the finish state, close the
// reader, and drop all buffered blocks and info payloads.
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
  sSDebug(pSender, "snapshot sender stop, finish:%d reader:%p", finish, pSender->pReader);

  // clear the flag; bail out when the sender was not running
  int8_t wasStarted = atomic_val_compare_exchange_8(&pSender->start, true, false);
  if (!wasStarted) return;

  (void)taosThreadMutexLock(&pSender->pSndBuf->mutex);

  pSender->finish = finish;

  if (pSender->pReader != NULL) {
    pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
    pSender->pReader = NULL;
  }

  syncSnapBufferReset(pSender->pSndBuf);
  (void)snapshotSenderClearInfoData(pSender);

  SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
  sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);

  (void)taosThreadMutexUnlock(&pSender->pSndBuf->mutex);
}
240

241
// Build one SyncSnapshotSend message for sequence number seq and send it to
// the sender's target replica. The header is filled from the sender's current
// state; pBlock/blockLen (if any) are copied into the message body and typ is
// stored as the payload type. Returns 0 or the failing step's error code.
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) {
  SRpcMsg rpcMsg = {0};

  int32_t code = syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId);
  if (code != 0) {
    sSError(pSender, "failed to build snap replication msg since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  SyncSnapshotSend *pMsg = rpcMsg.pCont;

  // routing
  pMsg->srcId = pSender->pSyncNode->myRaftId;
  pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];

  // transfer signature and snapshot description
  pMsg->term = pSender->term;
  pMsg->beginIndex = pSender->snapshotParam.start;
  pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
  pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
  pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
  pMsg->lastConfig = pSender->lastConfig;
  pMsg->snapStartTime = pSender->senderStartTime;
  pMsg->seq = seq;

  // payload
  if (pBlock != NULL && blockLen > 0) {
    (void)memcpy(pMsg->data, pBlock, blockLen);
  }
  pMsg->payloadType = typ;

  code = syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg);
  if (code != 0) {
    sSError(pSender, "failed to send snap replication msg since %s. seq:%d", tstrerror(code), seq);
  }

  TAOS_RETURN(code);
}
276

277
// when the sender receives an ack, call this function to send msg from seq
278
// seq = ack + 1, already updated
279
// Advance the sender by one sequence number and send the corresponding message.
//
// While seq is below SYNC_SNAPSHOT_SEQ_END it is incremented; for sequence
// numbers above SYNC_SNAPSHOT_SEQ_BEGIN a block is read from the FSM reader.
// An empty read marks the end: seq jumps to SYNC_SNAPSHOT_SEQ_END and the
// function returns without sending. A non-empty block is sent and then parked
// in the send buffer; a block that was not parked is destroyed on exit.
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
  int32_t        code = 0;
  SyncSnapBlock *pBlk = NULL;
  int32_t        blockLen = 0;
  void          *pBlock = NULL;
  int64_t        nowMs = 0;

  if (pSender->seq < SYNC_SNAPSHOT_SEQ_END) {
    pSender->seq++;

    if (pSender->seq > SYNC_SNAPSHOT_SEQ_BEGIN) {
      pBlk = taosMemoryCalloc(1, sizeof(SyncSnapBlock));
      if (pBlk == NULL) {
        code = terrno;
        goto _OUT;
      }
      pBlk->seq = pSender->seq;

      // read data
      code = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, &pBlk->pBlock,
                                                        &pBlk->blockLen);
      if (code != 0) {
        sSError(pSender, "snapshot sender read failed since %s", tstrerror(code));
        goto _OUT;
      }

      if (pBlk->blockLen <= 0) {
        // read finish, update seq to end
        pSender->seq = SYNC_SNAPSHOT_SEQ_END;
        sSInfo(pSender, "snapshot sender read to the end");
        goto _OUT;
      }

      // has read data
      sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pBlk->blockLen, pBlk->seq);
    }
  }

  if (pSender->seq < SYNC_SNAPSHOT_SEQ_BEGIN || pSender->seq > SYNC_SNAPSHOT_SEQ_END) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _OUT;
  }

  // send msg
  if (pBlk != NULL) {
    blockLen = pBlk->blockLen;
    pBlock = pBlk->pBlock;
  }
  code = syncSnapSendMsg(pSender, pSender->seq, pBlock, blockLen, 0);
  if (code != 0) {
    goto _OUT;
  }

  // put in buffer
  nowMs = taosGetTimestampMs();
  if (pBlk != NULL) {
    if (pBlk->seq <= SYNC_SNAPSHOT_SEQ_BEGIN || pBlk->seq >= SYNC_SNAPSHOT_SEQ_END) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _OUT;
    }
    pBlk->sendTimeMs = nowMs;

    SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
    pSndBuf->entries[pSender->seq % pSndBuf->size] = pBlk;
    pBlk = NULL;  // ownership moved to the buffer
    pSndBuf->end = TMAX(pSender->seq + 1, pSndBuf->end);
  }
  pSender->lastSendTime = nowMs;

_OUT:
  if (pBlk != NULL) {
    syncSnapBlockDestroy(pBlk);
  }
  TAOS_RETURN(code);
}
348

349
// send snapshot data from cache
350
// Under the send-buffer mutex, resend every buffered block after the cursor
// that is not acked and whose last send is at least SYNC_SNAP_RESEND_MS old.
// When the buffer window is empty, either read and send the next block or, if
// the sender already reached SYNC_SNAPSHOT_SEQ_END, resend the end message.
// Fails with TSDB_CODE_SYN_INTERNAL_ERROR if the sender has no reader, is
// finished, or is not started.
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
  int32_t          code = 0;
  SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;

  (void)taosThreadMutexLock(&pSndBuf->mutex);

  bool canResend = (pSender->pReader != NULL) && !pSender->finish && snapshotSenderIsStart(pSender);
  if (!canResend) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) {
    SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size];
    if (pBlk == NULL) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    int64_t nowMs = taosGetTimestampMs();
    bool    due = !pBlk->acked && nowMs >= pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS;
    if (!due) {
      continue;
    }

    code = syncSnapSendMsg(pSender, pBlk->seq, pBlk->pBlock, pBlk->blockLen, 0);
    if (code != 0) {
      goto _out;
    }
    pBlk->sendTimeMs = nowMs;
  }

  // nothing buffered and not at the end yet: produce the next block
  if (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
    code = snapshotSend(pSender);
    if (code != 0) {
      goto _out;
    }
  }

  // nothing buffered and the end was reached: (re)send the end message
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
    code = syncSnapSendMsg(pSender, pSender->seq, NULL, 0, 0);
    if (code != 0) {
      goto _out;
    }
  }

_out:
  (void)taosThreadMutexUnlock(&pSndBuf->mutex);
  TAOS_RETURN(code);
}
390

391
// Start snapshot replication towards pDestId unless the matching sender is
// already running. Returns TSDB_CODE_SYN_INTERNAL_ERROR if no sender exists
// for the destination, the sender's start error on failure, and 0 otherwise.
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId, char *reason) {
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
  if (pSender == NULL) {
    sNError(pSyncNode, "snapshot sender start error since get failed");
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  if (snapshotSenderIsStart(pSender)) {
    sSDebug(pSender, "snapshot sender already start, ignore");
    return 0;
  }

  taosMsleep(1);

  sInfo("vgId:%d, snapshot replication progress:1/8:leader:1/4 to dnode:%d, reason:%s", pSyncNode->vgId, DID(pDestId),
        reason);

  int32_t code = snapshotSenderStart(pSender);
  if (code == 0) {
    return 0;
  }

  sSError(pSender, "snapshot sender start error since %s", tstrerror(code));
  TAOS_RETURN(code);
}
416

417
// receiver
418
// Build a snapshot receiver for messages coming from fromId.
//
// Requires the FSM to provide the three snapshot-write callbacks; otherwise
// TSDB_CODE_SYN_INTERNAL_ERROR is returned. On success *ppReceiver owns the
// new receiver (with an empty receive buffer); on failure it stays NULL.
int32_t snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId, SSyncSnapshotReceiver **ppReceiver) {
  int32_t code = 0;
  *ppReceiver = NULL;

  if (pSyncNode->pFsm->FpSnapshotStartWrite == NULL || pSyncNode->pFsm->FpSnapshotStopWrite == NULL ||
      pSyncNode->pFsm->FpSnapshotDoWrite == NULL) {
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  SSyncSnapshotReceiver *pReceiver = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver));
  if (pReceiver == NULL) {
    TAOS_RETURN(terrno);
  }

  // initial (idle) state
  pReceiver->start = false;
  pReceiver->receiverStartTime = 0;
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
  pReceiver->pWriter = NULL;

  code = taosThreadMutexInit(&pReceiver->writerMutex, NULL);
  if (code != 0) {
    taosMemoryFree(pReceiver);
    TAOS_RETURN(code);
  }

  pReceiver->pSyncNode = pSyncNode;
  pReceiver->fromId = fromId;
  pReceiver->term = raftStoreGetTerm(pSyncNode);
  pReceiver->snapshot.data = NULL;
  pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
  pReceiver->snapshot.lastApplyTerm = 0;
  pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;

  // receive buffer: entries are released with rpcFreeCont
  SSyncSnapBuffer *pRcvBuf = NULL;
  code = syncSnapBufferCreate(&pRcvBuf);
  if (pRcvBuf == NULL) {
    int32_t ret = taosThreadMutexDestroy(&pReceiver->writerMutex);
    if (ret != 0) {
      sError("failed to destroy mutex since %s", tstrerror(ret));
    }
    taosMemoryFree(pReceiver);
    TAOS_RETURN(code);
  }
  pRcvBuf->entryDeleteCb = rpcFreeCont;
  pReceiver->pRcvBuf = pRcvBuf;
  syncSnapBufferReset(pRcvBuf);

  *ppReceiver = pReceiver;
  TAOS_RETURN(code);
}
468

469
// Free the data payloads attached to the receiver's snapshotParam and snapshot,
// leaving both data pointers NULL. Always returns 0.
static int32_t snapshotReceiverClearInfoData(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver->snapshotParam.data != NULL) {
    taosMemoryFree(pReceiver->snapshotParam.data);
    pReceiver->snapshotParam.data = NULL;
  }

  if (pReceiver->snapshot.data != NULL) {
    taosMemoryFree(pReceiver->snapshot.data);
    pReceiver->snapshot.data = NULL;
  }

  return 0;
}
481

482
// Destroy a snapshot receiver: stop an open writer without applying (logging
// any failure), destroy the writer mutex, release the receive buffer and info
// payloads, then free the receiver. NULL is accepted and ignored.
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return;
  }

  // close writer
  (void)taosThreadMutexLock(&pReceiver->writerMutex);
  if (pReceiver->pWriter != NULL) {
    int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
                                                                  false, &pReceiver->snapshot);
    if (ret != 0) {
      sError("vgId:%d, snapshot receiver stop failed while destroy since %s", pReceiver->pSyncNode->vgId,
             tstrerror(ret));
    }
    pReceiver->pWriter = NULL;
  }
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
  (void)taosThreadMutexDestroy(&pReceiver->writerMutex);

  // free snap buf
  if (pReceiver->pRcvBuf != NULL) {
    syncSnapBufferDestroy(&pReceiver->pRcvBuf);
  }

  (void)snapshotReceiverClearInfoData(pReceiver);

  // free receiver
  taosMemoryFree(pReceiver);
}
510

511
// Atomically read the receiver's start flag; a NULL receiver counts as not started.
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  if (pReceiver == NULL) {
    return false;
  }
  return atomic_load_8(&pReceiver->start);
}
514

515
// Compare the receiver's signature (term, receiverStartTime) with the message's
// (term, snapStartTime), term first.
// Returns 0 when equal; -1 / -2 when the message's term / start time is larger
// (logged as a newer snapshot); 1 / 2 when the receiver's term / start time is
// larger (logged as a stale snapshot).
static int32_t snapshotReceiverSignatureCmp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  int32_t cmp = 0;

  if (pReceiver->term < pMsg->term) {
    cmp = -1;
  } else if (pReceiver->term > pMsg->term) {
    cmp = 1;
  } else if (pReceiver->receiverStartTime < pMsg->snapStartTime) {
    cmp = -2;
  } else if (pReceiver->receiverStartTime > pMsg->snapStartTime) {
    cmp = 2;
  }

  if (cmp > 0) {
    sRError(pReceiver, "receiver signature failed, stale snapshot, result:%d, msg signature:(%" PRId64 ", %" PRId64 ")",
            cmp, pMsg->term, pMsg->snapStartTime);
  } else if (cmp < 0) {
    sRWarn(pReceiver,
           "receiver signature failed, result:%d, a newer snapshot, msg signature:(%" PRId64 ", %" PRId64 ")", cmp,
           pMsg->term, pMsg->snapStartTime);
  }

  return cmp;
}
544

545
// Open the FSM snapshot writer for a transfer described by pBeginMsg.
// Fails with TSDB_CODE_SYN_INTERNAL_ERROR if a writer is already open; otherwise
// resets ack to SYNC_SNAPSHOT_SEQ_BEGIN, copies the snapshot range from the
// message and calls FpSnapshotStartWrite, returning its error code on failure.
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
  if (pReceiver->pWriter != NULL) {
    sRError(pReceiver, "snapshot receiver writer already started before");
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  // update ack
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;

  // update snapshot description and the index range to be written
  pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
  pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
  pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
  pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
  pReceiver->snapshotParam.end = pBeginMsg->lastIndex;

  // start writer
  int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &pReceiver->snapshotParam,
                                                                  &pReceiver->pWriter);
  if (code == 0) {
    // event log
    sRInfo(pReceiver, "snapshot receiver writer started");
    return 0;
  }

  sRError(pReceiver, "snapshot receiver start writer failed since %s", tstrerror(code));
  TAOS_RETURN(code);
}
573

574
// Mark the receiver as started for the transfer announced by pPreMsg.
// Does nothing if it is already started; otherwise adopts the message's term,
// source and start time as the receiver's signature, sets ack to
// SYNC_SNAPSHOT_SEQ_PREP and initialises the snapshot index range.
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
  if (snapshotReceiverIsStart(pReceiver)) {
    sRInfo(pReceiver, "snapshot receiver has started");
    return;
  }

  // another caller may have won the race since the check above
  if (atomic_val_compare_exchange_8(&pReceiver->start, false, true)) {
    return;
  }

  pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP;

  // signature of this transfer
  pReceiver->term = pPreMsg->term;
  pReceiver->fromId = pPreMsg->srcId;
  pReceiver->receiverStartTime = pPreMsg->snapStartTime;

  // range: begin index from the local node, end not known yet
  pReceiver->snapshotParam.start = syncNodeGetSnapBeginIndex(pReceiver->pSyncNode);
  pReceiver->snapshotParam.end = -1;

  sRInfo(pReceiver, "snapshot receiver start, from dnode:%d.", DID(&pReceiver->fromId));
}
593

594
// Stop a running receiver without applying the snapshot. If the start flag was
// not set this is a no-op; otherwise any open writer is stopped under the
// writer mutex, then the receive buffer and info payloads are dropped under
// the buffer mutex.
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
  sRDebug(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);

  // clear the flag; bail out when the receiver was not running
  if (!atomic_val_compare_exchange_8(&pReceiver->start, true, false)) {
    return;
  }

  (void)taosThreadMutexLock(&pReceiver->writerMutex);
  if (pReceiver->pWriter == NULL) {
    sRInfo(pReceiver, "snapshot receiver stop, writer is null");
  } else {
    int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
                                                                   false, &pReceiver->snapshot);
    if (code != 0) {
      sRError(pReceiver, "snapshot receiver stop write failed since %s", tstrerror(code));
    }
    pReceiver->pWriter = NULL;
  }
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);

  (void)taosThreadMutexLock(&pReceiver->pRcvBuf->mutex);
  syncSnapBufferReset(pReceiver->pRcvBuf);
  (void)snapshotReceiverClearInfoData(pReceiver);
  (void)taosThreadMutexUnlock(&pReceiver->pRcvBuf->mutex);
}
623

624
// Finish a snapshot transfer on the receiver side.
//
// Writes the final payload carried by pMsg (if any), raises the node's commit
// index and term to the snapshot's values when those are higher, stops the
// writer with apply=true, sets ack to SYNC_SNAPSHOT_SEQ_END, refreshes
// fsmState from the FSM and restores the log store from pMsg->lastIndex.
//
// Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR if no writer is open, or
// the error code of the failing step. writerMutex is released on every path.
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  int32_t code = 0;

  if (pReceiver->pWriter == NULL) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sRError(pReceiver, "snapshot receiver finish error since writer is null");
    TAOS_RETURN(code);
  }

  // write data
  sRInfo(pReceiver, "snapshot receiver write about to finish, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
  if (pMsg->dataLen > 0) {
    (void)taosThreadMutexLock(&pReceiver->writerMutex);
    code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
                                                         pMsg->dataLen);
    (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
    if (code != 0) {
      sRError(pReceiver, "failed to finish snapshot receiver write since %s", tstrerror(code));
      TAOS_RETURN(code);
    }
  }

  // update commit index
  if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
    pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
  }

  // maybe update term
  if (pReceiver->snapshot.lastApplyTerm > raftStoreGetTerm(pReceiver->pSyncNode)) {
    raftStoreSetTerm(pReceiver->pSyncNode, pReceiver->snapshot.lastApplyTerm);
  }

  (void)taosThreadMutexLock(&pReceiver->writerMutex);
  if (pReceiver->pWriter != NULL) {
    // stop writer, apply data
    code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
                                                           &pReceiver->snapshot);
    if (code != 0) {
      // release the lock before bailing out; returning with it held would block
      // every later user of writerMutex
      // NOTE(review): pWriter is left untouched here, as before; confirm whether
      // the writer is still valid after a failed stop.
      (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
      sRError(pReceiver, "snapshot receiver apply failed since %s", tstrerror(code));
      TAOS_RETURN(code);
    }
    pReceiver->pWriter = NULL;
    sRInfo(pReceiver, "snapshot receiver write stopped");
  }
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);

  // update progress
  pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;

  // get fsmState
  SSnapshot snapshot = {0};
  code = pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
  if (code != 0) {
    sRError(pReceiver, "snapshot receiver get snapshot info failed since %s", tstrerror(code));
    TAOS_RETURN(code);
  }
  pReceiver->pSyncNode->fsmState = snapshot.state;

  // reset wal
  code =
      pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
  if (code != 0) {
    sRError(pReceiver, "failed to snapshot receiver log restore since %s", tstrerror(code));
    TAOS_RETURN(code);
  }
  sRInfo(pReceiver, "wal log restored from snapshot");

  return 0;
}
692

693
// Apply one snapshot data message to the currently open snapshot writer.
//
// The message must be the next one in sequence (pMsg->seq == pReceiver->ack + 1). A non-empty payload is
// handed to FpSnapshotDoWrite while writerMutex is held. On success pReceiver->ack is advanced to pMsg->seq.
//
// Returns 0 on success, TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG for an out-of-order seq,
// TSDB_CODE_SYN_INTERNAL_ERROR when no writer is open, or the error code reported by FpSnapshotDoWrite.
static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
  int32_t writeCode = 0;

  // only the block right after the last acknowledged one is acceptable
  if (pReceiver->ack + 1 != pMsg->seq) {
    sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq);
    TAOS_RETURN(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG);
  }

  (void)taosThreadMutexLock(&pReceiver->writerMutex);

  if (!pReceiver->pWriter) {
    (void)taosThreadMutexUnlock(&pReceiver->writerMutex);
    sRError(pReceiver, "snapshot receiver failed to write data since writer is null");
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
  }

  sRDebug(pReceiver, "snapshot receiver continue to write, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);

  // an empty block has nothing to apply but is still acknowledged below
  if (pMsg->dataLen > 0) {
    writeCode = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
                                                              pMsg->data, pMsg->dataLen);
  }

  // the writer is no longer touched past this point
  (void)taosThreadMutexUnlock(&pReceiver->writerMutex);

  if (writeCode != 0) {
    sRError(pReceiver, "snapshot receiver continue write failed since %s", tstrerror(writeCode));
    TAOS_RETURN(writeCode);
  }

  // record progress
  pReceiver->ack = pMsg->seq;

  sRDebug(pReceiver, "snapshot receiver continue to write finish");
  return 0;
}
729

730
// Compute the index at which a snapshot for this node should begin.
//
// For an mnode (syncNodeIsMnode) this is SYNC_INDEX_BEGIN. Otherwise it is one past the larger of the
// node's commitIndex and the WAL's committed version.
SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
  if (syncNodeIsMnode(ths)) {
    SyncIndex mnodeBegin = SYNC_INDEX_BEGIN;
    sNInfo(ths, "snapshot begin index is %" PRId64 " since its mnode", mnodeBegin);
    return mnodeBegin;
  }

  SSyncLogStoreData *pStoreData = ths->pLogStore->data;
  int64_t            walVer = walGetCommittedVer(pStoreData->pWal);
  SyncIndex          beginIdx = TMAX(ths->commitIndex, walVer) + 1;

  sNInfo(ths, "snapshot begin index is %" PRId64, beginIdx);
  return beginIdx;
}
748

749
static int32_t syncSnapReceiverExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotReceiver *pReceiver,
33,348✔
750
                                             SyncSnapshotSend *pMsg, SSnapshot *pInfo) {
751
  if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT) return TSDB_CODE_SYN_INTERNAL_ERROR;
33,348✔
752
  int32_t code = 0, lino = 0;
33,348✔
753

754
  // copy snap info from leader
755
  void *data = taosMemoryCalloc(1, pMsg->dataLen);
33,348✔
756
  if (data == NULL) {
33,348✔
757
    TAOS_CHECK_EXIT(terrno);
×
758
  }
759
  pInfo->data = data;
33,348✔
760
  data = NULL;
33,348✔
761
  (void)memcpy(pInfo->data, pMsg->data, pMsg->dataLen);
33,348✔
762

763
  // exchange snap info
764
  if ((code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pInfo)) != 0) {
33,348✔
765
    sRError(pReceiver, "failed to get snapshot info. type: %d", pMsg->payloadType);
×
766
    goto _exit;
×
767
  }
768
  SSyncTLV *datHead = pInfo->data;
33,348✔
769
  if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
33,348✔
770
    sRError(pReceiver, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
×
771
    code = TSDB_CODE_INVALID_DATA_FMT;
×
772
    goto _exit;
×
773
  }
774
  int32_t dataLen = sizeof(SSyncTLV) + datHead->len;
33,348✔
775

776
  // save exchanged snap info
777
  SSnapshotParam *pParam = &pReceiver->snapshotParam;
33,348✔
778
  data = taosMemoryRealloc(pParam->data, dataLen);
33,348✔
779
  if (data == NULL) {
33,348✔
780
    code = terrno;
×
781
    sError("vgId:%d, failed to realloc memory for snapshot prep due to %s. dataLen:%d", pSyncNode->vgId,
×
782
           tstrerror(code), dataLen);
783
    goto _exit;
×
784
  }
785
  pParam->data = data;
33,348✔
786
  data = NULL;
33,348✔
787
  (void)memcpy(pParam->data, pInfo->data, dataLen);
33,348✔
788

789
_exit:
33,348✔
790
  TAOS_RETURN(code);
33,348✔
791
}
792

793
// Handle a snapshot "prepare" message on the receiving node.
//
// If the receiver is not started, or the message's signature compares as newer than the running
// receiver's (snapshotReceiverSignatureCmp < 0), the receiver is (re)started for this message.
// A duplicate (== 0) is simply answered again. A stale message (> 0) is rejected with
// TSDB_CODE_SYN_MISMATCHED_SIGNATURE.
//
// For accepted messages carrying TDMT_SYNC_PREP_SNAPSHOT the snap info is exchanged with the FSM and
// sent back in the reply. A rejected message never reaches the exchange: the exchange would overwrite
// both the receiver's saved snapshot param and the rejection code, so the stale sender would be
// answered as if the preparation had succeeded.
//
// Returns 0 on success, the rejection code for a stale message, or the error of the exchange / send.
// When the exchange itself fails, no reply is sent.
static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                code = 0;

  if (snapshotReceiverIsStart(pReceiver)) {
    // already start
    int32_t order = 0;
    if ((order = snapshotReceiverSignatureCmp(pReceiver, pMsg)) < 0) {  // order < 0
      sWarn("failed to prepare snapshot, received a new snapshot preparation. restart receiver.");
      goto _START_RECEIVER;
    } else if (order == 0) {  // order == 0
      sInfo("prepare snapshot, received a duplicate snapshot preparation. send reply.");
      goto _SEND_REPLY;
    } else {  // order > 0
      // ignore
      sError("failed to prepare snapshot, received a stale snapshot preparation. ignore.");
      code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
      goto _SEND_REPLY;
    }
  } else {
    // start new
    sRInfo(pReceiver, "snapshot receiver not start yet so start new one");
    goto _START_RECEIVER;
  }

_START_RECEIVER:
  if (snapshotReceiverIsStart(pReceiver)) {
    sRInfo(pReceiver, "snapshot receiver already start and force stop pre one");
    snapshotReceiverStop(pReceiver);
  }

  snapshotReceiverStart(pReceiver, pMsg);

_SEND_REPLY:;

  SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT_REPLY};
  int32_t   dataLen = 0;
  // exchange snap info only for messages that were not rejected above
  if (code == 0 && pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT) {
    if ((code = syncSnapReceiverExchgSnapInfo(pSyncNode, pReceiver, pMsg, &snapInfo)) != 0) {
      goto _out;
    }
    SSyncTLV *datHead = snapInfo.data;
    dataLen = sizeof(SSyncTLV) + datHead->len;
  }

  // send response; the payload type is only set when there is a payload
  int32_t type = (snapInfo.data) ? snapInfo.type : 0;
  int32_t sendCode = syncSnapSendRsp(pReceiver, pMsg, snapInfo.data, dataLen, type, code);
  if (sendCode != 0) {
    // a failed send takes precedence; otherwise keep the rejection code (if any)
    code = sendCode;
  }

_out:
  if (snapInfo.data) {
    taosMemoryFree(snapInfo.data);
    snapInfo.data = NULL;
  }
  TAOS_RETURN(code);
}
852

853
// Handle a snapshot "begin" message on the receiving node.
//
// The receiver must already be started and the message signature must match it. The snapshot writer is
// then started and the receiver's snapshotParam.start is checked against the node's current snapshot
// begin index. Whatever the outcome, a response carrying the resulting code is sent to the sender.
//
// Returns 0 on success; otherwise the error that was detected (also sent in the reply), or the error of
// sending the reply.
static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  // condition 1
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                code = TSDB_CODE_SYN_INTERNAL_ERROR;

  if (!snapshotReceiverIsStart(pReceiver)) {
    sRError(pReceiver, "failed to begin snapshot receiver since not started");
    goto _SEND_REPLY;
  }

  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    sError("failed to begin snapshot, since %s", tstrerror(code));
    goto _SEND_REPLY;
  }

  // start writer
  if ((code = snapshotReceiverStartWriter(pReceiver, pMsg)) != 0) {
    sRError(pReceiver, "failed to start snapshot writer since %s", tstrerror(code));
    goto _SEND_REPLY;
  }

  SyncIndex beginIndex = syncNodeGetSnapBeginIndex(pSyncNode);
  if (pReceiver->snapshotParam.start != beginIndex) {
    sRError(pReceiver, "snapshot begin index is changed unexpectedly. sver:%" PRId64 ", beginIndex:%" PRId64,
            pReceiver->snapshotParam.start, beginIndex);
    // code is 0 here because the writer started successfully; restore the error so that the
    // mismatch is not reported to the sender (and the caller) as success
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _SEND_REPLY;
  }

  code = 0;
_SEND_REPLY:

  // send response
  TAOS_CHECK_RETURN(syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code));

  TAOS_RETURN(code);
}
890

891
// Build a SyncSnapshotRsp answering pMsg and send it back to pMsg->srcId.
//
// pReceiver  receiver on whose behalf the reply is sent (supplies the sync node and snapBeginIndex)
// pMsg       message being answered; term, lastIndex, lastTerm, snapStartTime and seq are echoed back
// pBlock     optional payload, copied into the reply only when non-NULL and blockLen > 0
// blockLen   payload length, also passed to syncBuildSnapshotSendRsp
// type       payload type stored in the reply
// rspCode    result code stored in the reply
//
// Returns 0 on success, or the error of building / sending the message.
int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, void *pBlock, int32_t blockLen,
                        int32_t type, int32_t rspCode) {
  SSyncNode *pNode = pReceiver->pSyncNode;
  SRpcMsg    rsp = {0};

  // allocate the reply
  int32_t code = syncBuildSnapshotSendRsp(&rsp, blockLen, pNode->vgId);
  if (code != 0) {
    sRError(pReceiver, "failed to build snapshot receiver resp since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  // fill in the header
  SyncSnapshotRsp *pRsp = rsp.pCont;
  pRsp->srcId = pNode->myRaftId;
  pRsp->destId = pMsg->srcId;
  pRsp->term = pMsg->term;
  pRsp->lastIndex = pMsg->lastIndex;
  pRsp->lastTerm = pMsg->lastTerm;
  pRsp->startTime = pMsg->snapStartTime;
  pRsp->ack = pMsg->seq;
  pRsp->code = rspCode;
  pRsp->snapBeginIndex = pReceiver->snapshotParam.start;
  pRsp->payloadType = type;

  // attach the optional payload
  if (pBlock && blockLen > 0) {
    (void)memcpy(pRsp->data, pBlock, blockLen);
  }

  // hand the reply to the transport
  code = syncNodeSendMsgById(&pRsp->destId, pNode, &rsp);
  if (code != 0) {
    sRError(pReceiver, "failed to send snapshot receiver resp since %s", tstrerror(code));
    TAOS_RETURN(code);
  }
  return 0;
}
925

926
// Put a snapshot data message into the receive buffer and apply everything that is now contiguous.
//
// ppMsg[0] is the received message. When it is stored in the buffer (seq beyond the cursor), ppMsg[0]
// is set to NULL and the buffer entry refers to it from then on; entries are released through
// entryDeleteCb once processed. A message whose seq lies before the buffer start is only answered
// again.
//
// Buffered entries from start to cursor are then applied in order with snapshotReceiverGotData, and a
// response is sent for each one. Processing stops at the first entry that fails.
//
// All work happens with pRcvBuf->mutex held, and every path leaves through _out so that the mutex is
// always released.
//
// Returns 0 on success, TSDB_CODE_SYN_BUFFER_FULL when seq does not fit in the window,
// TSDB_CODE_SYN_INTERNAL_ERROR when the buffer indexes are inconsistent, or the error of applying data.
static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend **ppMsg) {
  int32_t           code = 0;
  SSyncSnapBuffer  *pRcvBuf = pReceiver->pRcvBuf;
  SyncSnapshotSend *pMsg = ppMsg[0];

  (void)taosThreadMutexLock(&pRcvBuf->mutex);

  if (pMsg->seq - pRcvBuf->start >= pRcvBuf->size) {
    code = TSDB_CODE_SYN_BUFFER_FULL;
    goto _out;
  }

  if (!(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end)) {
    // do not return directly here: the buffer mutex is held and must be released in _out
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  if (pMsg->seq > pRcvBuf->cursor) {
    // replace whatever already occupies the slot
    if (pRcvBuf->entries[pMsg->seq % pRcvBuf->size]) {
      pRcvBuf->entryDeleteCb(pRcvBuf->entries[pMsg->seq % pRcvBuf->size]);
    }
    pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg;
    ppMsg[0] = NULL;
    pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
  } else if (pMsg->seq < pRcvBuf->start) {
    code = syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code);
    goto _out;
  }

  // advance the cursor over the contiguous run of buffered entries
  for (int64_t seq = pRcvBuf->cursor + 1; seq < pRcvBuf->end; ++seq) {
    if (pRcvBuf->entries[seq % pRcvBuf->size]) {
      pRcvBuf->cursor = seq;
    } else {
      break;
    }
  }

  // apply, acknowledge and release everything up to the cursor
  for (int64_t seq = pRcvBuf->start; seq <= pRcvBuf->cursor; ++seq) {
    if ((code = snapshotReceiverGotData(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size])) != 0) {
      if (code >= SYNC_SNAPSHOT_SEQ_INVALID) {
        code = TSDB_CODE_SYN_INTERNAL_ERROR;
      }
    }
    pRcvBuf->start = seq + 1;
    if (syncSnapSendRsp(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size], NULL, 0, 0, code) != 0) {
      sError("failed to send snap rsp");
    }
    pRcvBuf->entryDeleteCb(pRcvBuf->entries[seq % pRcvBuf->size]);
    pRcvBuf->entries[seq % pRcvBuf->size] = NULL;
    if (code) goto _out;
  }

_out:
  (void)taosThreadMutexUnlock(&pRcvBuf->mutex);
  TAOS_RETURN(code);
}
979

980
// Handle a snapshot data message on the receiving node.
//
// A message whose signature does not match the receiver is answered with
// TSDB_CODE_SYN_MISMATCHED_SIGNATURE; the result of sending that reply is returned. Otherwise the
// message is passed to syncSnapBufferRecv, which may take it over and set ppMsg[0] to NULL.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when ppMsg[0] is NULL.
static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend **ppMsg) {
  // condition 4
  // transferring
  SyncSnapshotSend *pMsg = ppMsg[0];
  if (!pMsg) return TSDB_CODE_SYN_INTERNAL_ERROR;
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
    int32_t code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    sError("failed to receive snapshot data, since %s", tstrerror(code));
    return syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code);
  }

  return syncSnapBufferRecv(pReceiver, ppMsg);
}
997

998
// Handle the snapshot "end" message on the receiving node.
//
// When the message signature matches, the receiver is finished (snapshotReceiverFinish) and, if that
// succeeded, stopped. A response is always built and sent. It carries the outcome of the steps above
// in pRspMsg->code.
//
// The outcome is kept in rspCode, separate from the code used for building/sending the reply.
// Reusing one variable would overwrite a failure with the result of syncBuildSnapshotSendRsp, so the
// sender would always be told that the snapshot ended successfully.
//
// Returns the build/send error if the reply could not be delivered; otherwise the outcome of the end
// step itself (0 on success).
static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
  // condition 2
  // end, finish FSM
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
  int32_t                rspCode = 0;  // outcome reported to the sender
  int32_t                code = 0;     // result of building / sending the reply

  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
    rspCode = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    sError("failed to end snapshot, since %s", tstrerror(rspCode));
    goto _SEND_REPLY;
  }

  rspCode = snapshotReceiverFinish(pReceiver, pMsg);
  if (rspCode == 0) {
    snapshotReceiverStop(pReceiver);
  }

_SEND_REPLY:;

  // build msg
  SRpcMsg rpcMsg = {0};
  if ((code = syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId)) != 0) {
    sRError(pReceiver, "snapshot receiver build rsp failed since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
  pRspMsg->srcId = pSyncNode->myRaftId;
  pRspMsg->destId = pMsg->srcId;
  pRspMsg->term = raftStoreGetTerm(pSyncNode);
  pRspMsg->lastIndex = pMsg->lastIndex;
  pRspMsg->lastTerm = pMsg->lastTerm;
  pRspMsg->startTime = pMsg->snapStartTime;
  pRspMsg->ack = pReceiver->ack;  // receiver maybe already closed
  pRspMsg->code = rspCode;
  pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;

  // send msg
  syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver end", &rpcMsg.info.traceId);
  if ((code = syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg)) != 0) {
    sRError(pReceiver, "snapshot receiver send rsp failed since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  TAOS_RETURN(rspCode);
}
1045

1046
int64_t lastRecvPrintLog = 0;
1047

1048
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
511,632✔
1049
  SyncSnapshotSend     **ppMsg = (SyncSnapshotSend **)&pRpcMsg->pCont;
511,632✔
1050
  SyncSnapshotSend      *pMsg = ppMsg[0];
511,632✔
1051
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
511,632✔
1052
  int32_t                code = 0;
511,632✔
1053

1054
  // if already drop replica, do not process
1055
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
511,632✔
1056
    syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config", &pRpcMsg->info.traceId);
×
1057
    code = TSDB_CODE_SYN_NOT_IN_RAFT_GROUP;
×
1058
    TAOS_RETURN(code);
×
1059
  }
1060

1061
  if (pMsg->term < raftStoreGetTerm(pSyncNode)) {
511,632✔
1062
    sRError(pReceiver, "reject snap replication with smaller term. msg term:%" PRId64 ", seq:%d", pMsg->term,
×
1063
            pMsg->seq);
1064
    code = TSDB_CODE_SYN_TERM_NOT_MATCH;
×
1065
    if (syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code) != 0) sError("failed to send snap rsp");
×
1066
    TAOS_RETURN(code);
×
1067
  }
1068

1069
  if (pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER) {
511,632✔
1070
    if (pMsg->term > raftStoreGetTerm(pSyncNode)) {
237,216✔
1071
      syncNodeStepDown(pSyncNode, pMsg->term, pMsg->srcId, "snapshot");
×
1072
    }
1073
  } else {
1074
    syncNodeUpdateTermWithoutStepDown(pSyncNode, pMsg->term);
274,416✔
1075
  }
1076

1077
  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER && pSyncNode->state != TAOS_SYNC_STATE_LEARNER) {
511,632✔
1078
    sRError(pReceiver, "snapshot receiver not a follower or learner");
×
1079
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1080
    TAOS_RETURN(code);
×
1081
  }
1082

1083
  if (pMsg->seq < SYNC_SNAPSHOT_SEQ_PREP || pMsg->seq > SYNC_SNAPSHOT_SEQ_END) {
511,632✔
1084
    sRError(pReceiver, "snap replication msg with invalid seq:%d", pMsg->seq);
×
1085
    code = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG;
×
1086
    TAOS_RETURN(code);
×
1087
  }
1088

1089
  // prepare
1090
  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP) {
511,632✔
1091
    sInfo(
33,348✔
1092
        "vgId:%d, snapshot replication progress:2/8:follower:1/4, start to prepare, recv msg:%s, snap seq:%d, msg "
1093
        "signature:(%" PRId64 ", %" PRId64 ")",
1094
        pSyncNode->vgId, TMSG_INFO(pRpcMsg->msgType), pMsg->seq, pMsg->term, pMsg->snapStartTime);
1095
    pSyncNode->snapSeq = pMsg->seq;
33,348✔
1096
    code = syncNodeOnSnapshotPrep(pSyncNode, pMsg);
33,348✔
1097
    sDebug(
33,348✔
1098
        "vgId:%d, snapshot replication progress:2/8:follower:1/4, finish to prepare, recv msg:%s, snap seq:%d, msg "
1099
        "signature:(%" PRId64 ", %" PRId64 ")",
1100
        pSyncNode->vgId, TMSG_INFO(pRpcMsg->msgType), pMsg->seq, pMsg->term, pMsg->snapStartTime);
1101
    goto _out;
33,348✔
1102
  }
1103

1104
  // begin
1105
  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
478,284✔
1106
    sInfo("vgId:%d, snapshot replication progress:4/8:follower:2/4, start to begin,replication. msg signature:(%" PRId64
33,348✔
1107
          ", %" PRId64 "), snapshot msg seq:%d",
1108
          pSyncNode->vgId, pMsg->term, pMsg->snapStartTime, pMsg->seq);
1109
    pSyncNode->snapSeq = pMsg->seq;
33,348✔
1110
    code = syncNodeOnSnapshotBegin(pSyncNode, pMsg);
33,348✔
1111
    sDebug("vgId:%d, snapshot replication progress:4/8:follower:2/4, finish to begin. msg signature:(%" PRId64
33,348✔
1112
           ", %" PRId64 ")",
1113
           pSyncNode->vgId, pMsg->term, pMsg->snapStartTime);
1114
    goto _out;
33,348✔
1115
  }
1116

1117
  // data
1118
  if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
444,936✔
1119
    int64_t currentTimestamp = taosGetTimestampMs()/1000;
411,721✔
1120
    if (currentTimestamp > lastRecvPrintLog) {
411,721✔
1121
      sInfo("vgId:%d, snapshot replication progress:6/8:follower:3/4, start to receive. msg signature:(%" PRId64
36,917✔
1122
            ", %" PRId64 "), snapshot msg seq:%d",
1123
            pSyncNode->vgId, pMsg->term, pMsg->snapStartTime, pMsg->seq);
1124

1125
    } else {
1126
      sDebug("vgId:%d, snapshot replication progress:6/8:follower:3/4, start to receive. msg signature:(%" PRId64
374,804✔
1127
             ", %" PRId64 "), snapshot msg seq:%d",
1128
             pSyncNode->vgId, pMsg->term, pMsg->snapStartTime, pMsg->seq);
1129
    }
1130
    pSyncNode->snapSeq = pMsg->seq;
411,721✔
1131
    lastRecvPrintLog = currentTimestamp;
411,721✔
1132
    code = syncNodeOnSnapshotReceive(pSyncNode, ppMsg);
411,721✔
1133
    sDebug("vgId:%d, snapshot replication progress:6/8:follower:3/4, finish to receive.", pSyncNode->vgId);
411,721✔
1134
    goto _out;
411,721✔
1135
  }
1136

1137
  // end
1138
  if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
33,215✔
1139
    sInfo("vgId:%d, snapshot replication progress:7/8:follower:4/4, start to end. msg signature:(%" PRId64 ", %" PRId64
33,215✔
1140
          "), snapshot msg seq:%d",
1141
          pSyncNode->vgId, pMsg->term, pMsg->snapStartTime, pMsg->seq);
1142
    pSyncNode->snapSeq = pMsg->seq;
33,215✔
1143
    code = syncNodeOnSnapshotEnd(pSyncNode, pMsg);
33,215✔
1144
    if (code != 0) {
33,215✔
1145
      sRError(pReceiver, "failed to end snapshot.");
×
1146
      goto _out;
×
1147
    }
1148

1149
    code = syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode);
33,215✔
1150
    if (code != 0) {
33,215✔
1151
      sRError(pReceiver, "failed to reinit log buffer since %s", tstrerror(code));
×
1152
    }
1153
    sDebug("vgId:%d, snapshot replication progress:7/7:follower:4/4, finish to end. msg signature:(%" PRId64
33,215✔
1154
           ", %" PRId64 ")",
1155
           pSyncNode->vgId, pMsg->term, pMsg->snapStartTime);
1156
    goto _out;
33,215✔
1157
  }
1158

1159
_out:;
×
1160
  syncNodeResetElectTimer(pSyncNode);
511,632✔
1161
  TAOS_RETURN(code);
511,632✔
1162
}
1163

1164
// Sender-side counterpart of the snap info exchange: store the TLV carried by a prepare reply in
// pSender->snapshotParam.data.
//
// Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR when pMsg is not a TDMT_SYNC_PREP_SNAPSHOT_REPLY,
// TSDB_CODE_INVALID_DATA_FMT when the TLV type disagrees with the payload type, or terrno when the
// buffer cannot be (re)allocated.
static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // the payload starts with a TLV header whose type must agree with the message
  SSyncTLV *pHead = (void *)pMsg->data;
  if (pHead->typ != pMsg->payloadType) {
    sSError(pSender, "unexpected data type in data of SyncSnapshotRsp. typ: %d", pHead->typ);
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
  }
  int32_t tlvLen = sizeof(SSyncTLV) + pHead->len;

  // resize the sender's buffer and copy header plus body into it
  void *pBuf = taosMemoryRealloc(pSender->snapshotParam.data, tlvLen);
  if (pBuf == NULL) {
    TAOS_RETURN(terrno);
  }
  (void)memcpy(pBuf, pMsg->data, tlvLen);
  pSender->snapshotParam.data = pBuf;

  sSInfo(pSender, "data of snapshot param. len: %d", pHead->len);
  return 0;
}
1186

1187
// sender
1188
// Leader-side handling of the reply to a snapshot "prepare" message.
//
// Rejects a reply whose snapBeginIndex is beyond commitIndex + 1. Otherwise, with the send buffer
// mutex held: reads the FSM's snapshot info, sets the sender's <start, end> range and snapshot, stores
// exchanged snap info when the reply carries it, starts the snapshot reader, moves the follower's next
// index past the snapshot and sends the first message via snapshotSend.
//
// Returns 0 on success, TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG for a bad begin index, or the error of the
// failing step.
static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  SSnapshot fsmSnap = {0};
  int32_t   code = 0;

  if (pMsg->snapBeginIndex > pSyncNode->commitIndex + 1) {
    sSError(pSender,
            "snapshot begin index is greater than commit index. msg snapBeginIndex:%" PRId64
            ", node commitIndex:%" PRId64,
            pMsg->snapBeginIndex, pSyncNode->commitIndex);
    TAOS_RETURN(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG);
  }

  (void)taosThreadMutexLock(&pSender->pSndBuf->mutex);
  TAOS_CHECK_GOTO(pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &fsmSnap), NULL, _out);

  // range to transfer: from what the follower asked for up to what the FSM has applied
  pSender->snapshotParam.start = pMsg->snapBeginIndex;
  pSender->snapshotParam.end = fsmSnap.lastApplyIndex;

  sSInfo(pSender, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64,
         pMsg->snapBeginIndex, fsmSnap.lastApplyIndex, fsmSnap.lastApplyTerm);

  // remember the snapshot on the sender
  pSender->snapshot = fsmSnap;

  // pick up the follower's snap info, if the reply brought any
  if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
    TAOS_CHECK_GOTO(syncSnapSenderExchgSnapInfo(pSyncNode, pSender, pMsg), NULL, _out);
  }

  // open the reader
  if ((code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader)) !=
      0) {
    sSError(pSender, "prepare snapshot failed since %s", tstrerror(code));
    goto _out;
  }

  // the follower continues right after the snapshot
  syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, fsmSnap.lastApplyIndex + 1);

  code = snapshotSend(pSender);

_out:
  (void)taosThreadMutexUnlock(&pSender->pSndBuf->mutex);
  TAOS_RETURN(code);
}
1233

1234
// Compare the sender's signature (term, senderStartTime) with the one carried by a snapshot reply.
//
// Returns  0 when both fields are equal,
//         -1 / 1 when the sender's term is smaller / larger than the message's term,
//         -2 / 2 when the terms are equal and the sender's start time is smaller / larger.
//
// A mismatch is logged. The result is collected in `code` rather than returned early, because with
// early returns `code` stayed 0 and the log statement could never run.
static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
  int32_t code = 0;

  if (pSender->term < pMsg->term) {
    code = -1;
  } else if (pSender->term > pMsg->term) {
    code = 1;
  } else if (pSender->senderStartTime < pMsg->startTime) {
    code = -2;
  } else if (pSender->senderStartTime > pMsg->startTime) {
    code = 2;
  }

  if (code != 0) {
    sSError(pSender, "sender signature failed, result:%d, msg signature:(%" PRId64 ", %" PRId64 ")", code, pMsg->term,
            pMsg->startTime);
  }
  return code;
}
1245

1246
// Leader-side processing of an ack for a snapshot data block.
//
// With the send buffer mutex held: validates the reply's signature and the sender/buffer state, marks
// the acknowledged block, advances the cursor over the leading run of acked blocks, releases those
// blocks, and then calls snapshotSend until the window (tsSnapReplMaxWaitN blocks past the buffer
// start) is full or the sender's seq has reached SYNC_SNAPSHOT_SEQ_END. Once seq is at the end and the
// buffer is empty, snapshotSend is called one more time.
//
// Returns 0 on success, TSDB_CODE_SYN_MISMATCHED_SIGNATURE, TSDB_CODE_SYN_BUFFER_FULL or
// TSDB_CODE_SYN_INTERNAL_ERROR for the failed checks, or the error returned by snapshotSend.
static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp **ppMsg) {
  SyncSnapshotRsp *pRsp = ppMsg[0];
  SSyncSnapBuffer *pBuf = pSender->pSndBuf;
  int32_t          code = 0;

  (void)taosThreadMutexLock(&pBuf->mutex);

  if (snapshotSenderSignatureCmp(pSender, pRsp) != 0) {
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    sError("failed to send snapshot data, since %s", tstrerror(code));
    goto _out;
  }

  // the sender must be running with an open reader and must not be finished yet
  if (!pSender->pReader || pSender->finish || !snapshotSenderIsStart(pSender)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  // ack outside of the window
  if (pRsp->ack - pBuf->start >= pBuf->size) {
    code = TSDB_CODE_SYN_BUFFER_FULL;
    goto _out;
  }

  // buffer indexes must satisfy start <= cursor + 1 and cursor < end
  if (pBuf->start > pBuf->cursor + 1 || pBuf->cursor >= pBuf->end) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  // flag the block this reply acknowledges
  if (pRsp->ack > pBuf->cursor && pRsp->ack < pBuf->end) {
    SyncSnapBlock *pAcked = pBuf->entries[pRsp->ack % pBuf->size];
    if (pAcked == NULL) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    pAcked->acked = 1;
  }

  // move the cursor to the last block of the leading acked run
  for (int64_t idx = pBuf->cursor + 1; idx < pBuf->end; ++idx) {
    SyncSnapBlock *pBlock = pBuf->entries[idx % pBuf->size];
    if (!pBlock->acked) {
      break;
    }
    pBuf->cursor = idx;
  }

  // drop everything up to the cursor
  for (int64_t idx = pBuf->start; idx <= pBuf->cursor; ++idx) {
    pBuf->entryDeleteCb(pBuf->entries[idx % pBuf->size]);
    pBuf->entries[idx % pBuf->size] = NULL;
    pBuf->start = idx + 1;
  }

  // refill the window
  while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pBuf->start < tsSnapReplMaxWaitN) {
    code = snapshotSend(pSender);
    if (code != 0) {
      goto _out;
    }
  }

  // all data sent and nothing left in the buffer
  if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pBuf->end <= pBuf->start) {
    code = snapshotSend(pSender);
  }

_out:
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_RETURN(code);
}
1312

1313
int64_t lastSendPrintLog = 0;
1314

1315
int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
510,813✔
1316
  SyncSnapshotRsp **ppMsg = (SyncSnapshotRsp **)&pRpcMsg->pCont;
510,813✔
1317
  SyncSnapshotRsp  *pMsg = ppMsg[0];
510,813✔
1318
  int32_t           code = 0;
510,813✔
1319

1320
  // if already drop replica, do not process
1321
  if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
510,813✔
1322
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "maybe replica already dropped", &pRpcMsg->info.traceId);
×
1323
    TAOS_RETURN(TSDB_CODE_SYN_NOT_IN_RAFT_GROUP);
×
1324
  }
1325

1326
  // get sender
1327
  SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &pMsg->srcId);
510,813✔
1328
  if (pSender == NULL) {
510,813✔
1329
    syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender is null", &pRpcMsg->info.traceId);
×
1330
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
×
1331
  }
1332

1333
  if (!snapshotSenderIsStart(pSender)) {
510,813✔
1334
    sSError(pSender, "snapshot sender stopped. sender startTime:%" PRId64 ", msg startTime:%" PRId64,
×
1335
            pSender->senderStartTime, pMsg->startTime);
1336
    TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
×
1337
  }
1338

1339
  // check signature
1340
  int32_t order = 0;
510,813✔
1341
  if ((order = snapshotSenderSignatureCmp(pSender, pMsg)) > 0) {
510,813✔
1342
    sError("failed to check snapshot rsp signature, ignore a stale snap rsp.");
×
1343
    TAOS_RETURN(TSDB_CODE_SYN_MISMATCHED_SIGNATURE);
×
1344
  } else if (order < 0) {
510,813✔
1345
    sError("failed to check snapshot rsp signature, snapshot sender is stale. stop");
×
1346
    code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
×
1347
    goto _ERROR;
×
1348
  }
1349

1350
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
510,813✔
1351
    sSError(pSender, "snapshot sender not leader");
×
1352
    code = TSDB_CODE_SYN_NOT_LEADER;
×
1353
    goto _ERROR;
×
1354
  }
1355

1356
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
510,813✔
1357
  if (pMsg->term != currentTerm) {
510,813✔
1358
    sSError(pSender, "snapshot sender term mismatch, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
×
1359
            currentTerm);
1360
    code = TSDB_CODE_SYN_TERM_NOT_MATCH;
×
1361
    goto _ERROR;
×
1362
  }
1363

1364
  if (pMsg->code != 0) {
510,813✔
1365
    sSError(pSender, "snapshot sender receive error:%s 0x%x and stop sender", tstrerror(pMsg->code), pMsg->code);
×
1366
    code = pMsg->code;
×
1367
    goto _ERROR;
×
1368
  }
1369

1370
  // send begin
1371
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP) {
510,813✔
1372
    sSInfo(pSender, "snapshot replication progress:3/8:leader:2/4, process prepare rsp, msg:%s, snap ack:%d, ",
33,371✔
1373
           TMSG_INFO(pRpcMsg->msgType), pMsg->ack);
1374
    pSyncNode->snapSeq = pMsg->ack;
33,371✔
1375
    if ((code = syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg)) != 0) {
33,371✔
1376
      goto _ERROR;
×
1377
    }
1378
  }
1379

1380
  // send msg of data or end
1381
  if (pMsg->ack >= SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->ack < SYNC_SNAPSHOT_SEQ_END) {
510,813✔
1382
    int64_t currentTimestamp = taosGetTimestampMs()/1000;
444,206✔
1383
    if (currentTimestamp > lastSendPrintLog) {
444,206✔
1384
      sSInfo(pSender, "snapshot replication progress:5/8:leader:3/4, send buffer, msg:%s, snap ack:%d",
40,035✔
1385
             TMSG_INFO(pRpcMsg->msgType), pMsg->ack);
1386
    } else {
1387
      sSDebug(pSender, "snapshot replication progress:5/8:leader:3/4, send buffer, msg:%s, snap ack:%d",
404,171✔
1388
              TMSG_INFO(pRpcMsg->msgType), pMsg->ack);
1389
    }
1390
    lastSendPrintLog = currentTimestamp;
444,206✔
1391
    pSyncNode->snapSeq = pMsg->ack;
444,206✔
1392
    if ((code = syncSnapBufferSend(pSender, ppMsg)) != 0) {
444,206✔
1393
      sSError(pSender, "failed to replicate snap since %s. seq:%d, pReader:%p, finish:%d", tstrerror(code),
×
1394
              pSender->seq, pSender->pReader, pSender->finish);
1395
      goto _ERROR;
×
1396
    }
1397
  }
1398

1399
  // end
1400
  if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
510,813✔
1401
    sSInfo(pSender, "snapshot replication progress:8/8:leader:4/4, process end rsp");
33,236✔
1402
    pSyncNode->snapSeq = pMsg->ack;
33,236✔
1403
    snapshotSenderStop(pSender, true);
33,236✔
1404
    TAOS_CHECK_GOTO(syncNodeReplicateReset(pSyncNode, &pMsg->srcId), NULL, _ERROR);
33,236✔
1405
  }
1406

1407
  return 0;
510,813✔
1408

1409
_ERROR:
×
1410
  snapshotSenderStop(pSender, false);
×
1411
  if (syncNodeReplicateReset(pSyncNode, &pMsg->srcId) != 0) sError("failed to reset replicate");
×
1412
  TAOS_RETURN(code);
×
1413
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc