• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3873

21 Apr 2025 07:22AM UTC coverage: 63.063% (+0.1%) from 62.968%
#3873

push

travis-ci

GitHub
docs(opc): add persist data support (#30783)

156631 of 316378 branches covered (49.51%)

Branch coverage included in aggregate %.

242184 of 316027 relevant lines covered (76.63%)

20271838.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.84
/source/libs/sync/src/syncMain.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "syncAppendEntries.h"
19
#include "syncAppendEntriesReply.h"
20
#include "syncCommit.h"
21
#include "syncElection.h"
22
#include "syncEnv.h"
23
#include "syncIndexMgr.h"
24
#include "syncInt.h"
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
27
#include "syncRaftCfg.h"
28
#include "syncRaftLog.h"
29
#include "syncRaftStore.h"
30
#include "syncReplication.h"
31
#include "syncRequestVote.h"
32
#include "syncRequestVoteReply.h"
33
#include "syncRespMgr.h"
34
#include "syncSnapshot.h"
35
#include "syncTimeout.h"
36
#include "syncUtil.h"
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
39
#include "tmisce.h"
40
#include "tref.h"
41

42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
43
static void    syncNodeEqElectTimer(void* param, void* tmrId);
44
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
45
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
48
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
49
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
50
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
51
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
52
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
53
static int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
54
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
55

56
static bool    syncNodeCanChange(SSyncNode* pSyncNode);
57
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
58
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
59
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
60

61
static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
62

63
// Open a sync node via syncNodeOpen(), register it with syncNodeAdd(), and copy
// the timer settings and message callbacks from pSyncInfo into the node.
//
// pSyncInfo    - creation parameters (vgId, ping/elect/heartbeat intervals, msgcb).
// vnodeVersion - forwarded unchanged to syncNodeOpen().
//
// Returns the node's reference id (rid), or -1 if the node could not be opened or
// registered (in the latter case the node is closed again).
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  sInfo("vgId:%d, start to open sync", pSyncInfo->vgId);

  SSyncNode* pNode = syncNodeOpen(pSyncInfo, vnodeVersion);
  if (!pNode) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    // Registration failed: tear the freshly opened node down again.
    syncNodeClose(pNode);
    return -1;
  }

  // Ping timer settings.
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;

  // Election timer settings.
  pNode->electBaseLine = pSyncInfo->electMs;

  // Heartbeat timer settings.
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;

  pNode->msgcb = pSyncInfo->msgcb;

  sInfo("vgId:%d, sync opened", pSyncInfo->vgId);
  return pNode->rid;
}
86

87
int32_t syncStart(int64_t rid) {
15,806✔
88
  int32_t    code = 0;
15,806✔
89
  int32_t    vgId = 0;
15,806✔
90
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
15,806✔
91
  if (pSyncNode == NULL) {
15,806!
92
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
93
    if (terrno != 0) code = terrno;
×
94
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
95
    TAOS_RETURN(code);
×
96
  }
97
  vgId = pSyncNode->vgId;
15,806✔
98
  sInfo("vgId:%d, begin to start sync", pSyncNode->vgId);
15,806✔
99

100
  if ((code = syncNodeRestore(pSyncNode)) < 0) {
15,806!
101
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
102
    goto _err;
×
103
  }
104
  sInfo("vgId:%d, sync node restore is executed", pSyncNode->vgId);
15,806✔
105

106
  if ((code = syncNodeStart(pSyncNode)) < 0) {
15,806!
107
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, tstrerror(code));
×
108
    goto _err;
×
109
  }
110
  sInfo("vgId:%d, sync node start is executed", pSyncNode->vgId);
15,806✔
111

112
  syncNodeRelease(pSyncNode);
15,806✔
113

114
  sInfo("vgId:%d, sync started", vgId);
15,805✔
115

116
  TAOS_RETURN(code);
15,806✔
117

118
_err:
×
119
  syncNodeRelease(pSyncNode);
×
120
  TAOS_RETURN(code);
×
121
}
122

123
// Copy the current raft configuration of the node identified by rid into *cfg.
//
// Returns 0 on success. If the node cannot be acquired, returns terrno when it
// is set, otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL; *cfg is left untouched.
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (!pNode) {
    int32_t code = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
    TAOS_RETURN(code);
  }

  // Struct copy while the reference is held.
  *cfg = pNode->raftCfg.cfg;
  syncNodeRelease(pNode);

  return 0;
}
140

141
void syncStop(int64_t rid) {
15,806✔
142
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
15,806✔
143
  if (pSyncNode != NULL) {
15,806!
144
    pSyncNode->isStart = false;
15,806✔
145
    syncNodeRelease(pSyncNode);
15,806✔
146
    syncNodeRemove(rid);
15,806✔
147
  }
148
}
15,802✔
149

150
void syncPreStop(int64_t rid) {
15,805✔
151
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
15,805✔
152
  if (pSyncNode != NULL) {
15,806!
153
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
15,806!
154
      sInfo("vgId:%d, stop snapshot receiver", pSyncNode->vgId);
×
155
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
156
    }
157
    syncNodePreClose(pSyncNode);
15,806✔
158
    syncNodeRelease(pSyncNode);
15,806✔
159
  }
160
}
15,805✔
161

162
void syncPostStop(int64_t rid) {
13,745✔
163
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,745✔
164
  if (pSyncNode != NULL) {
13,745!
165
    syncNodePostClose(pSyncNode);
13,745✔
166
    syncNodeRelease(pSyncNode);
13,744✔
167
  }
168
}
13,745✔
169

170
// Validate a proposed configuration: this node must be part of it (per
// syncNodeInConfig) and the replica count may differ from the current one by at
// most one.
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (!syncNodeInConfig(pSyncNode, pCfg)) {
    return false;
  }

  int32_t delta = pCfg->replicaNum - pSyncNode->replicaNum;
  return delta >= -1 && delta <= 1;
}
174

175
// Apply a new raft configuration to the node identified by rid.
//
// rid     - reference id of the sync node.
// pNewCfg - proposed configuration; pNewCfg->lastIndex is compared against the
//           node's raftCfg.lastConfigIndex to detect stale requests.
//
// Returns 0 when the configuration was applied or was not newer than the current
// one, TSDB_CODE_SYN_NEW_CONFIG_ERROR when the configuration is rejected or the
// change fails, or the code of the first failing helper. If the node cannot be
// acquired, terrno (when set) or TSDB_CODE_SYN_RETURN_VALUE_NULL is returned.
//
// Every path after a successful acquire releases the node exactly once, and the
// node is never dereferenced after it has been released.
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
    // Log while the reference is still held; the node may be freed once released.
    sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
          pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
    syncNodeRelease(pSyncNode);
    return 0;
  }

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    goto _exit;
  }

  code = syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  if (code != 0) {
    goto _exit;
  }

  if (syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex) != 0) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since do change error", pSyncNode->vgId);
    goto _exit;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    // A leader re-creates its per-peer heartbeat timers for the new replica set.
    code = syncNodeStopHeartbeatTimer(pSyncNode);
    if (code != 0) {
      goto _exit;
    }

    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      code = syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
      if (code != 0) {
        goto _exit;
      }
    }

    code = syncNodeStartHeartbeatTimer(pSyncNode);
    if (code != 0) {
      goto _exit;
    }
  }

_exit:
  syncNodeRelease(pSyncNode);
  TAOS_RETURN(code);
}
221

222
// Dispatch one sync RPC message to the handler matching pMsg->msgType.
//
// rid  - reference id of the target sync node.
// pMsg - the message to process.
//
// Returns the handler's result, or TSDB_CODE_MSG_NOT_PROCESSED for an unknown
// message type. If the sync module is not initialised or the node cannot be
// acquired, terrno (when set) or TSDB_CODE_SYN_RETURN_VALUE_NULL is returned.
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
    case TDMT_SYNC_TIMEOUT_ELECTION:
      // Both timeout message types share one handler.
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_FORCE_FOLLOWER:
      code = syncForceBecomeFollower(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SET_ASSIGNED_LEADER:
      code = syncBecomeAssignedLeader(pSyncNode, pMsg);
      break;
    default:
      code = TSDB_CODE_MSG_NOT_PROCESSED;
      break;
  }

  // Report the failure before dropping the reference: the node must not be
  // dereferenced once it has been released.
  if (code != 0) {
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since %s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
           tstrerror(code));
  }
  syncNodeRelease(pSyncNode);

  TAOS_RETURN(code);
}
291

292
int32_t syncLeaderTransfer(int64_t rid) {
15,806✔
293
  int32_t    code = 0;
15,806✔
294
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
15,806✔
295
  if (pSyncNode == NULL) {
15,806!
296
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
297
    if (terrno != 0) code = terrno;
×
298
    TAOS_RETURN(code);
×
299
  }
300

301
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
15,806✔
302
  syncNodeRelease(pSyncNode);
15,806✔
303
  return ret;
15,806✔
304
}
305

306
// Handle a force-follower request: step down to follower with a zeroed leader
// id, then answer the request with code 0, echoing the response buffer and info
// carried in pRpcMsg.
//
// Always returns 0.
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SRaftId noLeader = {0};
  syncNodeBecomeFollower(ths, noLeader, "force election");

  SRpcMsg reply = {0};
  reply.code = 0;
  reply.pCont = pRpcMsg->info.rsp;
  reply.contLen = pRpcMsg->info.rspLen;
  reply.info = pRpcMsg->info;
  tmsgSendRsp(&reply);

  return 0;
}
320

321
// Handle a "set assigned leader" request.
//
// The request is rejected with TSDB_CODE_MND_ARB_TOKEN_MISMATCH when its arbTerm
// is lower than the local one, or when it is not forced and its member token
// differs from the local arbToken. Otherwise the node, unless it is already an
// assigned leader, advances its term, becomes assigned leader and appends a noop
// entry. A response is always sent through tmsgSendRsp(); on success it carries
// the serialized SVArbSetAssignedLeaderRsp.
//
// Returns the code that was placed in the response.
int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
  int32_t code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  void*   pHead = NULL;
  int32_t contLen = 0;

  SVArbSetAssignedLeaderReq req = {0};
  // NOTE(review): the payload pointer skips SMsgHead but the full contLen is
  // passed as its length — confirm whether sizeof(SMsgHead) should be subtracted.
  if (tDeserializeSVArbSetAssignedLeaderReq((char*)pRpcMsg->pCont + sizeof(SMsgHead), pRpcMsg->contLen, &req) != 0) {
    sError("vgId:%d, failed to deserialize SVArbSetAssignedLeaderReq", ths->vgId);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  if (ths->arbTerm > req.arbTerm) {
    sInfo("vgId:%d, skip to set assigned leader, msg with lower term, local:%" PRId64 "msg:%" PRId64, ths->vgId,
          ths->arbTerm, req.arbTerm);
    goto _OVER;
  }

  ths->arbTerm = TMAX(req.arbTerm, ths->arbTerm);

  if (!req.force) {
    if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) {
      sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken,
            req.memberToken);
      goto _OVER;
    }
  }

  if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SUCCESS;
    raftStoreNextTerm(ths);
    // NOTE(review): failure is detected through terrno only; a stale terrno from
    // an earlier call would be treated as a failure here — confirm.
    if (terrno != TSDB_CODE_SUCCESS) {
      code = terrno;
      sError("vgId:%d, failed to set next term since:%s", ths->vgId, tstrerror(code));
      goto _OVER;
    }
    syncNodeBecomeAssignedLeader(ths);

    // A failed noop append is logged but does not fail the request.
    if ((code = syncNodeAppendNoop(ths)) < 0) {
      sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, tstrerror(code));
    }
  }

  SVArbSetAssignedLeaderRsp rsp = {0};
  rsp.arbToken = req.arbToken;
  rsp.memberToken = req.memberToken;
  rsp.vgId = ths->vgId;

  // First pass with a NULL buffer computes the serialized size.
  contLen = tSerializeSVArbSetAssignedLeaderRsp(NULL, 0, &rsp);
  if (contLen <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    contLen = 0;  // never report a negative body length in the response
    goto _OVER;
  }
  pHead = rpcMallocCont(contLen);
  if (!pHead) {
    code = terrno;
    sError("vgId:%d, failed to malloc memory for SVArbSetAssignedLeaderRsp", ths->vgId);
    contLen = 0;  // no body is attached to the response
    goto _OVER;
  }
  if (tSerializeSVArbSetAssignedLeaderRsp(pHead, contLen, &rsp) <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    rpcFreeCont(pHead);
    // The buffer is gone: make sure the response below does not carry a
    // dangling pointer (use-after-free / double free in the RPC layer).
    pHead = NULL;
    contLen = 0;
    goto _OVER;
  }

  code = TSDB_CODE_SUCCESS;

_OVER:;
  SRpcMsg rspMsg = {
      .code = code,
      .pCont = pHead,
      .contLen = contLen,
      .info = pRpcMsg->info,
  };

  tmsgSendRsp(&rspMsg);

  tFreeSVArbSetAssignedLeaderReq(&req);
  TAOS_RETURN(code);
}
403

404
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
×
405
  int32_t    code = 0;
×
406
  SSyncNode* pNode = syncNodeAcquire(rid);
×
407
  if (pNode == NULL) {
×
408
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
409
    if (terrno != 0) code = terrno;
×
410
    TAOS_RETURN(code);
×
411
  }
412

413
  SRpcMsg rpcMsg = {0, .info.notFreeAhandle = 1};
×
414
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
×
415
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;
×
416

417
  syncNodeRelease(pNode);
×
418
  if (ret == 1) {
×
419
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
×
420
    code = rpcSendResponse(&rpcMsg);
×
421
    return code;
×
422
  } else {
423
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
×
424
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
425
  }
426
}
427

428
// Return the smallest match index among the node's peers.
//
// The first peer's index is taken as the starting value (again for the next peer
// if it was SYNC_INDEX_INVALID); afterwards only strictly positive indexes that
// are smaller than the running value replace it. Returns SYNC_INDEX_INVALID when
// there are no peers.
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex current = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[peer]);

    bool unset = (lowest == SYNC_INDEX_INVALID);
    if (unset || (current > 0 && current < lowest)) {
      lowest = current;
    }
  }

  return lowest;
}
441

442
// Forward to the log store's syncLogIndexRetention callback with the given byte
// budget and return its result.
static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) {
  SyncIndex retention = pSyncNode->pLogStore->syncLogIndexRetention(pSyncNode->pLogStore, bytes);
  return retention;
}
445

446
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
33,104✔
447
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
33,104✔
448
  int32_t    code = 0;
33,104✔
449
  if (pSyncNode == NULL) {
33,104✔
450
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
36✔
451
    if (terrno != 0) code = terrno;
36!
452
    sError("sync begin snapshot error");
36!
453
    TAOS_RETURN(code);
36✔
454
  }
455

456
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
33,068✔
457
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
33,068✔
458
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
33,068✔
459

460
  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
33,068!
461
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
59!
462
    syncNodeRelease(pSyncNode);
59✔
463
    return 0;
59✔
464
  }
465

466
  int64_t logRetention = 0;
33,009✔
467

468
  if (syncNodeIsMnode(pSyncNode)) {
33,009✔
469
    // mnode
470
    logRetention = tsMndLogRetention;
3,796✔
471
  } else {
472
    // vnode
473
    if (pSyncNode->replicaNum > 1) {
29,213✔
474
      logRetention = SYNC_VNODE_LOG_RETENTION;
407✔
475
    }
476
  }
477

478
  if (pSyncNode->totalReplicaNum > 1) {
33,009✔
479
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER &&
853✔
480
        pSyncNode->state != TAOS_SYNC_STATE_LEARNER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
152!
481
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
12!
482
              lastApplyIndex);
483
      syncNodeRelease(pSyncNode);
12✔
484
      return 0;
12✔
485
    }
486
    SyncIndex retentionIndex =
841✔
487
        TMAX(pSyncNode->minMatchIndex, syncLogRetentionIndex(pSyncNode, SYNC_WAL_LOG_RETENTION_SIZE));
841✔
488
    logRetention += TMAX(0, lastApplyIndex - retentionIndex);
841✔
489
  }
490

491
_DEL_WAL:
32,156✔
492

493
  do {
494
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
32,997✔
495
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
32,997✔
496
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
32,997✔
497
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
32,997✔
498
    if (lastApplyIndex <= walCommitVer) {
32,997!
499
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);
32,997✔
500

501
      if (snapshottingIndex == SYNC_INDEX_INVALID) {
32,997!
502
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
32,997✔
503
        pSyncNode->snapshottingTime = taosGetTimestampMs();
32,997✔
504

505
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
32,997✔
506
        if (code == 0) {
32,996!
507
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
32,996✔
508
                  pSyncNode->snapshottingIndex, lastApplyIndex);
509
        } else {
510
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
×
511
                  terrstr(), pSyncNode->snapshottingIndex, lastApplyIndex);
512
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
×
513
        }
514

515
      } else {
516
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
×
517
                snapshottingIndex, lastApplyIndex);
518
      }
519
    }
520
  } while (0);
521

522
  syncNodeRelease(pSyncNode);
32,996✔
523
  TAOS_RETURN(code);
32,997✔
524
}
525

526
int32_t syncEndSnapshot(int64_t rid) {
33,066✔
527
  int32_t    code = 0;
33,066✔
528
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
33,066✔
529
  if (pSyncNode == NULL) {
33,068!
530
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
531
    if (terrno != 0) code = terrno;
×
532
    sError("sync end snapshot error");
×
533
    TAOS_RETURN(code);
×
534
  }
535

536
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
33,068✔
537
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
32,997✔
538
    code = walEndSnapshot(pData->pWal);
32,997✔
539
    if (code != 0) {
32,997!
540
      sNError(pSyncNode, "wal snapshot end error since:%s", tstrerror(code));
×
541
      syncNodeRelease(pSyncNode);
×
542
      TAOS_RETURN(code);
×
543
    } else {
544
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
32,997✔
545
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
32,997✔
546
    }
547
  }
548

549
  syncNodeRelease(pSyncNode);
33,068✔
550
  TAOS_RETURN(code);
33,068✔
551
}
552

553
// Report whether the given node can serve reads: it must be a leader or an
// assigned leader and must have finished restoring.
//
// Returns true when ready. On false, terrno is set to
// TSDB_CODE_SYN_INTERNAL_ERROR (NULL node), TSDB_CODE_SYN_NOT_LEADER or
// TSDB_CODE_SYN_RESTORING.
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (!pSyncNode) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  if (!(pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pSyncNode->restoreFinish) {
    return true;
  }

  terrno = TSDB_CODE_SYN_RESTORING;
  return false;
}
572

573
bool syncIsReadyForRead(int64_t rid) {
17,287,368✔
574
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
17,287,368✔
575
  if (pSyncNode == NULL) {
17,289,468!
576
    sError("sync ready for read error");
×
577
    return false;
×
578
  }
579

580
  bool ready = syncNodeIsReadyForRead(pSyncNode);
17,289,468✔
581

582
  syncNodeRelease(pSyncNode);
17,288,857✔
583
  return ready;
17,286,874✔
584
}
585

586
#ifdef BUILD_NO_CALL
587
bool syncSnapshotSending(int64_t rid) {
588
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
589
  if (pSyncNode == NULL) {
590
    return false;
591
  }
592

593
  bool b = syncNodeSnapshotSending(pSyncNode);
594
  syncNodeRelease(pSyncNode);
595
  return b;
596
}
597

598
bool syncSnapshotRecving(int64_t rid) {
599
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
600
  if (pSyncNode == NULL) {
601
    return false;
602
  }
603

604
  bool b = syncNodeSnapshotRecving(pSyncNode);
605
  syncNodeRelease(pSyncNode);
606
  return b;
607
}
608
#endif
609

610
// Pick a peer and ask it to take over leadership.
//
// Nothing is done (0 is returned) when the node has no peers, is not the
// leader, or has at most one replica. The target defaults to the first peer;
// with exactly two peers the second one is chosen if its match index is
// strictly greater.
//
// Returns 0 when no transfer is attempted, otherwise the result of
// syncNodeLeaderTransferTo().
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return 0;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER || pSyncNode->replicaNum <= 1) {
    return 0;
  }

  SNodeInfo target = pSyncNode->peersNodeInfo[0];
  if (pSyncNode->peersNum == 2) {
    SyncIndex firstMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[0]);
    SyncIndex secondMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[1]);
    if (secondMatch > firstMatch) {
      target = pSyncNode->peersNodeInfo[1];
    }
  }

  return syncNodeLeaderTransferTo(pSyncNode, target);
}
631

632
// Propose a leader-transfer entry naming newLeader as the next leader.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the node has a single replica, the
// syncBuildLeaderTransfer() error if the message cannot be built, otherwise the
// result of syncNodePropose(). The message content is freed before returning.
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg transferMsg = {0};
  TAOS_CHECK_RETURN(syncBuildLeaderTransfer(&transferMsg, pSyncNode->vgId));

  // Fill in the target of the transfer.
  SyncLeaderTransfer* pTransfer = transferMsg.pCont;
  pTransfer->newLeaderId.addr = SYNC_ADDR(&newLeader);
  pTransfer->newLeaderId.vgId = pSyncNode->vgId;
  pTransfer->newNodeInfo = newLeader;

  int32_t result = syncNodePropose(pSyncNode, &transferMsg, false, NULL);
  rpcFreeCont(transferMsg.pCont);

  return result;
}
652

653
SSyncState syncGetState(int64_t rid) {
5,942,753✔
654
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
5,942,753✔
655

656
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
5,942,753✔
657
  if (pSyncNode != NULL) {
5,944,102✔
658
    state.state = pSyncNode->state;
5,944,066✔
659
    state.roleTimeMs = pSyncNode->roleTimeMs;
5,944,066✔
660
    state.startTimeMs = pSyncNode->startTime;
5,944,066✔
661
    state.restored = pSyncNode->restoreFinish;
5,944,066✔
662
    if (pSyncNode->vgId != 1) {
5,944,066✔
663
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
1,266,385✔
664
    } else {
665
      state.canRead = state.restored;
4,677,681✔
666
    }
667
    /*
668
    double progress = 0;
669
    if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){
670
      progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex;
671
      state.progress = (int32_t)(progress * 100);
672
    }
673
    else{
674
      state.progress = -1;
675
    }
676
    sDebug("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 ", "
677
            "progress:%lf, progress:%d",
678
          pSyncNode->vgId,
679
         pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
680
    */
681
    state.term = raftStoreGetTerm(pSyncNode);
5,944,056✔
682
    syncNodeRelease(pSyncNode);
5,944,204✔
683
  }
684

685
  return state;
5,944,128✔
686
}
687

688
void syncGetCommitIndex(int64_t rid, int64_t* syncCommitIndex) {
1,072,965✔
689
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
1,072,965✔
690
  if (pSyncNode != NULL) {
1,072,965!
691
    *syncCommitIndex = pSyncNode->commitIndex;
1,072,965✔
692
    syncNodeRelease(pSyncNode);
1,072,965✔
693
  }
694
}
1,072,965✔
695

696
int32_t syncGetArbToken(int64_t rid, char* outToken) {
73,006✔
697
  int32_t    code = 0;
73,006✔
698
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
73,006✔
699
  if (pSyncNode == NULL) {
73,006!
700
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
701
    if (terrno != 0) code = terrno;
×
702
    TAOS_RETURN(code);
×
703
  }
704

705
  memset(outToken, 0, TSDB_ARB_TOKEN_SIZE);
73,006✔
706
  (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
73,006✔
707
  tstrncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE);
73,006✔
708
  (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
73,006✔
709

710
  syncNodeRelease(pSyncNode);
73,006✔
711
  TAOS_RETURN(code);
73,006✔
712
}
713

714
int32_t syncCheckSynced(int64_t rid) {
5✔
715
  int32_t    code = 0;
5✔
716
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
5✔
717
  if (pSyncNode == NULL) {
5!
718
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
719
    if (terrno != 0) code = terrno;
×
720
    TAOS_RETURN(code);
×
721
  }
722

723
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
5!
724
    code = TSDB_CODE_VND_ARB_NOT_SYNCED;
×
725
    syncNodeRelease(pSyncNode);
×
726
    TAOS_RETURN(code);
×
727
  }
728

729
  bool isSync = pSyncNode->commitIndex >= pSyncNode->assignedCommitIndex;
5✔
730
  code = (isSync ? TSDB_CODE_SUCCESS : TSDB_CODE_VND_ARB_NOT_SYNCED);
5!
731

732
  syncNodeRelease(pSyncNode);
5✔
733
  TAOS_RETURN(code);
5✔
734
}
735

736
// Raise the arbitration term of the node identified by rid to arbTerm; the
// stored term never decreases.
//
// Returns 0 on success. If the node cannot be acquired, returns terrno when it
// is set, otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL.
int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm) {
  int32_t    code = 0;
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (!pNode) {
    code = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  if (arbTerm > pNode->arbTerm) {
    pNode->arbTerm = arbTerm;
  }

  syncNodeRelease(pNode);
  TAOS_RETURN(code);
}
749

750
// Find the config index that applies to a snapshot taken at
// snapshotLastApplyIndex.
//
// Starting from the first recorded config index, the result advances to every
// recorded index that is greater than the current result and not greater than
// snapshotLastApplyIndex.
//
// Returns that index, or -2 with terrno set to TSDB_CODE_SYN_INTERNAL_ERROR when
// no config index is recorded.
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  int32_t count = pSyncNode->raftCfg.configIndexCount;
  if (count < 1) {
    sError("vgId:%d, failed get snapshot config index, configIndexCount:%d", pSyncNode->vgId,
           pSyncNode->raftCfg.configIndexCount);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -2;
  }

  SyncIndex chosen = pSyncNode->raftCfg.configIndexArr[0];
  for (int32_t pos = 0; pos < count; ++pos) {
    SyncIndex candidate = pSyncNode->raftCfg.configIndexArr[pos];
    if (candidate <= chosen) continue;
    if (candidate > snapshotLastApplyIndex) continue;
    chosen = candidate;
  }

  sTrace("vgId:%d, index:%" PRId64 ", get last snapshot config index:%" PRId64, pSyncNode->vgId, snapshotLastApplyIndex,
         chosen);

  return chosen;
}
770

771
// Look up the raft id of the peer whose fqdn and port match pEp.
// Returns EMPTY_RAFT_ID when no peer matches.
static SRaftId syncGetRaftIdByEp(SSyncNode* pSyncNode, const SEp* pEp) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    if (strcmp(pEp->fqdn, pSyncNode->peersNodeInfo[peer].nodeFqdn) != 0) {
      continue;
    }
    if (pEp->port != pSyncNode->peersNodeInfo[peer].nodePort) {
      continue;
    }
    return pSyncNode->peersId[peer];
  }

  return EMPTY_RAFT_ID;
}
780

781
static void epsetToString(const SEpSet* pEpSet, char* buffer, size_t bufferSize) {
604,892✔
782
  if (pEpSet == NULL || buffer == NULL) {
604,892!
783
    snprintf(buffer, bufferSize, "EpSet is NULL");
×
784
    return;
×
785
  }
786

787
  size_t offset = 0;
604,920✔
788
  offset += snprintf(buffer + offset, bufferSize - offset, "EpSet: [");
604,920✔
789

790
  for (int i = 0; i < pEpSet->numOfEps; ++i) {
1,342,706✔
791
    if (offset >= bufferSize) break;
737,786!
792
    offset += snprintf(buffer + offset, bufferSize - offset, "%s:%d%s", pEpSet->eps[i].fqdn, pEpSet->eps[i].port,
737,786✔
793
                       (i + 1 < pEpSet->numOfEps) ? ", " : "");
737,786✔
794
  }
795

796
  if (offset < bufferSize) {
604,920!
797
    snprintf(buffer + offset, bufferSize - offset, "]");
605,034✔
798
  }
799
}
800

801
// Fill pEpSet with the endpoints of all non-learner replicas of the sync node
// identified by rid, and choose which endpoint a client should try first.
//
// pEpSet->numOfEps is reset to 0 up front, so it stays 0 when the node cannot
// be acquired. inUse is set to the entry matching the cached leader endpoint
// when there is one; otherwise to myIndex if this node is the cached leader,
// or to the entry after myIndex (wrapping around) if it is not.
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return;

  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  int       leaderSlot = -1;  // position in pEpSet->eps of the cached leader, if found

  sDebug("vgId:%d, sync get retry epset, leaderCache:%" PRIx64 ", leaderCacheEp.fqdn:%s, leaderCacheEp.port:%d",
         pSyncNode->vgId, pSyncNode->leaderCache.addr, pSyncNode->leaderCacheEp.fqdn, pSyncNode->leaderCacheEp.port);

  int slot = 0;  // next free position in pEpSet->eps
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    if (pNode->nodeRole == TAOS_SYNC_ROLE_LEARNER) continue;

    SEp* pEp = &pEpSet->eps[slot];
    tstrncpy(pEp->fqdn, pNode->nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = pNode->nodePort;
    pEpSet->numOfEps++;

    SRaftId id = syncGetRaftIdByEp(pSyncNode, pEp);
    sDebug("vgId:%d, sync get retry epset, index:%d id:%" PRIx64 " %s:%d", pSyncNode->vgId, i, id.addr, pEp->fqdn,
           pEp->port);

    bool isCachedLeader = pEp->port == pSyncNode->leaderCacheEp.port &&
                          strncmp(pEp->fqdn, pSyncNode->leaderCacheEp.fqdn, TSDB_FQDN_LEN) == 0;
    if (isCachedLeader) {
      leaderSlot = slot;
    }
    slot++;
  }

  if (pEpSet->numOfEps > 0) {
    if (leaderSlot != -1) {
      pEpSet->inUse = leaderSlot;
    } else if (pSyncNode->myRaftId.addr == pSyncNode->leaderCache.addr &&
               pSyncNode->myRaftId.vgId == pSyncNode->leaderCache.vgId) {
      pEpSet->inUse = pCfg->myIndex;
    } else {
      pEpSet->inUse = (pCfg->myIndex + 1) % pEpSet->numOfEps;
    }
  }
  epsetSort(pEpSet);

  char text[1024];
  epsetToString(pEpSet, text, sizeof(text));
  sDebug("vgId:%d, sync get retry epset numOfEps:%d %s inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, text,
         pEpSet->inUse);
  syncNodeRelease(pSyncNode);
}
849

850
// Propose pMsg on the sync node identified by rid.
//
// Returns the result of syncNodePropose(). When the node cannot be acquired,
// returns terrno if it is set, otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL.
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    int32_t code = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    sError("sync propose error");
    TAOS_RETURN(code);
  }

  int32_t result = syncNodePropose(pSyncNode, pMsg, isWeak, seq);
  syncNodeRelease(pSyncNode);
  return result;
}
864

865
int32_t syncCheckMember(int64_t rid) {
×
866
  int32_t    code = 0;
×
867
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
868
  if (pSyncNode == NULL) {
×
869
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
870
    if (terrno != 0) code = terrno;
×
871
    sError("sync propose error");
×
872
    TAOS_RETURN(code);
×
873
  }
874

875
  if (pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
×
876
    syncNodeRelease(pSyncNode);
×
877
    return TSDB_CODE_SYN_WRONG_ROLE;
×
878
  }
879

880
  syncNodeRelease(pSyncNode);
×
881
  return 0;
×
882
}
883

884
int32_t syncIsCatchUp(int64_t rid) {
5,167✔
885
  int32_t    code = 0;
5,167✔
886
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
5,167✔
887
  if (pSyncNode == NULL) {
5,167!
888
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
889
    if (terrno != 0) code = terrno;
×
890
    sError("sync Node Acquire error since %d", ERRNO);
×
891
    TAOS_RETURN(code);
×
892
  }
893

894
  int32_t isCatchUp = 0;
5,167✔
895
  if (pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
5,167!
896
      pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
1,208!
897
      pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP) {
1,208✔
898
    sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
4,896!
899
          pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
900
          pSyncNode->pLogBuf->matchIndex);
901
    isCatchUp = 0;
4,896✔
902
  } else {
903
    sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, pSyncNode->vgId,
271!
904
          pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->matchIndex);
905
    isCatchUp = 1;
271✔
906
  }
907

908
  syncNodeRelease(pSyncNode);
5,167✔
909
  return isCatchUp;
5,167✔
910
}
911

912
ESyncRole syncGetRole(int64_t rid) {
5,167✔
913
  int32_t    code = 0;
5,167✔
914
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
5,167✔
915
  if (pSyncNode == NULL) {
5,167!
916
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
917
    if (terrno != 0) code = terrno;
×
918
    sError("sync Node Acquire error since %d", ERRNO);
×
919
    TAOS_RETURN(code);
×
920
  }
921

922
  ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole;
5,167✔
923

924
  syncNodeRelease(pSyncNode);
5,167✔
925
  return role;
5,167✔
926
}
927

928
int64_t syncGetTerm(int64_t rid) {
29,027✔
929
  int32_t    code = 0;
29,027✔
930
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
29,027✔
931
  if (pSyncNode == NULL) {
29,027!
932
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
933
    if (terrno != 0) code = terrno;
×
934
    sError("sync Node Acquire error since %d", ERRNO);
×
935
    TAOS_RETURN(code);
×
936
  }
937

938
  int64_t term = raftStoreGetTerm(pSyncNode);
29,027✔
939

940
  syncNodeRelease(pSyncNode);
29,027✔
941
  return term;
29,027✔
942
}
943

944
// Propose pMsg on an already acquired sync node.
//
// pSyncNode  node to propose on; must be leader or assigned leader
// pMsg       client message to replicate
// isWeak     forwarded to syncBuildClientRequest()
// seq        optional out parameter; receives the response-manager sequence
//            number on the normal (non-optimized) path
//
// Returns:
//   TSDB_CODE_SYN_NOT_LEADER         the node is neither leader nor assigned leader
//   TSDB_CODE_SYN_PROPOSE_NOT_READY  restore not finished, or heartbeat replies timed out
//   1  optimized single-replica path handled the message and replicaNum is 1
//   0  optimized path handled the message but replicaNum is no longer 1,
//      or the message was enqueued successfully on the normal path
//   otherwise the error from building or enqueueing the request
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  int32_t code = 0;
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SYN_NOT_LEADER;
    sNWarn(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    TAOS_RETURN(code);
  }

  if (!pSyncNode->restoreFinish) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNWarn(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
           TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // heartbeat timeout
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER && syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since heartbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // Initialized so the failure log below never prints an indeterminate value.
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code < 0) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      TAOS_RETURN(code);
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);

    // after raft member change, need to handle 1->2 switching point
    // at this point, need to switch entry handling thread
    if (pSyncNode->replicaNum == 1) {
      sGDebug(&pMsg->info.traceId, "vgId:%d, index:%" PRId64 ", propose optimized msg type:%s", pSyncNode->vgId,
              retIndex, TMSG_INFO(pMsg->msgType));
      return 1;
    }

    sGDebug(&pMsg->info.traceId,
            "vgId:%d, index:%" PRId64 ", propose optimized msg, return to normal, type:%s, handle:%p",
            pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle);
    return 0;
  }

  // normal path: register a response stub, then enqueue a client request
  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {.info.traceId = pMsg->info.traceId};

  code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    // Drop the stub, but keep reporting the serialization failure: returning
    // the result of the delete would turn this into success when the delete
    // itself succeeds, although nothing was enqueued.
    TAOS_CHECK_RETURN(syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum));
    TAOS_RETURN(code);
  }

  sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, propose msg, type:%s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    TAOS_CHECK_RETURN(syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum));
  }

  if (seq != NULL) *seq = seqNum;
  TAOS_RETURN(code);
}
1015

1016
// Reset a per-peer heartbeat timer to its idle state: no timer handle, zeroed
// counter and logic clock, interval taken from the node's hbBaseLine, and
// destId recorded as the target. Always returns 0.
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  // target and callback
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;

  // timing
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
  pSyncTimer->timeStamp = taosGetTimestampMs();

  // runtime state
  pSyncTimer->pTimer = NULL;
  pSyncTimer->counter = 0;
  atomic_store_64(&pSyncTimer->logicClock, 0);

  return 0;
}
1026

1027
// Arm the heartbeat timer pSyncTimer for pSyncNode.
//
// The timer data registered under pSyncTimer->hbDataRid is reused when it can
// be acquired; otherwise a new SSyncHbTimerData is allocated and registered.
// The data is refreshed with the node rid, destination, logic clock and
// expected execution time before the timer is reset.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the timer data cannot be
// allocated, and TSDB_CODE_SYN_INTERNAL_ERROR when the sync environment is
// not initialized or the timer reset reports a stopped timer.
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int32_t code = 0;
  int64_t tsNow = taosGetTimestampMs();
  if (syncIsInit()) {
    SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
    if (pData == NULL) {
      pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
      if (pData == NULL) {
        // Allocation failed: bail out instead of dereferencing NULL below.
        sError("vgId:%d, failed to start hb timer since out of memory", pSyncNode->vgId);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      pData->rid = syncHbTimerDataAdd(pData);
    }
    pSyncTimer->hbDataRid = pData->rid;
    pSyncTimer->timeStamp = tsNow;

    pData->syncNodeRid = pSyncNode->rid;
    pData->pTimer = pSyncTimer;
    pData->destId = pSyncTimer->destId;
    pData->logicClock = pSyncTimer->logicClock;
    pData->execTime = tsNow + pSyncTimer->timerMS;

    sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:0x%" PRIx64 " at %d", pSyncNode->vgId, pData->rid,
           pData->destId.addr, pSyncTimer->timerMS);

    // The timer callback receives the data rid, not the pointer itself.
    bool stopped = taosTmrResetPriority(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid),
                                        syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
    if (stopped) {
      sError("vgId:%d, failed to reset hb timer success", pSyncNode->vgId);
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  } else {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
  }
  return code;
}
1060

1061
// Stop the heartbeat timer pSyncTimer: bump its logic clock, stop and clear
// the timer handle, then remove the registered timer data and reset
// hbDataRid to -1. Always returns 0.
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  (void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  bool wasStopped = taosTmrStop(pSyncTimer->pTimer);
  sDebug("vgId:%d, stop hb timer stop:%d", pSyncNode->vgId, wasStopped);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}
1071

1072
// Restore the log store from the FSM snapshot when the two are out of step.
//
// The log store is restored to the snapshot's lastApplyIndex when its last
// index is behind that index, when its first index is more than one past it,
// or when the FSM state is SYNC_FSM_STATE_INCOMPLETE.
//
// Returns 0 when no restore is needed or the restore succeeds, the restore
// error otherwise, and TSDB_CODE_SYN_INTERNAL_ERROR when the log store, the
// FSM, or its FpGetSnapshotInfo callback is missing.
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  if (pNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm == NULL) {
    sError("vgId:%d, pFsm not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
    sError("vgId:%d, FpGetSnapshotInfo not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // TODO check return value
  SSnapshot snapshot = {0};
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);

  SyncIndex snapshotVer = snapshot.lastApplyIndex;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  bool logOutOfStep = (lastVer < snapshotVer) || (firstVer > snapshotVer + 1);
  bool needRestore = logOutOfStep || pNode->fsmState == SYNC_FSM_STATE_INCOMPLETE;

  int32_t code = 0;
  if (needRestore) {
    code = pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, snapshotVer);
    if (code != 0) {
      sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
             pNode->vgId, terrstr(), lastVer, snapshotVer);
    }
  }
  TAOS_RETURN(code);
}
1102

1103
// open/close --------------
1104
// Allocate and initialize a sync node from pSyncInfo.
//
// pSyncInfo     open options; syncCfg may be overwritten with the config read
//               from raft_config.json, and pFsm is handed over to the node
//               (pSyncInfo->pFsm is set to NULL once the node owns it)
// vnodeVersion  config version of the caller, compared with the changeVersion
//               stored in the config file to decide which config wins
//
// Returns the opened node, or NULL on failure. Every failure goes through
// _error, which frees a pFsm still held by pSyncInfo and closes the partially
// built node.
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
      sError("vgId:%d, failed to create dir:%s since %s", pSyncInfo->vgId, pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId);
    pSyncNode->vgId = pSyncInfo->vgId;
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
    pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
    pSyncNode->raftCfg.configIndexCount = 1;
    pSyncNode->raftCfg.configIndexArr[0] = -1;

    if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
      terrno = code;
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
      goto _error;
    }
  } else {
    // update syncCfg by raft_config.json
    if ((code = syncReadCfgFile(pSyncNode)) != 0) {
      terrno = code;
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
      goto _error;
    }

    if (vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion) {
      if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
        sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
        pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
        if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
          terrno = code;
          sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
          goto _error;
        }
      } else {
        sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
        pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
      }
    } else {
      // Log the version that was actually compared above, i.e. the one read from the file.
      sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d", pSyncNode->vgId, vnodeVersion,
            pSyncNode->raftCfg.cfg.changeVersion);
    }
  }

  // init by SSyncInfo
  pSyncNode->vgId = pSyncInfo->vgId;
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  bool      updated = false;
  sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
        pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
      updated = true;
    }
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  if (vnodeVersion > pSyncInfo->syncCfg.changeVersion) {
    if (updated) {
      sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
      if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
        terrno = code;
        sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
        goto _error;
      }
    }
  }

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  code = syncLogBufferCreate(&pSyncNode->pLogBuf);
  if (pSyncNode->pLogBuf == NULL) {
    sError("failed to init sync log buffer since %s. vgId:%d", tstrerror(code), pSyncNode->vgId);
    goto _error;
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
  pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
        false) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  pSyncNode->myRaftId = pSyncNode->replicasId[pSyncNode->raftCfg.cfg.myIndex];

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
      pSyncNode->peersId[j] = pSyncNode->replicasId[i];
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
      j++;
    }
  }

  pSyncNode->arbTerm = -1;
  (void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
  syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
  sInfo("vgId:%d, generate arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);

  // init raft algorithm
  // The node takes over the FSM here, so _error must no longer free it via pSyncInfo.
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;
  pSyncNode->leaderCacheEp.port = 0;
  pSyncNode->leaderCacheEp.fqdn[0] = '\0';

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  if ((code = raftStoreOpen(pSyncNode)) != 0) {
    terrno = code;
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    // TODO check return value
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
    pSyncNode->fsmState = snapshot.state;
    if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
      sError("vgId:%d, fsm state is incomplete.", pSyncNode->vgId);
      // An incomplete FSM is only fatal for a single replica.
      if (pSyncNode->replicaNum == 1) {
        terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
        goto _error;
      }
    }
  }
  pSyncNode->commitIndex = commitIndex;
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  // restore log store on need
  if ((code = syncNodeLogStoreRestoreOnNeed(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
#ifdef BUILD_NO_CALL
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
#endif
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i])) != 0) {
      terrno = code;
      goto _error;
    }
  }

  // tools
  if ((code = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr)) != 0) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(pSyncNode, i, &pSender);
    if (pSender == NULL) {
      // Go through _error so the partially built node is closed instead of leaked.
      if (code != 0) terrno = code;
      sError("vgId:%d, failed to create snapshot sender, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }

    pSyncNode->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  // snapshot receivers
  code = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID, &pSyncNode->pNewNodeReceiver);
  if (pSyncNode->pNewNodeReceiver == NULL) {
    // Same as for the senders: clean up through _error rather than leaking the node.
    if (code != 0) terrno = code;
    sError("vgId:%d, failed to create snapshot receiver", pSyncNode->vgId);
    goto _error;
  }
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
          pSyncNode->pNewNodeReceiver);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  if ((code = syncNodeLogReplInit(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // peer state
  if ((code = syncNodePeerStateInit(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  //
  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // start in syncNodeStart
  // start raft

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if ((code = syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->becomeAssignedLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync node opened, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
  return pSyncNode;

_error:
  // pSyncInfo->pFsm is non-NULL only if the failure happened before the node took it over.
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}
1469

1470
#ifdef BUILD_NO_CALL
1471
// Raise pSyncNode->commitIndex to the snapshot's lastApplyIndex when the
// snapshot is ahead. Does nothing when the FSM or its FpGetSnapshotInfo
// callback is missing.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snapshot = {0};
  (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);

  if (pSyncNode->commitIndex < snapshot.lastApplyIndex) {
    pSyncNode->commitIndex = snapshot.lastApplyIndex;
  }
}
1480
#endif
1481

1482
// Bring a freshly opened sync node up to its persisted commit point.
//
// Reads the last log index and the commit index from the log store and the end index of the log
// buffer (while holding the log buffer mutex), warns when the buffer does not end right after the
// last stored entry, raises pSyncNode->commitIndex to the stored commit index, and then commits the
// log buffer up to that index unless the FSM state is SYNC_FSM_STATE_INCOMPLETE.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the log store or the log buffer is missing. Otherwise
// returns the result of syncLogBufferCommit(), or, when the commit is skipped, the code left by the
// completeness check (0 or TSDB_CODE_WAL_LOG_INCOMPLETE).
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pSyncNode->pLogBuf == NULL) {
    sError("vgId:%d, ring log buffer not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  (void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex);
  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;
  (void)taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex);

  if (lastVer != -1 && endIndex != lastVer + 1) {
    code = TSDB_CODE_WAL_LOG_INCOMPLETE;
    // terrno is not set on this path, so describe the error from code rather than terrstr()
    sWarn("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64,
          pSyncNode->vgId, tstrerror(code), endIndex - 1, lastVer);
    // deliberately not returning here: restore continues with an incomplete log
  }

  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
  sInfo("vgId:%d, restore began, and keep syncing until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  // the commit result replaces any code set above; it is skipped while the FSM state is incomplete
  if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex, NULL, "restore");
  }

  TAOS_RETURN(code);
}
1517

1518
// Start raft on an opened sync node.
//
// The initial role is picked from the node's own entry in the raft config: a learner becomes
// learner; a single-replica node bumps the term, becomes leader and appends a no-op entry; any other
// node starts as follower with an empty leader id. The ping timer is started afterwards.
//
// Returns the error from syncNodeAppendNoop() if it fails, otherwise the result of
// syncNodeStartPingTimer() (a ping timer failure is logged but "sync node started" is still logged).
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  sInfo("vgId:%d, begin to start sync node", pSyncNode->vgId);

  bool isLearner =
      pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER;

  if (isLearner) {
    syncNodeBecomeLearner(pSyncNode, "first start");
  } else if (pSyncNode->replicaNum != 1) {
    SRaftId emptyLeader = {0};
    syncNodeBecomeFollower(pSyncNode, emptyLeader, "first start");
  } else {
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    TAOS_CHECK_RETURN(syncNodeAppendNoop(pSyncNode));
  }

  int32_t timerCode = syncNodeStartPingTimer(pSyncNode);
  if (timerCode != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(timerCode));
  }
  sInfo("vgId:%d, sync node started", pSyncNode->vgId);
  return timerCode;
}
1544

1545
#ifdef BUILD_NO_CALL
1546
// Put the node into follower state for stand-by operation: stop the heartbeat timers, restart the
// elect timer with TIMER_MAX_MS and start the ping timer.
//
// Returns the error from syncNodeStopHeartbeatTimer() if it fails, -1 if restarting the elect timer
// or starting the ping timer reports a negative code, otherwise the ping timer result.
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  int32_t code = 0;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));

  // reset elect timer, long enough
  if ((code = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS)) < 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  if ((code = syncNodeStartPingTimer(pSyncNode)) < 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  return code;
}
1569
#endif
1570

1571
// First stage of closing a sync node: stop the elect, heartbeat and ping timers (in that order) and
// clean the pending responses held by the response manager.
//
// Returns early, after logging, when the node, its FSM or the FSM's FpApplyQueueItems callback is
// NULL, or as soon as stopping one of the timers fails; in those cases later steps are not run.
void syncNodePreClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    sError("failed to pre close sync node since sync node is null");
    return;
  }
  if (pSyncNode->pFsm == NULL) {
    sError("failed to pre close sync node since fsm is null");
    return;
  }
  if (pSyncNode->pFsm->FpApplyQueueItems == NULL) {
    sError("failed to pre close sync node since FpApplyQueueItems is null");
    return;
  }

  int32_t code = syncNodeStopElectTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  code = syncNodeStopHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  code = syncNodeStopPingTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // clean rsp
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
1607

1608
// Stop (if started) and destroy the node's snapshot receiver, then clear the pointer.
// Does nothing when the node has no receiver.
void syncNodePostClose(SSyncNode* pSyncNode) {
  if (pSyncNode->pNewNodeReceiver == NULL) {
    return;
  }

  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
         pSyncNode->pNewNodeReceiver);
  snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
  pSyncNode->pNewNodeReceiver = NULL;
}
13,744✔
1620

1621
// Release a heartbeat timer data object with taosMemoryFree().
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
2,473!
1622

1623
// Tear down a sync node and free it.
//
// Cleans pending responses, stops the ping, elect and heartbeat timers, destroys the replication
// managers, response manager, vote managers, index managers, log store and log buffer, destroys the
// arb token mutex, stops and destroys every snapshot sender and the snapshot receiver, frees the
// FSM, closes the raft store and finally frees the node itself.
//
// A NULL node is ignored. If stopping any timer fails the function logs and returns immediately,
// leaving the node and everything it owns untouched.
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return;
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);

  int32_t code = syncNodeStopPingTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  code = syncNodeStopElectTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  code = syncNodeStopHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  syncNodeLogReplDestroy(pSyncNode);

  // each owned component is destroyed and its pointer cleared right away
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;

  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;

  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;

  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;

  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  (void)taosThreadMutexDestroy(&pSyncNode->arbTokenMutex);

  // snapshot senders
  for (int32_t idx = 0; idx < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++idx) {
    if (pSyncNode->senders[idx] == NULL) continue;

    sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[idx]);

    if (snapshotSenderIsStart(pSyncNode->senders[idx])) {
      snapshotSenderStop(pSyncNode->senders[idx], false);
    }

    snapshotSenderDestroy(pSyncNode->senders[idx]);
    pSyncNode->senders[idx] = NULL;
  }

  // snapshot receiver
  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  raftStoreClose(pSyncNode);

  taosMemoryFree(pSyncNode);
}
1692

1693
// Return the snapshot strategy stored in the node's raft config.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->raftCfg.snapshotStrategy;
}
×
1694

1695
// timer control --------------
1696
// (Re)arm the ping timer with pSyncNode->pingTimerMS and publish the user logic clock as the current
// ping timer logic clock.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when taosTmrResetPriority() returns true, otherwise 0. When
// the sync environment is not initialized an error is logged and 0 is still returned.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  bool resetFailed = taosTmrResetPriority(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
                                          syncEnv()->pTimerManager, &pSyncNode->pPingTimer, 2);
  if (resetFailed) {
    sError("vgId:%d, failed to reset ping timer, ms:%d", pSyncNode->vgId, pSyncNode->pingTimerMS);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}
1711

1712
// Stop the ping timer: bump the user logic clock, stop the timer handle and clear it.
// Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  bool tmrStopped = taosTmrStop(pSyncNode->pPingTimer);
  sDebug("vgId:%d, stop ping timer, stop:%d", pSyncNode->vgId, tmrStopped);

  pSyncNode->pPingTimer = NULL;
  return 0;
}
1720

1721
// Arm the elect timer to fire after ms milliseconds.
//
// Records ms as the elect timer interval, fills electTimerParam (execute time, current logic clock,
// owning node, NULL data) and resets the timer. A true return from taosTmrReset() is only logged as
// a warning. Always returns 0, including when the sync environment is not initialized (an error is
// logged in that case and nothing else is done).
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  int64_t fireAt = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), fireAt);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  bool resetFailed = taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid),
                                  syncEnv()->pTimerManager, &pSyncNode->pElectTimer);
  if (resetFailed) {
    sWarn("vgId:%d, failed to reset elect timer, ms:%d", pSyncNode->vgId, ms);
  }

  return 0;
}
1740

1741
// Stop the elect timer: bump the elect timer logic clock, stop the timer handle and clear it.
// Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  bool tmrStopped = taosTmrStop(pSyncNode->pElectTimer);
  sTrace("vgId:%d, stop elect timer, stop:%d", pSyncNode->vgId, tmrStopped);

  pSyncNode->pElectTimer = NULL;
  return 0;
}
1750

1751
// Stop the elect timer and start it again with an interval of ms milliseconds.
// Returns the first non-zero code reported by the stop or start step, otherwise 0.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  TAOS_CHECK_RETURN(syncNodeStopElectTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartElectTimer(pSyncNode, ms));
  return 0;
}
1757

1758
// Restart the elect timer with a fresh timeout: TIMER_MAX_MS for a stand-by node, otherwise a
// random value obtained from syncUtilElectRandomMS(electBaseLine, 2 * electBaseLine).
// A failure to restart the timer is logged as a warning and the trace log is skipped.
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = pSyncNode->raftCfg.isStandBy
                        ? TIMER_MAX_MS
                        : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  int32_t code = syncNodeRestartElectTimer(pSyncNode, electMS);
  if (code != 0) {
    sWarn("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
}
1776

1777
#ifdef BUILD_NO_CALL
1778
// Arm the node-level heartbeat timer with pSyncNode->heartbeatTimerMS and publish the user logic
// clock as the current heartbeat timer logic clock. Only built under BUILD_NO_CALL.
//
// NOTE(review): the result of taosTmrReset() is passed through TAOS_CHECK_RETURN, so a true result
// is returned to the caller as if it were an error code — confirm this is intended.
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t code = 0;

  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    TAOS_CHECK_RETURN(taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
                                   syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer));
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  // traced on both paths, whether or not the timer was armed
  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return code;
}
1791
#endif
1792

1793
// Start the per-peer heartbeat timer of every peer that has one.
// Returns the first non-zero code reported by syncHbTimerStart(), otherwise 0.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;

#if 0
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) continue;  // peers without a timer are skipped

    TAOS_CHECK_RETURN(syncHbTimerStart(pSyncNode, pHbTimer));
  }

  return ret;
}
1810

1811
// Stop the per-peer heartbeat timer of every peer that has one.
// Returns the first non-zero code reported by syncHbTimerStop(), otherwise 0.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t code = 0;

#if 0
  TAOS_CHECK_RETURN(atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1));
  bool stop = taosTmrStop(pSyncNode->pHeartbeatTimer);
  sDebug("vgId:%d, stop heartbeat timer, stop:%d", pSyncNode->vgId, stop);
  pSyncNode->pHeartbeatTimer = NULL;
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) continue;  // peers without a timer are skipped

    TAOS_CHECK_RETURN(syncHbTimerStop(pSyncNode, pHbTimer));
  }

  return code;
}
1830

1831
#ifdef BUILD_NO_CALL
1832
// Stop and then start the per-peer heartbeat timers. Only built under BUILD_NO_CALL.
// Returns the first non-zero code reported by the stop or start step, otherwise 0.
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
  return 0;
}
1839
#endif
1840

1841
// Send pMsg to the peer whose raft address equals destRaftId->addr.
//
// The peer's endpoint set is looked up in pNode->peersId / pNode->peersEpset. When the peer is found
// and a send callback is installed, the message content is converted with syncUtilMsgHtoN(), marked
// as needing no response and handed to pNode->syncSendMSg. Otherwise the code stays -1.
//
// On a negative code the failure is logged and pMsg->pCont is released with rpcFreeCont().
// Returns the code through TAOS_RETURN.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  SEpSet* pTargetEp = NULL;
  for (int32_t peer = 0; peer < pNode->peersNum && pTargetEp == NULL; ++peer) {
    if (pNode->peersId[peer].addr == destRaftId->addr) {
      pTargetEp = &pNode->peersEpset[peer];
    }
  }

  int32_t code = -1;
  if (pTargetEp != NULL && pNode->syncSendMSg != NULL) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    code = pNode->syncSendMSg(pTargetEp, pMsg);
  }

  if (code < 0) {
    sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:0x%" PRIx64, pNode->vgId, tstrerror(code),
           pTargetEp, DID(destRaftId), destRaftId->addr);
    rpcFreeCont(pMsg->pCont);
  }

  TAOS_RETURN(code);
}
1865

1866
// Tell whether this node appears in pCfg.
//
// Membership is checked twice: by fqdn + port against pNode->myNodeInfo, and by raft id (address
// derived with SYNC_ADDR plus the node's vgId) against pNode->myRaftId. If the two checks disagree,
// terrno is set to TSDB_CODE_SYN_INTERNAL_ERROR and false is returned.
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  bool foundByEndpoint = false;
  bool foundByRaftId = false;

  for (int32_t k = 0; k < pCfg->totalReplicaNum && !foundByEndpoint; ++k) {
    if (strcmp(pCfg->nodeInfo[k].nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0 &&
        pCfg->nodeInfo[k].nodePort == pNode->myNodeInfo.nodePort) {
      foundByEndpoint = true;
    }
  }

  for (int32_t k = 0; k < pCfg->totalReplicaNum && !foundByRaftId; ++k) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[k]),
        .vgId = pNode->vgId,
    };

    if (syncUtilSameId(&candidate, &pNode->myRaftId)) {
      foundByRaftId = true;
    }
  }

  if (foundByEndpoint != foundByRaftId) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return false;
  }
  return foundByEndpoint;
}
1896

1897
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
3,385✔
1898
  if (pOldCfg->totalReplicaNum != pNewCfg->totalReplicaNum) return true;
3,385✔
1899
  if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
2,378✔
1900
  for (int32_t i = 0; i < pOldCfg->totalReplicaNum; ++i) {
5,590✔
1901
    const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
3,940✔
1902
    const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
3,940✔
1903
    if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
3,940!
1904
    if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
3,940✔
1905
    if (pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
3,938✔
1906
  }
1907

1908
  return false;
1,650✔
1909
}
1910

1911
// Apply a new raft configuration to the sync node.
//
// When pNewConfig does not differ from the current config (see syncIsConfigChanged) nothing is done
// and 0 is returned. Otherwise the new config and lastConfigChangeIndex are stored in raftCfg, the
// stand-by flag is updated (cleared when this node is in the new config, set when it was in the old
// config but is not in the new one), and lastConfigChangeIndex is appended to configIndexArr.
//
// If this node is a member of the new config, the node's own info, peers, replica ids, quorum,
// index managers and vote managers are rebuilt from the new config, and the snapshot senders are
// remapped: a sender whose replica id survives is moved to its new slot, senders that are no longer
// referenced are destroyed, and a sender is created for every empty slot. Finally the config is
// persisted with syncWriteCfgFile().
//
// Returns 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_RANGE when configIndexArr is full; terrno
// when a node info cannot be converted to a raft id; or the error from snapshotSenderCreate() /
// syncWriteCfgFile().
int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change; the arguments follow the format specifiers one-to-one:
  // total replicas old -> new, last index old -> new, voting replicas of the new config
  sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
         oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, oldConfig.lastIndex, pNewConfig->lastIndex,
         pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  SRaftCfg* pCfg = &pSyncNode->raftCfg;
  if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
    sNError(pSyncNode, "failed to add cfg index:%d since out of range", pCfg->configIndexCount);
    terrno = TSDB_CODE_OUT_OF_RANGE;
    return -1;
  }

  pCfg->configIndexArr[pCfg->configIndexCount] = lastConfigChangeIndex;
  pCfg->configIndexCount++;

  if (IamInNew) {
    // save snapshot senders together with the replica ids they were created for
    SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    if (syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId) == false) return terrno;

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
    int32_t peerIdx = 0;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerIdx] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[peerIdx], &pSyncNode->peersEpset[peerIdx]);
        peerIdx++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]) == false)
        return terrno;
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
          false)
        return terrno;
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // reset new: move each surviving sender to the slot of its replica in the new config
    for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
      bool reset = false;
      for (int32_t j = 0; j < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++j) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j]) && oldSenders[j] != NULL) {
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[j]);

          pSyncNode->senders[i] = oldSenders[j];
          oldSenders[j] = NULL;
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
          pSyncNode->senders[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], reset);

          break;
        }
      }
    }

    // free old: done before creating the new senders so that a failure in snapshotSenderCreate()
    // below cannot return with the no-longer-referenced old senders still allocated
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // create new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (pSyncNode->senders[i] == NULL) {
        TAOS_CHECK_RETURN(snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]));
        if (pSyncNode->senders[i] == NULL) {
          // will be created later while send snapshot
          sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
        } else {
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
        }
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
      }
    }

    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
  } else {
    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  return 0;
}
2083

2084
// raft state change --------------
2085
// Store term as the node's current term when it is greater than the stored term; the node's role is
// left untouched.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (raftStoreGetTerm(pSyncNode) >= term) {
    return;
  }
  raftStoreSetTerm(pSyncNode, term);
}
13,443✔
2090

2091
// Step down in response to newTerm, treating id as the leader to follow.
//
// A newTerm older than the stored term is ignored. An assigned leader regenerates its arb token
// (under arbTokenMutex) first. If newTerm is newer, the term is stored, the node becomes follower
// and its vote is cleared; if the terms are equal, the node becomes follower only when it is not
// one already.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm, SRaftId id) {
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
  if (newTerm < currentTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);

  if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
    syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
    sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken);
    (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
  }

  if (newTerm > currentTerm) {
    raftStoreSetTerm(pSyncNode, newTerm);

    char reason[64];
    snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
    syncNodeBecomeFollower(pSyncNode, id, reason);

    raftStoreClearVote(pSyncNode);
  } else if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    // same term: only a non-follower needs the role change
    syncNodeBecomeFollower(pSyncNode, id, "step down");
  }
}
2121

2122
// Clean the pending responses held by the node's response manager (called on leader change).
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
5,255✔
2123

2124
// Switch the node to follower state with leaderId as the cached leader.
//
// Steps, in order: clear the leader cache if the node was leader, reset hbSlowNum, cache leaderId
// (and its endpoint when leaderId matches one of the replicas), set the follower state and role
// time, stop the heartbeat timers, clean pending client responses, invoke the FSM's
// FpBecomeFollowerCb if present, invalidate minMatchIndex, reset the log buffer and reset the elect
// timer. debugStr is only used in log output.
//
// If stopping the heartbeat timers or resetting the log buffer fails, the error is logged and the
// remaining steps are skipped.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, SRaftId leaderId, const char* debugStr) {
  // maybe clear leader cache
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
    pSyncNode->leaderCacheEp.port = 0;
    pSyncNode->leaderCacheEp.fqdn[0] = '\0';
  }

  pSyncNode->hbSlowNum = 0;

  // state change
  pSyncNode->leaderCache = leaderId;

  for (int32_t k = 0; k < pSyncNode->totalReplicaNum; ++k) {
    if (!syncUtilSameId(&pSyncNode->replicasId[k], &leaderId)) continue;

    pSyncNode->leaderCacheEp.port = pSyncNode->raftCfg.cfg.nodeInfo[k].nodePort;
    strncpy(pSyncNode->leaderCacheEp.fqdn, pSyncNode->raftCfg.cfg.nodeInfo[k].nodeFqdn, TSDB_FQDN_LEN);
    break;
  }

  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  int32_t code = syncNodeStopHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNTrace(pSyncNode, "become follower %s", debugStr);

  // send rsp to client
  syncNodeLeaderChangeRsp(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // reset elect timer
  syncNodeResetElectTimer(pSyncNode);

  sInfo("vgId:%d, become follower. %s", pSyncNode->vgId, debugStr);
}
2175

2176
// Switch the node to learner state: reset hbSlowNum, set the learner state and role time, invoke the
// FSM's FpBecomeLearnerCb if present, invalidate minMatchIndex and reset the log buffer.
// debugStr is only used in the trace log. A log buffer reset failure is logged.
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->hbSlowNum = 0;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // trace log
  sNTrace(pSyncNode, "become learner %s", debugStr);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLearnerCb != NULL) {
    pSyncNode->pFsm->FpBecomeLearnerCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  int32_t resetCode = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (resetCode != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(resetCode));
  }
}
2201

2202
// TLA+ Spec
2203
// \* Candidate i transitions to leader.
2204
// BecomeLeader(i) ==
2205
//     /\ state[i] = Candidate
2206
//     /\ votesGranted[i] \in Quorum
2207
//     /\ state'      = [state EXCEPT ![i] = Leader]
2208
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
2209
//                          [j \in Server |-> Len(log[i]) + 1]]
2210
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
2211
//                          [j \in Server |-> 0]]
2212
//     /\ elections'  = elections \cup
2213
//                          {[eterm     |-> currentTerm[i],
2214
//                            eleader   |-> i,
2215
//                            elog      |-> log[i],
2216
//                            evotes    |-> votesGranted[i],
2217
//                            evoterLog |-> voterLog[i]]}
2218
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
2219
//
2220
// Promote this node to leader.
//
// Steps, in order: bump counters, clear restoreFinish, record the new state and
// role time, point the leader cache at this node, reinitialise next/match
// indexes and peer send state, stop any running new-node snapshot receiver,
// stop the election timer, start the heartbeat timer and send a heartbeat,
// notify the FSM, and reset the min match index and the log buffer.
// If a step reports an error it is logged and the function returns at that
// point, leaving the later steps undone.
//
// pSyncNode: node being promoted.
// debugStr:  free-form reason, appended to the final "become leader" log line.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  int32_t code = 0;
  pSyncNode->becomeLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;
  // tstrncpy (already used elsewhere in this file) is used instead of strncpy so
  // that the cached fqdn is NUL-terminated even when the source fills the buffer.
  tstrncpy(pSyncNode->leaderCacheEp.fqdn, pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeFqdn,
           TSDB_FQDN_LEN);
  pSyncNode->leaderCacheEp.port = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodePort;

  // every replica's next index starts right after our own last index
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    SyncIndex lastIndex;
    SyncTerm  lastTerm;
    code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
    if (code != 0) {
      sError("vgId:%d, failed to become leader since %s", pSyncNode->vgId, tstrerror(code));
      return;
    }
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    // just do it!
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

#if 0
  // update sender private term
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
  if (pMySender != NULL) {
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
      if (pSyncNode->senders[i]->privateTerm > pMySender->privateTerm) {
        pMySender->privateTerm = pSyncNode->senders[i]->privateTerm;
      }
    }
    (pMySender->privateTerm) += 100;
  }
#endif

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // start heartbeat timer
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // send heartbeat right now
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNInfo(pSyncNode, "become leader %s", debugStr);
}
2314

2315
// Switch this node into the assigned-leader role.
//
// Performs the leader bookkeeping: next/match index reset, peer state init,
// receiver stop, election timer stop, heartbeat timer start, immediate
// heartbeat, FSM callback and log buffer reset. Unlike a normal promotion,
// restoreFinish is not reset here.
// Each failing step is logged and ends the transition at that point.
void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
  int32_t code = 0;

  pSyncNode->becomeAssignedLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  // pSyncNode->restoreFinish = false;

  // role switch
  pSyncNode->state = TAOS_SYNC_STATE_ASSIGNED_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // this node is now the cached leader
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // next index of every replica := own last index + 1
  for (int32_t idx = 0; idx < pSyncNode->pNextIndex->replicaNum; ++idx) {
    SyncIndex lastIndex;
    SyncTerm  lastTerm;
    code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
    if (code != 0) {
      sError("vgId:%d, failed to become assigned leader since %s", pSyncNode->vgId, tstrerror(code));
      return;
    }
    pSyncNode->pNextIndex->index[idx] = lastIndex + 1;
  }

  // match index of every replica := invalid (overwriting our own slot is harmless)
  for (int32_t idx = 0; idx < pSyncNode->pMatchIndex->replicaNum; ++idx) {
    pSyncNode->pMatchIndex->index[idx] = SYNC_INDEX_INVALID;
  }

  // peer send state
  code = syncNodePeerStateInit(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // a leader does not receive snapshots
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // timers: election off, heartbeat on
  code = syncNodeStopElectTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  code = syncNodeStartHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // announce leadership immediately
  code = syncNodeHeartbeatPeers(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // notify the state machine
  if (pSyncNode->pFsm != NULL) {
    if (pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) {
      pSyncNode->pFsm->FpBecomeAssignedLeaderCb(pSyncNode->pFsm);
    }
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  sNInfo(pSyncNode, "become assigned leader");
}
2393

2394
// Candidate -> leader transition.
//
// Only proceeds when the node is currently a candidate and has been granted
// votes by a majority; otherwise an error is logged and nothing changes.
// After becoming leader a noop entry is appended (failure is logged only).
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
    sError("vgId:%d, failed leader from candidate since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }

  if (!voteGrantedMajority(pSyncNode->pVotesGranted)) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  // first entry of the new term
  int32_t noopRet = syncNodeAppendNoop(pSyncNode);
  if (noopRet < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
  }

  SyncIndex logLast = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, pSyncNode->vgId,
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLast);
}
2418

2419
// Returns true when the node's vgId is 1.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  return pSyncNode->vgId == 1;
}
499,415✔
2420

2421
// Reset the per-peer send bookkeeping (last sent index and send time) for every
// slot of peerStates. Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  const int32_t slotNum = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;

  int32_t slot = 0;
  while (slot < slotNum) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
    ++slot;
  }

  return 0;
}
2429

2430
// Follower -> candidate transition.
//
// Rejected (with an error log) unless the node is currently a follower.
// On success the state and role timestamp are updated and the change is logged.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
    pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
    pSyncNode->roleTimeMs = taosGetTimestampMs();

    SyncIndex logLast = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
          pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLast);

    sNTrace(pSyncNode, "follower to candidate");
  } else {
    sError("vgId:%d, failed candidate from follower since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
  }
}
2443

2444
// Assigned-leader -> leader transition.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the node is not currently an
// assigned leader; otherwise becomes leader, appends a noop entry (a failure is
// logged but does not change the return value) and returns 0.
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) return TSDB_CODE_SYN_INTERNAL_ERROR;
  syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");

  sNTrace(pSyncNode, "assigned leader to leader");

  int32_t ret = syncNodeAppendNoop(pSyncNode);
  if (ret < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  // The ", " before "assigned commit index" keeps the two numeric fields from
  // running together in the log line.
  sInfo("vgId:%d, become leader from assigned leader. term:%" PRId64 ", commit index:%" PRId64
        ", assigned commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, pSyncNode->assignedCommitIndex,
        lastIndex);
  return 0;
}
2462

2463
// just called by syncNodeVoteForSelf
2464
// Record a vote for pRaftId in the raft store for the given term.
//
// Nothing is recorded (an error is logged instead) when `term` differs from the
// stored current term, or when the store reports that a vote has already been
// cast.
//
// pSyncNode: node casting the vote.
// term:      term the vote is for; must equal the stored term.
// pRaftId:   id being voted for.
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
  SyncTerm storeTerm = raftStoreGetTerm(pSyncNode);
  if (term != storeTerm) {
    sError("vgId:%d, failed to vote for term, term:%" PRId64 ", storeTerm:%" PRId64, pSyncNode->vgId, term, storeTerm);
    return;
  }
  sTrace("vgId:%d, begin hasVoted", pSyncNode->vgId);
  bool voted = raftStoreHasVoted(pSyncNode);
  if (voted) {
    // this branch is taken when a vote already exists, so the message says so
    sError("vgId:%d, failed to vote for term since already voted", pSyncNode->vgId);
    return;
  }

  raftStoreVote(pSyncNode, pRaftId);
}
2479

2480
// simulate get vote from outside
2481
// Vote for this node in currentTerm, then feed a locally built, granted
// RequestVoteReply (source and destination both this node) into the
// granted-votes and responded-votes trackers. The reply buffer is freed before
// returning. If the reply cannot be built the function returns right after the
// vote attempt.
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);

  SRpcMsg rpcMsg = {0};
  if (syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId) != 0) {
    return;
  }

  // fill in a reply as if it had arrived from ourselves
  SyncRequestVoteReply* pReply = rpcMsg.pCont;
  pReply->srcId = pSyncNode->myRaftId;
  pReply->destId = pSyncNode->myRaftId;
  pReply->term = currentTerm;
  pReply->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pReply);
  votesRespondAdd(pSyncNode->pVotesRespond, pReply);

  rpcFreeCont(rpcMsg.pCont);
}
2498

2499
// return if has a snapshot
2500
// True when the FSM exposes FpGetSnapshotInfo and the reported lastApplyIndex
// is at least SYNC_INDEX_BEGIN. The callback's return value is ignored.
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return false;
  }

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  // TODO check return value
  (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);

  return snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN;
}
2512

2513
// return max(logLastIndex, snapshotLastIndex)
2514
// if no snapshot and log, return -1
2515
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
20,748✔
2516
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
20,748✔
2517
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
20,748!
2518
    // TODO check return value
2519
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
20,748✔
2520
  }
2521
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
20,748✔
2522

2523
  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
20,748✔
2524
  return lastIndex;
20,748✔
2525
}
2526

2527
// return the last term of snapshot and log
2528
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
2529
// Term that belongs to the node's last index.
//
// Without a snapshot this is the log store's last term. With a snapshot it is
// the log's last term when the log extends past the snapshot, otherwise the
// snapshot's lastApplyTerm.
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  if (!syncNodeHasSnapshot(pSyncNode)) {
    // no snapshot
    return pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
  }

  // has snapshot
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    // TODO check return value
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  }

  SyncIndex logLast = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  if (logLast > snapshot.lastApplyIndex) {
    return pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
  }
  return snapshot.lastApplyTerm;
}
2553

2554
// get last index and term along with snapshot
2555
// Fetch the last index and last term into the two out-parameters.
// Always returns 0.
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  SyncIndex idx = syncNodeGetLastIndex(pSyncNode);
  *pLastIndex = idx;

  SyncTerm term = syncNodeGetLastTerm(pSyncNode);
  *pLastTerm = term;

  return 0;
}
2560

2561
#ifdef BUILD_NO_CALL
2562
// return append-entries first try index
2563
// Index right after the node's last index.
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  return syncNodeGetLastIndex(pSyncNode) + 1;
}
2567

2568
// if index > 0, return index - 1
2569
// else, return -1
2570
// index - 1, clamped from below to SYNC_INDEX_INVALID. pSyncNode is not used.
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex prev = index - 1;
  return (prev < SYNC_INDEX_INVALID) ? SYNC_INDEX_INVALID : prev;
}
2578

2579
// if index < 0, return SYNC_TERM_INVALID
2580
// if index == 0, return 0
2581
// if index > 0, return preTerm
2582
// if error, return SYNC_TERM_INVALID
2583
// Term of the entry at index - 1.
//
// The entry is looked up in the log store's LRU cache first and fetched through
// syncLogGetEntry on a miss. When the entry cannot be fetched, the snapshot's
// lastApplyTerm is used if the snapshot ends exactly at index - 1. Anything
// else is reported and yields SYNC_TERM_INVALID.
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) return SYNC_TERM_INVALID;
  if (index == SYNC_INDEX_BEGIN) return 0;

  SyncIndex       preIndex = index - 1;
  SSyncRaftEntry* pPreEntry = NULL;
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
  int32_t         code = 0;

  if (h == NULL) {
    pSyncNode->pLogStore->cacheMiss++;
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);

    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
  } else {
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);

    pSyncNode->pLogStore->cacheHit++;
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);
  }

  if (code == 0) {
    if (pPreEntry == NULL) return -1;
    SyncTerm preTerm = pPreEntry->term;

    // cached entries are handed back to the cache, fetched ones are destroyed
    if (h != NULL) {
      taosLRUCacheRelease(pCache, h, false);
    } else {
      syncEntryDestroy(pPreEntry);
    }
    return preTerm;
  }

  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};

  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    // TODO check return value
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex == preIndex) {
      return snapshot.lastApplyTerm;
    }
  }

  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
  return SYNC_TERM_INVALID;
}
2643

2644
// get pre index and term of "index"
2645
// Fill the out-parameters with the index and term that precede `index`.
// Always returns 0.
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  SyncIndex prevIndex = syncNodeGetPreIndex(pSyncNode, index);
  *pPreIndex = prevIndex;

  SyncTerm prevTerm = syncNodeGetPreTerm(pSyncNode, index);
  *pPreTerm = prevTerm;

  return 0;
}
2650
#endif
2651

2652
static void syncNodeEqPingTimer(void* param, void* tmrId) {
233,723✔
2653
  if (!syncIsInit()) return;
233,723!
2654

2655
  int64_t    rid = (int64_t)param;
233,723✔
2656
  SSyncNode* pNode = syncNodeAcquire(rid);
233,723✔
2657

2658
  if (pNode == NULL) return;
233,723!
2659

2660
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
233,723!
2661
    SRpcMsg rpcMsg = {0};
233,723✔
2662
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
233,723✔
2663
                                    pNode->pingTimerMS, pNode);
2664
    if (code != 0) {
233,723!
2665
      sError("failed to build ping msg");
×
2666
      rpcFreeCont(rpcMsg.pCont);
×
2667
      goto _out;
×
2668
    }
2669

2670
    // sTrace("enqueue ping msg");
2671
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
233,723✔
2672
    if (code != 0) {
233,723!
2673
      sError("failed to sync enqueue ping msg since %s", terrstr());
×
2674
      rpcFreeCont(rpcMsg.pCont);
×
2675
      goto _out;
×
2676
    }
2677

2678
  _out:
233,723✔
2679
    if (taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
233,723!
2680
                     &pNode->pPingTimer))
2681
      sError("failed to reset ping timer");
×
2682
  }
2683
  syncNodeRelease(pNode);
233,723✔
2684
}
2685

2686
static void syncNodeEqElectTimer(void* param, void* tmrId) {
1,456✔
2687
  if (!syncIsInit()) return;
1,459!
2688

2689
  int64_t    rid = (int64_t)param;
1,456✔
2690
  SSyncNode* pNode = syncNodeAcquire(rid);
1,456✔
2691

2692
  if (pNode == NULL) return;
1,456✔
2693

2694
  if (pNode->syncEqMsg == NULL) {
1,453!
2695
    syncNodeRelease(pNode);
×
2696
    return;
×
2697
  }
2698

2699
  int64_t tsNow = taosGetTimestampMs();
1,453✔
2700
  if (tsNow < pNode->electTimerParam.executeTime) {
1,453!
2701
    syncNodeRelease(pNode);
×
2702
    return;
×
2703
  }
2704

2705
  SRpcMsg rpcMsg = {0};
1,453✔
2706
  int32_t code =
2707
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
1,453✔
2708

2709
  if (code != 0) {
1,453!
2710
    sError("failed to build elect msg");
×
2711
    syncNodeRelease(pNode);
×
2712
    return;
×
2713
  }
2714

2715
  SyncTimeout* pTimeout = rpcMsg.pCont;
1,453✔
2716
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
1,453!
2717

2718
  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
1,453✔
2719
  if (code != 0) {
1,453!
2720
    sError("failed to sync enqueue elect msg since %s", terrstr());
×
2721
    rpcFreeCont(rpcMsg.pCont);
×
2722
    syncNodeRelease(pNode);
×
2723
    return;
×
2724
  }
2725

2726
  syncNodeRelease(pNode);
1,453✔
2727
}
2728

2729
#ifdef BUILD_NO_CALL
2730
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
2731
  if (!syncIsInit()) return;
2732

2733
  int64_t    rid = (int64_t)param;
2734
  SSyncNode* pNode = syncNodeAcquire(rid);
2735

2736
  if (pNode == NULL) return;
2737

2738
  if (pNode->totalReplicaNum > 1) {
2739
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
2740
      SRpcMsg rpcMsg = {0};
2741
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
2742
                                      pNode->heartbeatTimerMS, pNode);
2743

2744
      if (code != 0) {
2745
        sError("failed to build heartbeat msg");
2746
        goto _out;
2747
      }
2748

2749
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
2750
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
2751
      if (code != 0) {
2752
        sError("failed to enqueue heartbeat msg since %s", terrstr());
2753
        rpcFreeCont(rpcMsg.pCont);
2754
        goto _out;
2755
      }
2756

2757
    _out:
2758
      if (taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
2759
                       &pNode->pHeartbeatTimer) != 0)
2760
        return;
2761

2762
    } else {
2763
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
2764
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2765
    }
2766
  }
2767
}
2768
#endif
2769

2770
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
44,592✔
2771
  int32_t code = 0;
44,592✔
2772
  int64_t hbDataRid = (int64_t)param;
44,592✔
2773
  int64_t tsNow = taosGetTimestampMs();
44,592✔
2774

2775
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
44,592✔
2776
  if (pData == NULL) {
44,592!
2777
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
×
2778
    return;
×
2779
  }
2780

2781
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
44,592✔
2782
  if (pSyncNode == NULL) {
44,592✔
2783
    syncHbTimerDataRelease(pData);
3✔
2784
    sError("hb timer get pSyncNode NULL");
3!
2785
    return;
3✔
2786
  }
2787

2788
  SSyncTimer* pSyncTimer = pData->pTimer;
44,589✔
2789

2790
  if (!pSyncNode->isStart) {
44,589!
2791
    syncNodeRelease(pSyncNode);
×
2792
    syncHbTimerDataRelease(pData);
×
2793
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
×
2794
    return;
×
2795
  }
2796

2797
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
44,589!
2798
    syncNodeRelease(pSyncNode);
×
2799
    syncHbTimerDataRelease(pData);
×
2800
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
×
2801
    return;
×
2802
  }
2803

2804
  sTrace("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:0x%" PRIx64, pSyncNode->vgId, hbDataRid,
44,589!
2805
         pData->destId.addr);
2806

2807
  if (pSyncNode->totalReplicaNum > 1) {
44,589✔
2808
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
44,587✔
2809
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);
44,587✔
2810

2811
    if (timerLogicClock == msgLogicClock) {
44,587✔
2812
      if (tsNow > pData->execTime) {
44,584✔
2813
        pData->execTime += pSyncTimer->timerMS;
44,519✔
2814

2815
        SRpcMsg rpcMsg = {0};
44,519✔
2816
        if ((code = syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId)) != 0) {
44,519!
2817
          sError("vgId:%d, failed to build heartbeat msg since %s", pSyncNode->vgId, tstrerror(code));
×
2818
          syncNodeRelease(pSyncNode);
×
2819
          syncHbTimerDataRelease(pData);
×
2820
          return;
×
2821
        }
2822

2823
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
44,519✔
2824

2825
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
44,519✔
2826
        pSyncMsg->srcId = pSyncNode->myRaftId;
44,519✔
2827
        pSyncMsg->destId = pData->destId;
44,519✔
2828
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
44,519✔
2829
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
44,519✔
2830
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
44,519✔
2831
        pSyncMsg->privateTerm = 0;
44,519✔
2832
        pSyncMsg->timeStamp = tsNow;
44,519✔
2833

2834
        // update reset time
2835
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
44,519✔
2836
        pSyncTimer->timeStamp = tsNow;
44,519✔
2837

2838
        // send msg
2839
        TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
44,519✔
2840
        TRACE_SET_ROOTID(&(rpcMsg.info.traceId), tGenIdPI64());
44,519✔
2841
        sGTrace(&rpcMsg.info.traceId, "vgId:%d, send sync-heartbeat to dnode:%d", pSyncNode->vgId,
44,519!
2842
                DID(&(pSyncMsg->destId)));
2843
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
44,519✔
2844
        int ret = syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
44,519✔
2845
        if (ret != 0) {
44,519✔
2846
          sError("vgId:%d, failed to send heartbeat since %s", pSyncNode->vgId, tstrerror(ret));
36!
2847
        }
2848
      }
2849

2850
      if (syncIsInit()) {
44,584!
2851
        sTrace("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
44,584!
2852
        bool stopped = taosTmrResetPriority(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid,
44,584✔
2853
                                            syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
44,584✔
2854
        if (stopped) sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
44,584!
2855

2856
      } else {
2857
        sError("sync env is stop, reset peer hb timer error");
×
2858
      }
2859

2860
    } else {
2861
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64, pSyncNode->vgId,
3!
2862
             timerLogicClock, msgLogicClock);
2863
    }
2864
  }
2865

2866
  syncHbTimerDataRelease(pData);
44,589✔
2867
  syncNodeRelease(pSyncNode);
44,589✔
2868
}
2869

2870
#ifdef BUILD_NO_CALL
2871
// Cache deleter callback: frees the cached value. key, keyLen and ud are not used.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) {
  (void)key;
  (void)keyLen;
  (void)ud;
  taosMemoryFree(value);
}
2875

2876
// Insert pEntry into the log store's LRU cache, keyed by its index, with low
// priority and deleteCacheEntry as the deleter. The resulting handle is
// returned through h. Returns 0 when the insert reports TAOS_LRU_STATUS_OK,
// otherwise -1.
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pData = pLogStore->data;
  sNTrace(pData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  // charge the cache with header plus payload
  int32_t   entryLen = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, entryLen,
                                        deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW, NULL);

  return (status == TAOS_LRU_STATUS_OK) ? 0 : -1;
}
2890
#endif
2891

2892
// Fill cfg from an alter-replica request.
//
// Voter replicas occupy nodeInfo[0 .. replica-1]; learner replicas follow them.
// Each node's dnode info is refreshed through tmsgUpdateDnodeInfo and logged.
// myIndex is taken from selfIndex, and then overridden by
// replica + learnerSelfIndex when learnerSelfIndex is not -1.
void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) {  // TODO SAlterVnodeReplicaReq name is proper?
  cfg->replicaNum = 0;
  cfg->totalReplicaNum = 0;

  // voters
  for (int i = 0; i < pReq->replica; ++i) {
    SNodeInfo* pNode = &cfg->nodeInfo[i];
    pNode->nodeId = pReq->replicas[i].id;
    pNode->nodePort = pReq->replicas[i].port;
    tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
    cfg->replicaNum++;
  }

  if (pReq->selfIndex != -1) {
    cfg->myIndex = pReq->selfIndex;
  }

  // learners: slot runs over nodeInfo, learnerIdx over the request's learner array
  int32_t learnerIdx = 0;
  for (int slot = cfg->replicaNum; slot < pReq->replica + pReq->learnerReplica; ++slot, ++learnerIdx) {
    SNodeInfo* pNode = &cfg->nodeInfo[slot];
    pNode->nodeId = pReq->learnerReplicas[learnerIdx].id;
    pNode->nodePort = pReq->learnerReplicas[learnerIdx].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[learnerIdx].fqdn, sizeof(pNode->nodeFqdn));
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, slot, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
  }

  // total = learners placed above + voters from the request
  cfg->totalReplicaNum = learnerIdx + pReq->replica;

  if (pReq->learnerSelfIndex != -1) {
    cfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
  }
  cfg->changeVersion = pReq->changeVersion;
}
×
2928

2929
// Inspect a config-change entry and step down if this node, while leading, is
// no longer part of the new configuration.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the entry is not a config change,
// TSDB_CODE_INVALID_MSG (via TAOS_RETURN) when the request cannot be
// deserialized, 1 when the node stepped down, and 0 otherwise.
int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // the request body follows the message head inside the entry payload
  SMsgHead* head = (SMsgHead*)pEntry->data;
  void*     pReq = POINTER_SHIFT(head, sizeof(SMsgHead));

  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
    int32_t code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg cfg = {0};
  syncBuildConfigFromReq(&req, &cfg);

  // only a (assigned) leader facing a non-empty config needs the membership check
  if (cfg.totalReplicaNum < 1) {
    return 0;
  }
  if (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    return 0;
  }

  // membership is decided by matching fqdn and port
  for (int32_t k = 0; k < cfg.totalReplicaNum; ++k) {
    if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[k].nodeFqdn) == 0 &&
        ths->myNodeInfo.nodePort == cfg.nodeInfo[k].nodePort) {
      return 0;
    }
  }

  // not in the new config: give up leadership
  SyncTerm currentTerm = raftStoreGetTerm(ths);
  SRaftId  id = EMPTY_RAFT_ID;
  syncNodeStepDown(ths, currentTerm, id);
  return 1;
}
2967

2968
// Dump the node's current membership view to the info log.
//
// ths: sync node whose state is logged.
// cfg: not read by this function; kept for interface compatibility.
// str: caller-supplied tag embedded in every log line (e.g. "before config change").
void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg* cfg, char* str) {
  sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64
        ", changeVersion:%d, "
        "restoreFinish:%d",
        ths->vgId, str, ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion,
        ths->restoreFinish);

  sInfo("vgId:%d, %s, myNodeInfo, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
        ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn, ths->myNodeInfo.nodePort,
        ths->myNodeInfo.nodeRole);

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
          i, ths->peersNodeInfo[i].clusterId, ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn,
          ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole);
  }

  // One line per peer with that peer's own endpoint set. The previous version
  // shadowed the loop index and always dereferenced the first array element,
  // so every peer was reported with peer 0's endpoints and inUse value.
  for (int32_t i = 0; i < ths->peersNum; ++i) {
    char    buf[256];
    int32_t len = (int32_t)sizeof(buf);
    int32_t n = 0;
    n += tsnprintf(buf + n, len - n, "%s", "{");
    for (int32_t j = 0; j < ths->peersEpset[i].numOfEps; j++) {
      n += tsnprintf(buf + n, len - n, "%s:%d%s", ths->peersEpset[i].eps[j].fqdn, ths->peersEpset[i].eps[j].port,
                     (j + 1 < ths->peersEpset[i].numOfEps ? ", " : ""));
    }
    n += tsnprintf(buf + n, len - n, "%s", "}");

    sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d", ths->vgId, str, i, buf, ths->peersEpset[i].inUse);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->peersId[i].addr);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, nodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str, i,
          ths->raftCfg.cfg.nodeInfo[i].clusterId, ths->raftCfg.cfg.nodeInfo[i].nodeId,
          ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, ths->raftCfg.cfg.nodeInfo[i].nodePort,
          ths->raftCfg.cfg.nodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, replicasId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->replicasId[i].addr);
  }
}
×
3014

3015
// Rebuild the peer tables and the persisted raft config of `ths` from `cfg`.
//
// Every member of `cfg` other than this node (matched by fqdn and port) becomes
// a peer; every member, including this node, is copied into ths->raftCfg.cfg.
// Returns 0 on success, or terrno when a peer's raft id cannot be derived (in
// which case ths->peersNum and ths->raftCfg.cfg are left untouched).
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg* cfg) {
  // change peersNodeInfo
  int32_t peerCnt = 0;
  for (int32_t src = 0; src < cfg->totalReplicaNum; ++src) {
    bool isSelf = (strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[src].nodeFqdn) == 0 &&
                   ths->myNodeInfo.nodePort == cfg->nodeInfo[src].nodePort);
    if (isSelf) {
      continue;
    }

    SNodeInfo* pPeer = &ths->peersNodeInfo[peerCnt];
    pPeer->nodeRole = cfg->nodeInfo[src].nodeRole;
    pPeer->clusterId = cfg->nodeInfo[src].clusterId;
    tstrncpy(pPeer->nodeFqdn, cfg->nodeInfo[src].nodeFqdn, TSDB_FQDN_LEN);
    pPeer->nodeId = cfg->nodeInfo[src].nodeId;
    pPeer->nodePort = cfg->nodeInfo[src].nodePort;

    syncUtilNodeInfo2EpSet(pPeer, &ths->peersEpset[peerCnt]);

    if (!syncUtilNodeInfo2RaftId(pPeer, ths->vgId, &ths->peersId[peerCnt])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", ths->vgId, peerCnt);
      return terrno;
    }

    ++peerCnt;
  }
  ths->peersNum = peerCnt;

  // change cfg nodeInfo
  ths->raftCfg.cfg.replicaNum = 0;
  int32_t memberCnt = 0;
  for (int32_t src = 0; src < cfg->totalReplicaNum; ++src, ++memberCnt) {
    SNodeInfo* pDst = &ths->raftCfg.cfg.nodeInfo[memberCnt];

    // only voters count towards replicaNum
    if (cfg->nodeInfo[src].nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->raftCfg.cfg.replicaNum++;
    }

    pDst->nodeRole = cfg->nodeInfo[src].nodeRole;
    pDst->clusterId = cfg->nodeInfo[src].clusterId;
    tstrncpy(pDst->nodeFqdn, cfg->nodeInfo[src].nodeFqdn, TSDB_FQDN_LEN);
    pDst->nodeId = cfg->nodeInfo[src].nodeId;
    pDst->nodePort = cfg->nodeInfo[src].nodePort;

    // remember where this node sits in the rebuilt member list
    if (ths->myNodeInfo.nodePort == cfg->nodeInfo[src].nodePort &&
        strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[src].nodeFqdn) == 0) {
      ths->raftCfg.cfg.myIndex = memberCnt;
    }
  }
  ths->raftCfg.cfg.totalReplicaNum = memberCnt;

  return 0;
}
3063

3064
// Promote to voter every known peer / config member that `cfg` lists as a
// voter (matched by fqdn and port), and recount ths->raftCfg.cfg.replicaNum
// from those matches. Roles are only ever raised to voter here, never lowered.
void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg* cfg) {
  // change peersNodeInfo
  for (int32_t p = 0; p < ths->peersNum; ++p) {
    SNodeInfo* pPeer = &ths->peersNodeInfo[p];
    for (int32_t c = 0; c < cfg->totalReplicaNum; ++c) {
      if (cfg->nodeInfo[c].nodeRole == TAOS_SYNC_ROLE_VOTER && pPeer->nodePort == cfg->nodeInfo[c].nodePort &&
          strcmp(pPeer->nodeFqdn, cfg->nodeInfo[c].nodeFqdn) == 0) {
        pPeer->nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
    }
  }

  // change cfg nodeInfo
  ths->raftCfg.cfg.replicaNum = 0;
  for (int32_t m = 0; m < ths->raftCfg.cfg.totalReplicaNum; ++m) {
    SNodeInfo* pMember = &ths->raftCfg.cfg.nodeInfo[m];
    for (int32_t c = 0; c < cfg->totalReplicaNum; ++c) {
      if (cfg->nodeInfo[c].nodeRole == TAOS_SYNC_ROLE_VOTER && pMember->nodePort == cfg->nodeInfo[c].nodePort &&
          strcmp(pMember->nodeFqdn, cfg->nodeInfo[c].nodeFqdn) == 0) {
        pMember->nodeRole = TAOS_SYNC_ROLE_VOTER;
        ths->raftCfg.cfg.replicaNum++;
      }
    }
  }
}
×
3091

3092
// Rebuild all per-replica runtime state of `ths` after ths->raftCfg.cfg has
// been changed, carrying over the state of replicas that exist both before and
// after the change (matched by raft id).
//
// oldtotalReplicaNum: number of replicas before the change; bounds the old
//                     entries that are logged and copied.
// Returns 0 on success, otherwise an error code (terrno, or the code reported
// by the failing helper). State rebuilt before the failure is not rolled back.
int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum) {
  int32_t code = 0;
  // 1.rebuild replicasId, remove deleted one
  SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
  memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId));

  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]) == false) return terrno;
  }

  // 2.rebuild MatchIndex, remove deleted one
  SSyncIndexMgr* oldIndex = ths->pMatchIndex;

  ths->pMatchIndex = syncIndexMgrCreate(ths);
  if (ths->pMatchIndex == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  syncIndexMgrCopyIfExist(ths->pMatchIndex, oldIndex, oldReplicasId);

  syncIndexMgrDestroy(oldIndex);

  // 3.rebuild NextIndex, remove deleted one
  SSyncIndexMgr* oldNextIndex = ths->pNextIndex;

  ths->pNextIndex = syncIndexMgrCreate(ths);
  if (ths->pNextIndex == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  syncIndexMgrCopyIfExist(ths->pNextIndex, oldNextIndex, oldReplicasId);

  syncIndexMgrDestroy(oldNextIndex);

  // 4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // 5.rebuild logReplMgr
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // snapshot the old managers by value, because they are destroyed and
  // re-created below
  SSyncLogReplMgr* oldLogReplMgrs = NULL;
  int64_t          length = sizeof(SSyncLogReplMgr) * (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA);
  oldLogReplMgrs = taosMemoryMalloc(length);
  if (NULL == oldLogReplMgrs) return terrno;
  memset(oldLogReplMgrs, 0, length);

  for (int i = 0; i < oldtotalReplicaNum; i++) {
    oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
  }

  syncNodeLogReplDestroy(ths);
  if ((code = syncNodeLogReplInit(ths)) != 0) {
    taosMemoryFree(oldLogReplMgrs);
    TAOS_RETURN(code);
  }

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        *(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
        ths->logReplMgrs[i]->peerId = i;
      }
    }
  }

  // The snapshot is not read again after the copy above. Releasing it here
  // means no later error path has to free it; previously a failure in
  // snapshotSenderCreate() returned without freeing it.
  taosMemoryFree(oldLogReplMgrs);
  oldLogReplMgrs = NULL;

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // 6.rebuild sender
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if (ths->senders[i] != NULL) {
      sDebug("vgId:%d, snapshot sender destroy while close, data:%p", ths->vgId, ths->senders[i]);

      if (snapshotSenderIsStart(ths->senders[i])) {
        snapshotSenderStop(ths->senders[i], false);
      }

      snapshotSenderDestroy(ths->senders[i]);
      ths->senders[i] = NULL;
    }
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    int32_t              senderCode = snapshotSenderCreate(ths, i, &pSender);
    if (pSender == NULL) return terrno = senderCode;

    ths->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  // 7.rebuild synctimer
  if ((code = syncNodeStopHeartbeatTimer(ths)) != 0) {
    TAOS_RETURN(code);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(ths, &ths->peerHeartbeatTimerArr[i], ths->replicasId[i])) != 0) {
      TAOS_RETURN(code);
    }
  }

  if ((code = syncNodeStartHeartbeatTimer(ths)) != 0) {
    TAOS_RETURN(code);
  }

  // 8.rebuild peerStates
  SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
  for (int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) {
    oldState[i] = ths->peerStates[i];
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        ths->peerStates[i] = oldState[j];
      }
    }
  }

  return 0;
}
3243

3244
// Refresh the voter count held in the node and in both index managers from
// ths->raftCfg.cfg.replicaNum, then rebuild the vote bookkeeping. Replica ids
// and log replication managers are left as they are.
void syncNodeChangeToVoter(SSyncNode* ths) {
  // replicasId, only need to change replicaNum when 1->3
  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum);
  int32_t r = 0;
  while (r < ths->totalReplicaNum) {
    sDebug("vgId:%d, i:%d, replicaId.addr:0x%" PRIx64, ths->vgId, r, ths->replicasId[r].addr);
    ++r;
  }

  // pMatchIndex, pNextIndex, only need to change replicaNum when 1->3
  SSyncIndexMgr* pMatch = ths->pMatchIndex;
  SSyncIndexMgr* pNext = ths->pNextIndex;
  pMatch->replicaNum = ths->raftCfg.cfg.replicaNum;
  pNext->replicaNum = ths->raftCfg.cfg.replicaNum;

  sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, pMatch->totalReplicaNum);
  int32_t m = 0;
  while (m < pMatch->totalReplicaNum) {
    sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, m, pMatch->index[m]);
    ++m;
  }

  // pVotesGranted, pVotesRespond
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // logRepMgrs
  // no need to change logRepMgrs when 1->3
}
×
3268

3269
// Overwrite the first ths->peersNum peer records and the first
// ths->raftCfg.cfg.totalReplicaNum config member records with a
// zero-initialized SNodeInfo. The counters themselves are not modified.
void syncNodeResetPeerAndCfg(SSyncNode* ths) {
  SNodeInfo blank = {0};

  int32_t peerIdx = 0;
  while (peerIdx < ths->peersNum) {
    memcpy(&ths->peersNodeInfo[peerIdx], &blank, sizeof(SNodeInfo));
    ++peerIdx;
  }

  int32_t memberIdx = 0;
  while (memberIdx < ths->raftCfg.cfg.totalReplicaNum) {
    memcpy(&ths->raftCfg.cfg.nodeInfo[memberIdx], &blank, sizeof(SNodeInfo));
    ++memberIdx;
  }
}
×
3279

3280
// Apply a config-change raft entry to this node.
//
// pEntry: entry whose payload is a message head followed by a serialized
//         alter-replica request.
// str:    caller tag; the value "Commit" selects a different log layout.
//
// Entries whose changeVersion is not newer than the current one are skipped
// (return 0). Otherwise one of three paths runs: a target config of 1 or 2
// members (replica removal), a node that currently has 3 replicas (replica
// type change), or anything else (replica addition). Afterwards quorum and
// config indexes are refreshed and the config file is rewritten.
// Returns 0 on success or an error code from the failing step.
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) {
  int32_t code = 0;
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SMsgHead* pHead = (SMsgHead*)pEntry->data;
  void*     pBody = POINTER_SHIFT(pHead, sizeof(SMsgHead));

  SAlterVnodeTypeReq alterReq = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pBody, pHead->contLen, &alterReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg newCfg = {0};
  syncBuildConfigFromReq(&alterReq, &newCfg);

  if (newCfg.changeVersion <= ths->raftCfg.cfg.changeVersion) {
    sInfo(
        "vgId:%d, skip conf change entry since lower version. "
        "this entry, index:%" PRId64 ", term:%" PRId64
        ", totalReplicaNum:%d, changeVersion:%d; "
        "current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d",
        ths->vgId, pEntry->index, pEntry->term, newCfg.totalReplicaNum, newCfg.changeVersion, ths->replicaNum,
        ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
    return 0;
  }

  bool fromCommit = (strcmp(str, "Commit") == 0);
  if (fromCommit) {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
        ths->vgId, str, pEntry->index - 1, newCfg.totalReplicaNum, newCfg.changeVersion, ths->replicaNum,
        ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
        ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index, pEntry->term,
        TMSG_INFO(pEntry->originalRpcType));
  } else {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64 ", t:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
        ths->vgId, str, pEntry->index, pEntry->term, newCfg.totalReplicaNum, newCfg.changeVersion, ths->replicaNum,
        ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
        ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index - 1, ths->commitIndex,
        ths->pLogBuf->commitIndex);
  }

  syncNodeLogConfigInfo(ths, &newCfg, "before config change");

  int32_t prevTotalReplicaNum = ths->totalReplicaNum;

  if (newCfg.totalReplicaNum == 1 || newCfg.totalReplicaNum == 2) {
    // remove replica: first find out whether this node survives the change
    bool selfKept = false;
    for (int32_t k = 0; k < newCfg.totalReplicaNum && !selfKept; ++k) {
      selfKept = (strcmp(ths->myNodeInfo.nodeFqdn, newCfg.nodeInfo[k].nodeFqdn) == 0 &&
                  ths->myNodeInfo.nodePort == newCfg.nodeInfo[k].nodePort);
    }

    // both outcomes start from cleared peer and config member tables
    syncNodeResetPeerAndCfg(ths);

    if (selfKept) {
      // some other replica is removed; myNodeInfo stays as it is
      code = syncNodeRebuildPeerAndCfg(ths, &newCfg);
      if (code != 0) {
        TAOS_RETURN(code);
      }

      code = syncNodeRebuildAndCopyIfExist(ths, prevTotalReplicaNum);
      if (code != 0) {
        TAOS_RETURN(code);
      }
    } else {
      // this node is removed: shrink to a single-member learner config
      // containing only itself
      ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER;

      ths->peersNum = 0;
      memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo));
      ths->raftCfg.cfg.replicaNum = 0;
      ths->raftCfg.cfg.totalReplicaNum = 1;

      code = syncNodeRebuildAndCopyIfExist(ths, prevTotalReplicaNum);
      if (code != 0) {
        TAOS_RETURN(code);
      }

      ths->state = TAOS_SYNC_STATE_LEARNER;
    }

    ths->restoreFinish = false;
  } else if (ths->totalReplicaNum == 3) {
    // change replica type
    sInfo("vgId:%d, begin change replica type", ths->vgId);

    // promote this node when the new config lists it as a voter
    for (int32_t k = 0; k < newCfg.totalReplicaNum; ++k) {
      if (newCfg.nodeInfo[k].nodeRole == TAOS_SYNC_ROLE_VOTER &&
          ths->myNodeInfo.nodePort == newCfg.nodeInfo[k].nodePort &&
          strcmp(ths->myNodeInfo.nodeFqdn, newCfg.nodeInfo[k].nodeFqdn) == 0) {
        ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
    }

    // peers and config members first, then the derived counters
    syncNodeChangePeerAndCfgToVoter(ths, &newCfg);
    syncNodeChangeToVoter(ths);

    // a learner that has just become a voter starts acting as a follower
    if (ths->state == TAOS_SYNC_STATE_LEARNER && ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->state = TAOS_SYNC_STATE_FOLLOWER;
    }

    ths->restoreFinish = false;
  } else {
    // add replica; myNodeInfo and state are not touched
    sInfo("vgId:%d, begin add replica", ths->vgId);

    code = syncNodeRebuildPeerAndCfg(ths, &newCfg);
    if (code != 0) {
      TAOS_RETURN(code);
    }

    code = syncNodeRebuildAndCopyIfExist(ths, prevTotalReplicaNum);
    if (code != 0) {
      TAOS_RETURN(code);
    }

    if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      ths->restoreFinish = false;
    }
  }

  ths->quorum = syncUtilQuorum(ths->replicaNum);

  ths->raftCfg.lastConfigIndex = pEntry->index;
  ths->raftCfg.cfg.lastIndex = pEntry->index;
  ths->raftCfg.cfg.changeVersion = newCfg.changeVersion;

  syncNodeLogConfigInfo(ths, &newCfg, "after config change");

  code = syncWriteCfgFile(ths);
  if (code != 0) {
    sError("vgId:%d, failed to create sync cfg file", ths->vgId);
  }

  TAOS_RETURN(code);
}
3453

3454
// Append a raft entry to the log buffer and advance replication/commit state.
//
// ths:    sync node.
// pEntry: entry to append. It is destroyed here when it is too short to hold a
//         message head or when the log buffer rejects it.
// pMsg:   originating rpc message; only logged and forwarded, may be NULL.
//
// Returns 0 on success. A validation or append failure is reported to the
// caller on both the multi-replica and the single-replica path; on the
// single-replica path the result of the local commit is returned only when
// everything before it succeeded.
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (pEntry->dataLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, msg:%p, cannot append an invalid client request with no msg head, type:%s dataLen:%d", ths->vgId,
           pMsg, TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    goto _out;
  }

  // append to log buffer
  if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) {
    sError("vgId:%d, index:%" PRId64 ", failed to enqueue sync log buffer", ths->vgId, pEntry->index);
    // hand the failure (terrno) to the state machine before dropping the entry
    int32_t ret = 0;
    if ((ret = syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false)) != 0) {
      sError("vgId:%d, index:%" PRId64 ", failed to execute fsm since %s", ths->vgId, pEntry->index, tstrerror(ret));
    }
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    goto _out;
  }

  code = 0;
_out:;
  // proceed match index, with replicating on needed
  SyncIndex       matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append", pMsg);
  const STraceId* trace = pEntry ? &pEntry->originRpcTraceId : NULL;

  if (pEntry != NULL) {
    sGDebug(trace,
            "vgId:%d, index:%" PRId64 ", raft entry appended, msg:%p term:%" PRId64 " buf:[%" PRId64 " %" PRId64
            " %" PRId64 ", %" PRId64 ")",
            ths->vgId, pEntry->index, pMsg, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
            ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
  }

  if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    int64_t index = syncNodeUpdateAssignedCommitIndex(ths, matchIndex);
    sGTrace(trace, "vgId:%d, index:%" PRId64 ", update assigned commit, msg:%p", ths->vgId, index, pMsg);

    if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
        syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex, trace, "append-entry") < 0) {
      sGError(trace, "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64, ths->vgId, index,
              pMsg, ths->commitIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // multi replica
  if (ths->replicaNum > 1) {
    TAOS_RETURN(code);
  }

  // single replica
  SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, matchIndex);
  sGTrace(trace, "vgId:%d, index:%" PRId64 ", raft entry update commit, msg:%p return index:%" PRId64, ths->vgId,
          matchIndex, pMsg, returnIndex);

  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    int32_t commitCode = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, trace, "append-entry");
    if (commitCode < 0) {
      sGError(trace,
              "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64 " return index:%" PRId64,
              ths->vgId, matchIndex, pMsg, ths->commitIndex, returnIndex);
    }
    // Keep an earlier failure: assigning the commit result unconditionally
    // would turn a rejected entry into a successful return whenever the
    // commit of older entries succeeds.
    if (code == 0) {
      code = commitCode;
    }
  }

  TAOS_RETURN(code);
}
3522

3523
// Report whether at least `quorum` non-learner peers have not answered a
// heartbeat for longer than tsHeartbeatTimeout milliseconds.
//
// A node with exactly one replica never times out. Peers whose recorded
// receive time is 0 or -1 are not counted.
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->totalReplicaNum == 1) {
    return false;
  }

  int32_t silentPeers = 0;
  int64_t nowMs = taosGetTimestampMs();
  for (int32_t p = 0; p < pSyncNode->peersNum; ++p) {
    if (pSyncNode->peersNodeInfo[p].nodeRole != TAOS_SYNC_ROLE_LEARNER) {
      int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[p]));
      bool    known = !(lastRecv == 0 || lastRecv == -1);
      if (known && nowMs - lastRecv > tsHeartbeatTimeout) {
        silentPeers++;
      }
    }
  }

  return silentPeers >= pSyncNode->quorum;
}
3548

3549
// True when any of the first totalReplicaNum snapshot senders exists and has
// its `start` flag set. A NULL node yields false.
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return false;

  for (int32_t idx = 0; idx < pSyncNode->totalReplicaNum; ++idx) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[idx];
    if (pSender != NULL && pSender->start) {
      return true;
    }
  }
  return false;
}
3560

3561
// True when the node has a snapshot receiver whose `start` flag is set.
// A NULL node or a missing receiver yields false.
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL || pSyncNode->pNewNodeReceiver == NULL) {
    return false;
  }
  return pSyncNode->pNewNodeReceiver->start ? true : false;
}
3567

3568
// Build a no-op entry at the log buffer's end index with the current term and
// append it. Returns TSDB_CODE_OUT_OF_MEMORY when the entry cannot be built,
// otherwise the result of syncNodeAppend(); the value is also stored in terrno
// by TAOS_RETURN.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex endIndex = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  curTerm = raftStoreGetTerm(ths);

  SSyncRaftEntry* pNoop = syncEntryBuildNoop(curTerm, endIndex, ths->vgId);

  int32_t code = 0;
  if (pNoop != NULL) {
    code = syncNodeAppend(ths, pNoop, NULL);
  } else {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }
  TAOS_RETURN(code);
}
3582

3583
#ifdef BUILD_NO_CALL
3584
// Legacy no-op append, compiled only under BUILD_NO_CALL.
//
// Builds a no-op entry at the log store's write index. On a leader the entry
// is written to the log store and cached; the cache handle, when one was
// produced, is released afterwards, otherwise the entry is destroyed.
// Returns 0 on success and -1 when the entry cannot be built or written.
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  int32_t ret = 0;

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) return -1;

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
    if (code != 0) {
      sError("append noop error");
      // the entry was never cached, so it is still owned here; release it
      // instead of leaking it on this early return
      syncEntryDestroy(pEntry);
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return ret;
}
3612
#endif
3613

3614
// Handle a heartbeat received from another raft member.
//
// Records the receive time for the sender, drops heartbeats from nodes outside
// this raft group, lets a learner adopt a higher term, and then:
//  - as a non-leader at the same term: remembers the sender's minMatchIndex
//    and (as follower or learner) enqueues a local commit command;
//  - as a (assigned) leader seeing a term >= its own: enqueues a local
//    step-down command;
//  - always: sends a heartbeat reply back to the sender.
// The election timer is reset at the end only when the heartbeat matched the
// current term on a non-leader and the reply was sent successfully.
// Returns 0 on success or the error of the failing build/send step.
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
  bool           resetElect = false;

  int64_t tsMs = taosGetTimestampMs();

  // read the previous receive time before overwriting it with the current one
  int64_t lastRecvTime = syncIndexMgrGetRecvTime(ths->pNextIndex, &(pMsg->srcId));
  syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);

  int64_t netElapsed = tsMs - pMsg->timeStamp;
  int64_t timeDiff = tsMs - lastRecvTime;
  syncLogRecvHeartbeat(ths, pMsg, netElapsed, &pRpcMsg->info.traceId, timeDiff);

  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
    sWarn(
        "vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
        "cluster:%d",
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
    return 0;
  }

  SyncTerm currentTerm = raftStoreGetTerm(ths);

  if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
    raftStoreSetTerm(ths, pMsg->term);
    currentTerm = pMsg->term;
  }

  int64_t tsMs2 = taosGetTimestampMs();

  int64_t processTime = tsMs2 - tsMs;
  if (processTime > SYNC_HEARTBEAT_SLOW_MS) {
    sGError(&pRpcMsg->info.traceId,
            "vgId:%d, process sync-heartbeat msg from dnode:%d, commit-index:%" PRId64 ", cluster:%d msgTerm:%" PRId64
            " currentTerm:%" PRId64 ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsg->srcId)), pMsg->commitIndex, CID(&(pMsg->srcId)), pMsg->term, currentTerm,
            processTime);
  } else {
    sGDebug(&pRpcMsg->info.traceId,
            "vgId:%d, process sync-heartbeat msg from dnode:%d, commit-index:%" PRId64 ", cluster:%d msgTerm:%" PRId64
            " currentTerm:%" PRId64 ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsg->srcId)), pMsg->commitIndex, CID(&(pMsg->srcId)), pMsg->term, currentTerm,
            processTime);
  }

  if (pMsg->term == currentTerm &&
      (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    resetElect = true;

    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
      rpcMsgLocalCmd.info.traceId = pRpcMsg->info.traceId;

      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd =
          (ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT;
      pSyncMsg->commitIndex = pMsg->commitIndex;
      pSyncMsg->currentTerm = pMsg->term;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, failed to enqueue commit msg from heartbeat since %s, code:%d", ths->vgId, terrstr(), code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sGTrace(&pRpcMsg->info.traceId,
                  "vgId:%d, enqueue commit msg from heartbeat, commit-index:%" PRId64 ", term:%" PRId64, ths->vgId,
                  pMsg->commitIndex, pMsg->term);
        }
      } else {
        // nothing can take the command without an enqueue callback; free it
        // here, as the failed-enqueue path does, so it is not leaked
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  if (pMsg->term >= currentTerm &&
      (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    SRpcMsg rpcMsgLocalCmd = {0};
    TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
    rpcMsgLocalCmd.info.traceId = pRpcMsg->info.traceId;

    SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->currentTerm = pMsg->term;
    pSyncMsg->commitIndex = pMsg->commitIndex;

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, pMsg->term);
      }
    } else {
      // same as above: an un-enqueued command must be released here
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    }
  }

  SRpcMsg rpcMsg = {0};
  TAOS_CHECK_RETURN(syncBuildHeartbeatReply(&rpcMsg, ths->vgId));
  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;
  rpcMsg.info.traceId = pRpcMsg->info.traceId;

  // reply
  int64_t tsMs3 = taosGetTimestampMs();

  int64_t processTime2 = tsMs3 - tsMs2;
  TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
  if (processTime2 > SYNC_HEARTBEAT_SLOW_MS) {
    sGError(&rpcMsg.info.traceId,
            "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
            ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
  } else {
    sGDebug(&rpcMsg.info.traceId,
            "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
            ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
  }

  TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg));

  if (resetElect) syncNodeResetElectTimer(ths);
  return 0;
}
3745

3746
// Handle a heartbeat reply from a peer.
//
// Looks up the log replication manager for the replying peer, logs the reply
// together with the elapsed times, records the receive time in pMatchIndex and
// hands the reply to the replication manager.
// Returns the replication manager's result, or an error code (terrno when set,
// otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL) when no manager exists for the
// sender.
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  int32_t code = 0;

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // "%016" is the zero-padded width; the old "0x016%" printed a literal
    // "016" in front of an unpadded address
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr);
    TAOS_RETURN(code);
  }

  int64_t tsMs = taosGetTimestampMs();
  // the previous receive time is read before it is replaced below
  int64_t lastRecvTime = syncIndexMgrGetRecvTime(ths->pMatchIndex, &pMsg->srcId);
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, &pRpcMsg->info.traceId, tsMs - lastRecvTime);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);

  return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
}
3766

3767
#ifdef BUILD_NO_CALL
3768
// Older heartbeat-reply handler, compiled only when BUILD_NO_CALL is defined.
// Logs the reply and stamps the sender's receive time; always returns 0.
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;

  int64_t nowMs = taosGetTimestampMs();
  int64_t elapsedMs = nowMs - pReply->timeStamp;

  // the same elapsed value is passed for both time arguments of the log helper
  syncLogRecvHeartbeatReply(ths, pReply, elapsedMs, &pRpcMsg->info.traceId, elapsedMs);

  // remember when the peer last replied, so liveness can be judged later
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, nowMs);

  return 0;
}
3779
#endif
3780

3781
// Dispatch a locally generated sync command carried in pRpcMsg->pCont.
//
//  - SYNC_LOCAL_CMD_STEP_DOWN: step down to pMsg->currentTerm with an empty raft id.
//  - SYNC_LOCAL_CMD_FOLLOWER_CMT / SYNC_LOCAL_CMD_LEARNER_CMT: update the commit
//    index when the command's term equals the log buffer's last match term, then
//    try to commit the log buffer unless the FSM state is incomplete.
//  - anything else: log an error.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the last match term is negative;
// returns 0 on every other path, including a failed commit (which is only logged).
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, &pRpcMsg->info.traceId);

  if (pCmd->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    SRaftId emptyId = EMPTY_RAFT_ID;
    syncNodeStepDown(ths, pCmd->currentTerm, emptyId);
    return 0;
  }

  if (!(pCmd->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT || pCmd->cmd == SYNC_LOCAL_CMD_LEARNER_CMT)) {
    sError("error local cmd");
    return 0;
  }

  // commit commands from here on
  if (syncLogBufferIsEmpty(ths->pLogBuf)) {
    sError("vgId:%d, sync log buffer is empty", ths->vgId);
    return 0;
  }

  SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
  if (lastMatchTerm < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // only move the commit index when the command's term agrees with the local log
  if (pCmd->currentTerm == lastMatchTerm) {
    SyncIndex updatedIndex = syncNodeUpdateCommitIndex(ths, pCmd->commitIndex);
    sTrace("vgId:%d, raft entry update commit, return index:%" PRId64, ths->vgId, updatedIndex);
  }

  // the commit is skipped entirely while the FSM state is incomplete
  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, &pRpcMsg->info.traceId, "heartbeat") < 0) {
      sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64, ths->vgId, terrstr(),
             ths->commitIndex);
    }
  }

  return 0;
}
3813

3814
// TLA+ Spec
3815
// ClientRequest(i, v) ==
3816
//     /\ state[i] = Leader
3817
//     /\ LET entry == [term  |-> currentTerm[i],
3818
//                      value |-> v]
3819
//            newLog == Append(log[i], entry)
3820
//        IN  log' = [log EXCEPT ![i] = newLog]
3821
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
3822
//                    leaderVars, commitIndex>>
3823
//
3824

3825
// Turn a client request into a raft entry and append it to the local log.
//
// ths        - local sync node.
// pMsg       - the request; TDMT_SYNC_CLIENT_REQUEST messages are built from
//              pMsg->pCont, all other types from the RPC message itself.
// pRetIndex  - optional out parameter; when non-NULL and this node is leader or
//              assigned leader, receives the index chosen for the new entry.
//
// Returns:
//  - the result of syncNodeAppend() on the normal leader path;
//  - the non-zero result of syncNodeCheckChangeConfig() for a config-change
//    entry (negative: check failed; positive: a response is sent back from the
//    saved stub when it has a handle, and the entry is dropped);
//  - TSDB_CODE_SYN_INTERNAL_ERROR when the entry cannot be built or this node is
//    neither leader nor assigned leader.
//
// The entry is destroyed here on every path that does not reach syncNodeAppend().
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, process client request", ths->vgId, pMsg);
  int32_t code = 0;

  SyncIndex       index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = NULL;
  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index, &pMsg->info.traceId);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }

  if (pEntry == NULL) {
    sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process client request since %s", ths->vgId, pMsg, terrstr());
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // 1->2, config change is added in the write thread and continues in the sync thread,
  // so the message must be saved for it
  // NOTE(review): the stub is registered before the leader check below; on the
  // non-leader path it is not removed here - confirm it is cleaned up elsewhere.
  if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) {
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
    uint64_t  seqNum = syncRespMgrAdd(ths->pSyncRespMgr, &stub);
    pEntry->seqNum = seqNum;
  }

  if (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    if (pRetIndex) {
      (*pRetIndex) = index;
    }

    if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
      // reuse the function-level code instead of declaring a shadowing local
      code = syncNodeCheckChangeConfig(ths, pEntry);
      if (code < 0) {
        sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to check change config since %s", ths->vgId, pMsg, terrstr());
        syncEntryDestroy(pEntry);
        pEntry = NULL;
        TAOS_RETURN(code);
      }

      if (code > 0) {
        // positive result: answer the caller from the saved stub and do not append
        SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
        int32_t num = syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
        sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get response stub for config change, seqNum:%" PRIu64 " num:%d",
                ths->vgId, pMsg, pEntry->seqNum, num);
        if (rsp.info.handle != NULL) {
          tmsgSendRsp(&rsp);
        }
        syncEntryDestroy(pEntry);
        pEntry = NULL;
        TAOS_RETURN(code);
      }
    }

    code = syncNodeAppend(ths, pEntry, pMsg);
    return code;
  } else {
    // not a leader: drop the entry and reject the request
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
}
3887

3888
// Map a sync state to its lowercase display name.
// Any value not listed below yields "unknown".
const char* syncStr(ESyncState state) {
  const char* name = "unknown";

  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      name = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      name = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      name = "leader";
      break;
    case TAOS_SYNC_STATE_ERROR:
      name = "error";
      break;
    case TAOS_SYNC_STATE_OFFLINE:
      name = "offline";
      break;
    case TAOS_SYNC_STATE_LEARNER:
      name = "learner";
      break;
    case TAOS_SYNC_STATE_ASSIGNED_LEADER:
      name = "assigned leader";
      break;
    default:
      break;
  }

  return name;
}
3908

3909
// Locate this node inside pNewCfg and store its position in pNewCfg->myIndex.
//
// Each entry of pNewCfg->nodeInfo is converted to a raft id (address from
// SYNC_ADDR, vgId from ths) and compared with ths->myRaftId.
//
// Returns 0 once a match is found, TSDB_CODE_SYN_INTERNAL_ERROR if no entry
// matches (pNewCfg->myIndex is then left untouched).
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t idx = 0;

  while (idx < pNewCfg->totalReplicaNum) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pNewCfg->nodeInfo[idx]),
        .vgId = ths->vgId,
    };

    if (syncUtilSameId(&ths->myRaftId, &candidate)) {
      pNewCfg->myIndex = idx;
      return 0;
    }

    ++idx;
  }

  return TSDB_CODE_SYN_INTERNAL_ERROR;
}
3924

3925
// True only when all three hold: the node has exactly one replica, the message
// type passes syncUtilUserCommit(), and the vgroup id is not 1.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) {
    return false;
  }
  if (!syncUtilUserCommit(pMsg->msgType)) {
    return false;
  }
  return ths->vgId != 1;
}
3928

3929
// Report whether pRaftId equals any of the first totalReplicaNum entries of
// ths->replicasId. The scan stops at the first match.
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  int32_t idx = 0;

  while (idx < ths->totalReplicaNum) {
    if (syncUtilSameId(&ths->replicasId[idx], pRaftId)) {
      return true;
    }
    ++idx;
  }

  return false;
}
3937

3938
// Return the snapshot sender stored at the same position as pDestId in
// ths->replicasId, or NULL when pDestId matches no replica.
// The whole array is scanned; if several entries match, the last one wins.
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  SSyncSnapshotSender* pFound = NULL;
  int32_t              idx = 0;

  while (idx < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[idx])) {
      pFound = ths->senders[idx];
    }
    ++idx;
  }

  return pFound;
}
3947

3948
// Return a pointer to the peer heartbeat timer stored at the same position as
// pDestId in ths->replicasId, or NULL when pDestId matches no replica.
// The whole array is scanned; if several entries match, the last one wins.
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  SSyncTimer* pFound = NULL;
  int32_t     idx = 0;

  while (idx < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[idx])) {
      pFound = &ths->peerHeartbeatTimerArr[idx];
    }
    ++idx;
  }

  return pFound;
}
3957

3958
// Return a pointer to the peer state stored at the same position as pDestId in
// ths->replicasId, or NULL when pDestId matches no replica.
// The whole array is scanned; if several entries match, the last one wins.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  SPeerState* pFound = NULL;
  int32_t     idx = 0;

  while (idx < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[idx])) {
      pFound = &ths->peerStates[idx];
    }
    ++idx;
  }

  return pFound;
}
3967

3968
#ifdef BUILD_NO_CALL
3969
// Decide whether an append-entries message should be sent to pDestId.
//
// Returns false when no peer state exists for pDestId, or when the index that
// would be sent (pMsg->prevLogIndex + 1) equals the peer's lastSendIndex and
// less than SYNC_APPEND_ENTRIES_TIMEOUT_MS has passed since lastSendTime.
// Returns true otherwise.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextIndex = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();

  // send unless the very same index went out within the timeout window
  return !(pPeer->lastSendIndex == nextIndex && nowMs - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);
}
3985

3986
// Check whether the node is in a state where a change may proceed.
//
// Returns false (after logging) when any of these holds:
//  - pSyncNode->changing is already set;
//  - commitIndex is at least SYNC_INDEX_BEGIN but differs from the last index;
//  - some peer has a snapshot sender whose start flag is set.
// Returns true otherwise.
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  // the last index is only looked up once the commit index is in the valid range
  if (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN && pSyncNode->commitIndex != syncNodeGetLastIndex(pSyncNode)) {
    sError("sync cannot change2");
    return false;
  }

  int32_t peer = 0;
  while (peer < pSyncNode->peersNum) {
    SSyncSnapshotSender* pPeerSender = syncNodeGetSnapshotSender(pSyncNode, &pSyncNode->peersId[peer]);
    if (pPeerSender != NULL && pPeerSender->start) {
      sError("sync cannot change3");
      return false;
    }
    ++peer;
  }

  return true;
}
4010
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc