• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3625

26 Feb 2025 10:19AM UTC coverage: 63.633% (+0.1%) from 63.485%
#3625

push

travis-ci

web-flow
Merge pull request #29914 from taosdata/feat/TS-5613-3.0

feat:[TS-5613]support bool in cast

148738 of 299799 branches covered (49.61%)

Branch coverage included in aggregate %.

233124 of 300297 relevant lines covered (77.63%)

17654074.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.69
/source/libs/sync/src/syncMain.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "syncAppendEntries.h"
19
#include "syncAppendEntriesReply.h"
20
#include "syncCommit.h"
21
#include "syncElection.h"
22
#include "syncEnv.h"
23
#include "syncIndexMgr.h"
24
#include "syncInt.h"
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
27
#include "syncRaftCfg.h"
28
#include "syncRaftLog.h"
29
#include "syncRaftStore.h"
30
#include "syncReplication.h"
31
#include "syncRequestVote.h"
32
#include "syncRequestVoteReply.h"
33
#include "syncRespMgr.h"
34
#include "syncSnapshot.h"
35
#include "syncTimeout.h"
36
#include "syncUtil.h"
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
39
#include "tmisce.h"
40
#include "tref.h"
41

42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
43
static void    syncNodeEqElectTimer(void* param, void* tmrId);
44
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
45
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
48
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
49
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
50
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
51
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
52
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
53
static int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
54
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
55

56
static bool    syncNodeCanChange(SSyncNode* pSyncNode);
57
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
58
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
59
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
60

61
static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
62

63
// Opens a sync node for pSyncInfo with syncNodeOpen(), registers it through syncNodeAdd() and
// copies the ping/elect/heartbeat intervals and the message callbacks from pSyncInfo into it.
//
// Returns the rid stored in the node on success, or -1 when the node cannot be opened or
// when syncNodeAdd() yields a negative rid (the node is then closed with syncNodeClose()).
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  sInfo("vgId:%d, start to open sync", pSyncInfo->vgId);

  SSyncNode* pNode = syncNodeOpen(pSyncInfo, vnodeVersion);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    syncNodeClose(pNode);
    return -1;
  }

  // ping timer settings
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;

  // election timer settings
  pNode->electBaseLine = pSyncInfo->electMs;

  // heartbeat timer settings
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;

  pNode->msgcb = pSyncInfo->msgcb;

  sInfo("vgId:%d, sync opened", pSyncInfo->vgId);
  return pNode->rid;
}
86

87
int32_t syncStart(int64_t rid) {
13,372✔
88
  int32_t    code = 0;
13,372✔
89
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,372✔
90
  if (pSyncNode == NULL) {
13,372!
91
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
92
    if (terrno != 0) code = terrno;
×
93
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
94
    TAOS_RETURN(code);
×
95
  }
96
  sInfo("vgId:%d, begin to start sync", pSyncNode->vgId);
13,372✔
97

98
  if ((code = syncNodeRestore(pSyncNode)) < 0) {
13,372!
99
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
100
    goto _err;
×
101
  }
102

103
  if ((code = syncNodeStart(pSyncNode)) < 0) {
13,372!
104
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, tstrerror(code));
×
105
    goto _err;
×
106
  }
107

108
  syncNodeRelease(pSyncNode);
13,372✔
109

110
  sInfo("vgId:%d, sync started", pSyncNode->vgId);
13,372✔
111

112
  TAOS_RETURN(code);
13,372✔
113

114
_err:
×
115
  syncNodeRelease(pSyncNode);
×
116
  TAOS_RETURN(code);
×
117
}
118

119
// Copies the raft configuration (raftCfg.cfg) of the sync node identified by rid into *cfg.
//
// Returns 0 on success. If the node cannot be acquired, returns terrno when it is non-zero,
// otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL; *cfg is left untouched in that case.
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    int32_t code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
    TAOS_RETURN(code);
  }

  *cfg = pNode->raftCfg.cfg;
  syncNodeRelease(pNode);

  return 0;
}
136

137
void syncStop(int64_t rid) {
13,343✔
138
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,343✔
139
  if (pSyncNode != NULL) {
13,343!
140
    pSyncNode->isStart = false;
13,343✔
141
    syncNodeRelease(pSyncNode);
13,343✔
142
    syncNodeRemove(rid);
13,343✔
143
  }
144
}
13,338✔
145

146
void syncPreStop(int64_t rid) {
13,363✔
147
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,363✔
148
  if (pSyncNode != NULL) {
13,363!
149
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
13,363!
150
      sInfo("vgId:%d, stop snapshot receiver", pSyncNode->vgId);
×
151
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
152
    }
153
    syncNodePreClose(pSyncNode);
13,363✔
154
    syncNodeRelease(pSyncNode);
13,363✔
155
  }
156
}
13,363✔
157

158
void syncPostStop(int64_t rid) {
11,603✔
159
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
11,603✔
160
  if (pSyncNode != NULL) {
11,602!
161
    syncNodePostClose(pSyncNode);
11,602✔
162
    syncNodeRelease(pSyncNode);
11,602✔
163
  }
164
}
11,602✔
165

166
// Validates a proposed configuration: this node must be part of pCfg (syncNodeInConfig) and
// the replica count may differ from the current one by at most one.
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  return syncNodeInConfig(pSyncNode, pCfg) && abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1;
}
170

171
// Applies a new raft configuration to the sync node identified by rid.
//
// Steps:
//   1. Skip (return 0) when the node's lastConfigIndex is already >= pNewCfg->lastIndex.
//   2. Reject the config with TSDB_CODE_SYN_NEW_CONFIG_ERROR if syncNodeCheckNewConfig() fails.
//   3. Record the new config index and perform the config change.
//   4. When the node is leader or assigned leader, stop the heartbeat timer, re-initialise
//      every per-peer heartbeat timer and start the heartbeat timer again.
//
// The node is released exactly once on every path after a successful acquire. The original
// returned early through TAOS_CHECK_RETURN / TAOS_RETURN on several failures without
// releasing it, and logged pSyncNode->vgId after releasing it.
//
// Returns 0 on success, otherwise the first failing step's error code. If the node cannot be
// acquired, returns terrno when non-zero, otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL.
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
    // Log before releasing so the node is not touched after it has been given back.
    sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
          pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
    syncNodeRelease(pSyncNode);
    return 0;
  }

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    goto _OVER;
  }

  if ((code = syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg)) != 0) {
    goto _OVER;
  }

  if (syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex) != 0) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since do change error", pSyncNode->vgId);
    goto _OVER;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
      goto _OVER;
    }

    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      code = syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
      if (code != 0) {
        goto _OVER;
      }
    }

    if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
      goto _OVER;
    }
    // syncNodeReplicate(pSyncNode);
  }

_OVER:
  syncNodeRelease(pSyncNode);
  TAOS_RETURN(code);
}
217

218
// Dispatches one sync RPC message to the handler matching pMsg->msgType on the sync node
// identified by rid.
//
// Returns the handler's result; TSDB_CODE_MSG_NOT_PROCESSED for an unknown message type.
// If the sync module is not initialised or the node cannot be acquired, returns terrno when
// non-zero, otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL.
//
// The failure is logged before the node is released; the original read pSyncNode->vgId
// after syncNodeRelease().
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT_ELECTION:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_FORCE_FOLLOWER:
      code = syncForceBecomeFollower(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SET_ASSIGNED_LEADER:
      code = syncBecomeAssignedLeader(pSyncNode, pMsg);
      break;
    default:
      code = TSDB_CODE_MSG_NOT_PROCESSED;
      break;
  }

  // Must run while the node is still held: the message dereferences pSyncNode.
  if (code != 0) {
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since %s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
           tstrerror(code));
  }
  syncNodeRelease(pSyncNode);

  TAOS_RETURN(code);
}
287

288
int32_t syncLeaderTransfer(int64_t rid) {
13,363✔
289
  int32_t    code = 0;
13,363✔
290
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,363✔
291
  if (pSyncNode == NULL) {
13,363!
292
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
293
    if (terrno != 0) code = terrno;
×
294
    TAOS_RETURN(code);
×
295
  }
296

297
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
13,363✔
298
  syncNodeRelease(pSyncNode);
13,363✔
299
  return ret;
13,363✔
300
}
301

302
// Handles TDMT_SYNC_FORCE_FOLLOWER: switches the node to follower with a zeroed raft id and
// the reason "force election", then answers the request with code 0, reusing the rsp buffer
// and length carried in the request's info. Always returns 0.
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SRaftId emptyId = {0};
  syncNodeBecomeFollower(ths, emptyId, "force election");

  SRpcMsg reply = {0};
  reply.code = 0;
  reply.pCont = pRpcMsg->info.rsp;
  reply.contLen = pRpcMsg->info.rspLen;
  reply.info = pRpcMsg->info;
  tmsgSendRsp(&reply);

  return 0;
}
316

317
// Handles TDMT_SYNC_SET_ASSIGNED_LEADER.
//
// The request is rejected (response code TSDB_CODE_MND_ARB_TOKEN_MISMATCH) when its arbTerm is
// lower than the local one or when its member token differs from the local arbToken.
// Otherwise, unless the node is already assigned leader, the term is advanced, the node
// becomes assigned leader and a noop entry is appended. A response is always sent, also on
// failure; it only carries a payload when serialization succeeded.
//
// Fix over the original: when response serialization fails, the response no longer
// references a buffer that was already released with rpcFreeCont(), and it never carries a
// non-positive or stale content length.
//
// Returns the code that was placed in the response.
int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
  int32_t code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  void*   pHead = NULL;
  int32_t contLen = 0;

  SVArbSetAssignedLeaderReq req = {0};
  // NOTE(review): the buffer is offset by sizeof(SMsgHead) but the full contLen is passed as
  // its length - confirm whether the deserializer expects contLen - sizeof(SMsgHead).
  if (tDeserializeSVArbSetAssignedLeaderReq((char*)pRpcMsg->pCont + sizeof(SMsgHead), pRpcMsg->contLen, &req) != 0) {
    sError("vgId:%d, failed to deserialize SVArbSetAssignedLeaderReq", ths->vgId);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  if (ths->arbTerm > req.arbTerm) {
    sInfo("vgId:%d, skip to set assigned leader, msg with lower term, local:%" PRId64 "msg:%" PRId64, ths->vgId,
          ths->arbTerm, req.arbTerm);
    goto _OVER;
  }

  ths->arbTerm = TMAX(req.arbTerm, ths->arbTerm);

  if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) {
    sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken,
          req.memberToken);
    goto _OVER;
  }

  if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SUCCESS;
    raftStoreNextTerm(ths);
    if (terrno != TSDB_CODE_SUCCESS) {
      code = terrno;
      sError("vgId:%d, failed to set next term since:%s", ths->vgId, tstrerror(code));
      goto _OVER;
    }
    syncNodeBecomeAssignedLeader(ths);

    // A failed noop append is only logged; the request is still answered as successful below
    // (unchanged from the original behavior).
    if ((code = syncNodeAppendNoop(ths)) < 0) {
      sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, tstrerror(code));
    }
  }

  SVArbSetAssignedLeaderRsp rsp = {0};
  rsp.arbToken = req.arbToken;
  rsp.memberToken = req.memberToken;
  rsp.vgId = ths->vgId;

  contLen = tSerializeSVArbSetAssignedLeaderRsp(NULL, 0, &rsp);
  if (contLen <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    contLen = 0;  // never send a non-positive length with a NULL payload
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    goto _OVER;
  }
  pHead = rpcMallocCont(contLen);
  if (!pHead) {
    code = terrno;
    contLen = 0;  // no payload was allocated
    sError("vgId:%d, failed to malloc memory for SVArbSetAssignedLeaderRsp", ths->vgId);
    goto _OVER;
  }
  if (tSerializeSVArbSetAssignedLeaderRsp(pHead, contLen, &rsp) <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    rpcFreeCont(pHead);
    pHead = NULL;  // the response below must not reference the released buffer
    contLen = 0;
    goto _OVER;
  }

  code = TSDB_CODE_SUCCESS;

_OVER:;
  SRpcMsg rspMsg = {
      .code = code,
      .pCont = pHead,
      .contLen = contLen,
      .info = pRpcMsg->info,
  };

  tmsgSendRsp(&rspMsg);

  tFreeSVArbSetAssignedLeaderReq(&req);
  TAOS_RETURN(code);
}
397

398
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
×
399
  int32_t    code = 0;
×
400
  SSyncNode* pNode = syncNodeAcquire(rid);
×
401
  if (pNode == NULL) {
×
402
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
403
    if (terrno != 0) code = terrno;
×
404
    TAOS_RETURN(code);
×
405
  }
406

407
  SRpcMsg rpcMsg = {0, .info.notFreeAhandle = 1};
×
408
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
×
409
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;
×
410

411
  syncNodeRelease(pNode);
×
412
  if (ret == 1) {
×
413
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
×
414
    code = rpcSendResponse(&rpcMsg);
×
415
    return code;
×
416
  } else {
417
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
×
418
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
419
  }
420
}
421

422
// Scans the match index of every peer and returns the smallest one seen.
//
// While the running result is still SYNC_INDEX_INVALID it is replaced by the peer's value
// unconditionally; afterwards only a peer value that is > 0 and smaller than the running
// result replaces it. Returns SYNC_INDEX_INVALID when there are no peers.
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    const SyncIndex peerIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    const bool      replace = (lowest == SYNC_INDEX_INVALID) || (peerIndex > 0 && peerIndex < lowest);
    if (replace) {
      lowest = peerIndex;
    }
  }

  return lowest;
}
435

436
// Forwards to the log store's syncLogIndexRetention callback with the given byte budget and
// returns its result.
static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) {
  const SyncIndex retentionIndex = pSyncNode->pLogStore->syncLogIndexRetention(pSyncNode->pLogStore, bytes);
  return retentionIndex;
}
439

440
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
30,911✔
441
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
30,911✔
442
  int32_t    code = 0;
30,911✔
443
  if (pSyncNode == NULL) {
30,911✔
444
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
26✔
445
    if (terrno != 0) code = terrno;
26!
446
    sError("sync begin snapshot error");
26!
447
    TAOS_RETURN(code);
26✔
448
  }
449

450
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
30,885✔
451
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
30,885✔
452
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
30,885✔
453

454
  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
30,885!
455
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
59!
456
    syncNodeRelease(pSyncNode);
59✔
457
    return 0;
59✔
458
  }
459

460
  int64_t logRetention = 0;
30,826✔
461

462
  if (syncNodeIsMnode(pSyncNode)) {
30,826✔
463
    // mnode
464
    logRetention = tsMndLogRetention;
3,046✔
465
  } else {
466
    // vnode
467
    if (pSyncNode->replicaNum > 1) {
27,780✔
468
      logRetention = SYNC_VNODE_LOG_RETENTION;
392✔
469
    }
470
  }
471

472
  if (pSyncNode->totalReplicaNum > 1) {
30,826✔
473
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER &&
806✔
474
        pSyncNode->state != TAOS_SYNC_STATE_LEARNER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
155!
475
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
15!
476
              lastApplyIndex);
477
      syncNodeRelease(pSyncNode);
15✔
478
      return 0;
15✔
479
    }
480
    SyncIndex retentionIndex =
791✔
481
        TMAX(pSyncNode->minMatchIndex, syncLogRetentionIndex(pSyncNode, SYNC_WAL_LOG_RETENTION_SIZE));
791✔
482
    logRetention += TMAX(0, lastApplyIndex - retentionIndex);
791✔
483
  }
484

485
_DEL_WAL:
30,020✔
486

487
  do {
488
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
30,811✔
489
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
30,811✔
490
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
30,811✔
491
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
30,811✔
492
    if (lastApplyIndex <= walCommitVer) {
30,811!
493
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);
30,811✔
494

495
      if (snapshottingIndex == SYNC_INDEX_INVALID) {
30,811!
496
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
30,811✔
497
        pSyncNode->snapshottingTime = taosGetTimestampMs();
30,811✔
498

499
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
30,811✔
500
        if (code == 0) {
30,811!
501
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
30,811✔
502
                  pSyncNode->snapshottingIndex, lastApplyIndex);
503
        } else {
504
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
×
505
                  terrstr(), pSyncNode->snapshottingIndex, lastApplyIndex);
506
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
×
507
        }
508

509
      } else {
510
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
×
511
                snapshottingIndex, lastApplyIndex);
512
      }
513
    }
514
  } while (0);
515

516
  syncNodeRelease(pSyncNode);
30,811✔
517
  TAOS_RETURN(code);
30,811✔
518
}
519

520
int32_t syncEndSnapshot(int64_t rid) {
30,882✔
521
  int32_t    code = 0;
30,882✔
522
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
30,882✔
523
  if (pSyncNode == NULL) {
30,884!
524
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
525
    if (terrno != 0) code = terrno;
×
526
    sError("sync end snapshot error");
×
527
    TAOS_RETURN(code);
×
528
  }
529

530
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
30,884✔
531
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
30,810✔
532
    code = walEndSnapshot(pData->pWal);
30,810✔
533
    if (code != 0) {
30,810!
534
      sNError(pSyncNode, "wal snapshot end error since:%s", tstrerror(code));
×
535
      syncNodeRelease(pSyncNode);
×
536
      TAOS_RETURN(code);
×
537
    } else {
538
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
30,810✔
539
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
30,810✔
540
    }
541
  }
542

543
  syncNodeRelease(pSyncNode);
30,884✔
544
  TAOS_RETURN(code);
30,884✔
545
}
546

547
// Tells whether the given sync node can serve reads: it must be leader or assigned leader
// and its restoreFinish flag must be set.
//
// When false is returned, terrno says why: TSDB_CODE_SYN_INTERNAL_ERROR for a NULL node,
// TSDB_CODE_SYN_NOT_LEADER for a non-leader state, TSDB_CODE_SYN_RESTORING while restoring.
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  if (!(pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pSyncNode->restoreFinish) {
    return true;
  }

  terrno = TSDB_CODE_SYN_RESTORING;
  return false;
}
566

567
bool syncIsReadyForRead(int64_t rid) {
13,093,975✔
568
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,093,975✔
569
  if (pSyncNode == NULL) {
13,100,683!
570
    sError("sync ready for read error");
×
571
    return false;
×
572
  }
573

574
  bool ready = syncNodeIsReadyForRead(pSyncNode);
13,100,683✔
575

576
  syncNodeRelease(pSyncNode);
13,099,730✔
577
  return ready;
13,094,794✔
578
}
579

580
#ifdef BUILD_NO_CALL
581
bool syncSnapshotSending(int64_t rid) {
582
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
583
  if (pSyncNode == NULL) {
584
    return false;
585
  }
586

587
  bool b = syncNodeSnapshotSending(pSyncNode);
588
  syncNodeRelease(pSyncNode);
589
  return b;
590
}
591

592
bool syncSnapshotRecving(int64_t rid) {
593
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
594
  if (pSyncNode == NULL) {
595
    return false;
596
  }
597

598
  bool b = syncNodeSnapshotRecving(pSyncNode);
599
  syncNodeRelease(pSyncNode);
600
  return b;
601
}
602
#endif
603

604
// Starts a leader transfer away from this node, if it applies.
//
// Returns 0 without doing anything when the node has no peers, is not in the LEADER state,
// or has a replicaNum of 1 or less. Otherwise the target is peer 0, unless there are exactly
// two peers and peer 1 has a strictly larger match index, in which case it is peer 1.
// Returns the result of syncNodeLeaderTransferTo() for the chosen target.
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return 0;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER || pSyncNode->replicaNum <= 1) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    const SyncIndex firstMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
    const SyncIndex secondMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[1]));
    if (secondMatch > firstMatch) {
      target = 1;
    }
  }

  SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[target];
  return syncNodeLeaderTransferTo(pSyncNode, newLeader);
}
625

626
// Builds a leader-transfer message naming newLeader as the target and proposes it through
// syncNodePropose(). The message content is freed with rpcFreeCont() after proposing.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when replicaNum is 1, the error from
// syncBuildLeaderTransfer() if building fails, otherwise the result of syncNodePropose().
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg transferMsg = {0};
  TAOS_CHECK_RETURN(syncBuildLeaderTransfer(&transferMsg, pSyncNode->vgId));

  SyncLeaderTransfer* pTransfer = transferMsg.pCont;
  pTransfer->newLeaderId.addr = SYNC_ADDR(&newLeader);
  pTransfer->newLeaderId.vgId = pSyncNode->vgId;
  pTransfer->newNodeInfo = newLeader;

  const int32_t proposeCode = syncNodePropose(pSyncNode, &transferMsg, false, NULL);
  rpcFreeCont(transferMsg.pCont);

  return proposeCode;
}
646

647
SSyncState syncGetState(int64_t rid) {
5,833,383✔
648
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
5,833,383✔
649

650
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
5,833,383✔
651
  if (pSyncNode != NULL) {
5,834,137✔
652
    state.state = pSyncNode->state;
5,834,067✔
653
    state.roleTimeMs = pSyncNode->roleTimeMs;
5,834,067✔
654
    state.startTimeMs = pSyncNode->startTime;
5,834,067✔
655
    state.restored = pSyncNode->restoreFinish;
5,834,067✔
656
    if (pSyncNode->vgId != 1) {
5,834,067✔
657
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
1,649,110✔
658
    } else {
659
      state.canRead = state.restored;
4,184,957✔
660
    }
661
    /*
662
    double progress = 0;
663
    if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){
664
      progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex;
665
      state.progress = (int32_t)(progress * 100);
666
    }
667
    else{
668
      state.progress = -1;
669
    }
670
    sDebug("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 ", "
671
            "progress:%lf, progress:%d",
672
          pSyncNode->vgId,
673
         pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
674
    */
675
    state.term = raftStoreGetTerm(pSyncNode);
5,833,944✔
676
    syncNodeRelease(pSyncNode);
5,834,172✔
677
  }
678

679
  return state;
5,834,101✔
680
}
681

682
void syncGetCommitIndex(int64_t rid, int64_t* syncCommitIndex) {
946,199✔
683
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
946,199✔
684
  if (pSyncNode != NULL) {
946,199!
685
    *syncCommitIndex = pSyncNode->commitIndex;
946,199✔
686
    syncNodeRelease(pSyncNode);
946,199✔
687
  }
688
}
946,199✔
689

690
int32_t syncGetArbToken(int64_t rid, char* outToken) {
17,916✔
691
  int32_t    code = 0;
17,916✔
692
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
17,916✔
693
  if (pSyncNode == NULL) {
17,916!
694
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
695
    if (terrno != 0) code = terrno;
×
696
    TAOS_RETURN(code);
×
697
  }
698

699
  memset(outToken, 0, TSDB_ARB_TOKEN_SIZE);
17,916✔
700
  (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
17,916✔
701
  tstrncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE);
17,916✔
702
  (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
17,916✔
703

704
  syncNodeRelease(pSyncNode);
17,916✔
705
  TAOS_RETURN(code);
17,916✔
706
}
707

708
int32_t syncCheckSynced(int64_t rid) {
2✔
709
  int32_t    code = 0;
2✔
710
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
2✔
711
  if (pSyncNode == NULL) {
2!
712
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
713
    if (terrno != 0) code = terrno;
×
714
    TAOS_RETURN(code);
×
715
  }
716

717
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
2!
718
    code = TSDB_CODE_VND_ARB_NOT_SYNCED;
×
719
    syncNodeRelease(pSyncNode);
×
720
    TAOS_RETURN(code);
×
721
  }
722

723
  bool isSync = pSyncNode->commitIndex >= pSyncNode->assignedCommitIndex;
2✔
724
  code = (isSync ? TSDB_CODE_SUCCESS : TSDB_CODE_VND_ARB_NOT_SYNCED);
2!
725

726
  syncNodeRelease(pSyncNode);
2✔
727
  TAOS_RETURN(code);
2✔
728
}
729

730
// Raises the arbTerm of the sync node identified by rid to arbTerm; the stored term never
// decreases (it becomes the larger of the two values).
//
// Returns 0 on success. If the node cannot be acquired, returns terrno when non-zero,
// otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL.
int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    int32_t errCode = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) errCode = terrno;
    TAOS_RETURN(errCode);
  }

  int32_t code = 0;
  pNode->arbTerm = TMAX(arbTerm, pNode->arbTerm);

  syncNodeRelease(pNode);
  TAOS_RETURN(code);
}
743

744
// Picks the config index that applies to a snapshot taken at snapshotLastApplyIndex.
//
// Starting from configIndexArr[0], the result is advanced to every recorded config index that
// is larger than the current result and not larger than snapshotLastApplyIndex.
// Returns -2 and sets terrno to TSDB_CODE_SYN_INTERNAL_ERROR when no config index is recorded.
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  if (pSyncNode->raftCfg.configIndexCount < 1) {
    sError("vgId:%d, failed get snapshot config index, configIndexCount:%d", pSyncNode->vgId,
           pSyncNode->raftCfg.configIndexCount);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -2;
  }

  SyncIndex chosen = (pSyncNode->raftCfg.configIndexArr)[0];
  for (int32_t pos = 0; pos < pSyncNode->raftCfg.configIndexCount; ++pos) {
    const SyncIndex candidate = (pSyncNode->raftCfg.configIndexArr)[pos];
    if (candidate <= chosen) continue;
    if (candidate > snapshotLastApplyIndex) continue;
    chosen = candidate;
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, chosen);

  return chosen;
}
764

765
// Looks up the raft id of the peer whose fqdn and port equal those of pEp.
// Returns EMPTY_RAFT_ID when no peer matches.
static SRaftId syncGetRaftIdByEp(SSyncNode* pSyncNode, const SEp* pEp) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    const SNodeInfo* pInfo = &(pSyncNode->peersNodeInfo)[peer];
    if (strcmp(pEp->fqdn, pInfo->nodeFqdn) != 0) continue;
    if (pEp->port != pInfo->nodePort) continue;
    return pSyncNode->peersId[peer];
  }

  return EMPTY_RAFT_ID;
}
774

775
// Build the endpoint set a client should retry against for the sync node `rid`.
// Every replica whose role is not TAOS_SYNC_ROLE_LEARNER is copied into `pEpSet`.
// `inUse` is chosen as follows:
//   - the slot of the peer matching the cached leader, if there is one;
//   - otherwise cfg.myIndex when this node itself is the cached leader;
//   - otherwise (cfg.myIndex + 1) modulo the number of endpoints.
// The set is then passed to epsetSort(). If the node cannot be acquired,
// `pEpSet` is left with numOfEps == 0.
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return;

  int leaderSlot = -1;  // slot in pEpSet->eps holding the cached leader, -1 if not found
  int slot = 0;         // next slot in pEpSet->eps to fill

  for (int32_t i = 0; i < pNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (pNode->raftCfg.cfg.nodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER) continue;

    SEp* pEp = &pEpSet->eps[slot];
    tstrncpy(pEp->fqdn, pNode->raftCfg.cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = pNode->raftCfg.cfg.nodeInfo[i].nodePort;
    pEpSet->numOfEps++;
    sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pNode->vgId, i, pEp->fqdn, pEp->port);

    // a zero addr/vgId never counts as a match with the cached leader
    SRaftId id = syncGetRaftIdByEp(pNode, pEp);
    bool    isCachedLeader = id.addr == pNode->leaderCache.addr && id.vgId == pNode->leaderCache.vgId &&
                          id.addr != 0 && id.vgId != 0;
    if (isCachedLeader) leaderSlot = slot;

    slot++;
  }

  if (pEpSet->numOfEps > 0) {
    if (leaderSlot != -1) {
      pEpSet->inUse = leaderSlot;
    } else if (pNode->myRaftId.addr == pNode->leaderCache.addr &&
               pNode->myRaftId.vgId == pNode->leaderCache.vgId) {
      pEpSet->inUse = pNode->raftCfg.cfg.myIndex;
    } else {
      pEpSet->inUse = (pNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps;
    }
  }
  epsetSort(pEpSet);

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pNode);
}
815

816
// Public entry point for proposing `pMsg` on the sync node identified by `rid`.
// Acquires the node, forwards to syncNodePropose() and releases the node again.
// Returns whatever syncNodePropose() returns; if the node cannot be acquired,
// returns terrno when it is non-zero, otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL.
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    int32_t code = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    sError("sync propose error");
    TAOS_RETURN(code);
  }

  int32_t result = syncNodePropose(pNode, pMsg, isWeak, seq);
  syncNodeRelease(pNode);
  return result;
}
830

831
int32_t syncCheckMember(int64_t rid) {
×
832
  int32_t    code = 0;
×
833
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
834
  if (pSyncNode == NULL) {
×
835
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
836
    if (terrno != 0) code = terrno;
×
837
    sError("sync propose error");
×
838
    TAOS_RETURN(code);
×
839
  }
840

841
  if (pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
×
842
    syncNodeRelease(pSyncNode);
×
843
    return TSDB_CODE_SYN_WRONG_ROLE;
×
844
  }
845

846
  syncNodeRelease(pSyncNode);
×
847
  return 0;
×
848
}
849

850
int32_t syncIsCatchUp(int64_t rid) {
4,837✔
851
  int32_t    code = 0;
4,837✔
852
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,837✔
853
  if (pSyncNode == NULL) {
4,837!
854
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
855
    if (terrno != 0) code = terrno;
×
856
    sError("sync Node Acquire error since %d", errno);
×
857
    TAOS_RETURN(code);
×
858
  }
859

860
  int32_t isCatchUp = 0;
4,837✔
861
  if (pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
4,837!
862
      pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
1,030!
863
      pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP) {
1,030✔
864
    sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
4,576!
865
          pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
866
          pSyncNode->pLogBuf->matchIndex);
867
    isCatchUp = 0;
4,576✔
868
  } else {
869
    sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, pSyncNode->vgId,
261!
870
          pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->matchIndex);
871
    isCatchUp = 1;
261✔
872
  }
873

874
  syncNodeRelease(pSyncNode);
4,837✔
875
  return isCatchUp;
4,837✔
876
}
877

878
ESyncRole syncGetRole(int64_t rid) {
4,837✔
879
  int32_t    code = 0;
4,837✔
880
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,837✔
881
  if (pSyncNode == NULL) {
4,837!
882
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
883
    if (terrno != 0) code = terrno;
×
884
    sError("sync Node Acquire error since %d", errno);
×
885
    TAOS_RETURN(code);
×
886
  }
887

888
  ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole;
4,837✔
889

890
  syncNodeRelease(pSyncNode);
4,837✔
891
  return role;
4,837✔
892
}
893

894
int64_t syncGetTerm(int64_t rid) {
5,814✔
895
  int32_t    code = 0;
5,814✔
896
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
5,814✔
897
  if (pSyncNode == NULL) {
5,814!
898
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
899
    if (terrno != 0) code = terrno;
×
900
    sError("sync Node Acquire error since %d", errno);
×
901
    TAOS_RETURN(code);
×
902
  }
903

904
  int64_t term = raftStoreGetTerm(pSyncNode);
5,814✔
905

906
  syncNodeRelease(pSyncNode);
5,814✔
907
  return term;
5,814✔
908
}
909

910
// Propose `pMsg` on `pSyncNode`.
//
// The proposal is rejected with
//   - TSDB_CODE_SYN_NOT_LEADER        when the node is neither leader nor assigned leader,
//   - TSDB_CODE_SYN_PROPOSE_NOT_READY when restore has not finished, or when a
//     (non-assigned) leader has a heartbeat-reply timeout.
//
// Optimized path (syncNodeIsOptimizedOneReplica() is true): the request is handled
// inline through syncNodeOnClientRequest(). On success the apply index/term are
// stored in pMsg->info.conn and the function returns 1 if replicaNum is still 1,
// or 0 if it is not. On failure it returns TSDB_CODE_SYN_INTERNAL_ERROR.
//
// Normal path: a response stub is registered, a client request is built and
// enqueued through the syncEqMsg callback. `*seq` (when `seq` is not NULL)
// receives the stub sequence number. Returns 0 on success or the error code of
// the step that failed.
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  int32_t code = 0;
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SYN_NOT_LEADER;
    sNWarn(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    TAOS_RETURN(code);
  }

  if (!pSyncNode->restoreFinish) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNWarn(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
           TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // heartbeat timeout
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER && syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since heartbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // initialized so the failure log below never reads an indeterminate value
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code < 0) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      TAOS_RETURN(code);
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);

    // after raft member change, need to handle 1->2 switching point
    // at this point, need to switch entry handling thread
    if (pSyncNode->replicaNum == 1) {
      sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return 1;
    }

    sTrace("vgId:%d, propose optimized msg, return to normal, index:%" PRId64
           " type:%s, "
           "handle:%p",
           pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle);
    return 0;
  }

  // normal path: register a response stub, then enqueue the client request
  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {0};
  code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    // Remove the stub registered above, but keep reporting the serialization
    // failure: returning the result of the removal could turn this into success.
    int32_t delCode = syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    if (delCode != 0) {
      sWarn("vgId:%d, failed to remove resp stub after serialize failure, seq:%" PRIu64 " since %s", pSyncNode->vgId,
            seqNum, tstrerror(delCode));
    }
    TAOS_RETURN(code);
  }

  sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    // returns early only if removing the stub fails; otherwise the enqueue error is returned below
    TAOS_CHECK_RETURN(syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum));
  }

  if (seq != NULL) *seq = seqNum;
  TAOS_RETURN(code);
}
982

983
// Reset a per-peer heartbeat timer slot to its initial, not-yet-started state.
// The timer targets `destId`, uses the node's heartbeat base interval and fires
// syncNodeEqPeerHeartbeatTimer. Always returns 0.
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  // what the timer does and for whom
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;

  // runtime state: no timer handle yet, counters cleared
  pSyncTimer->pTimer = NULL;
  pSyncTimer->counter = 0;
  pSyncTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pSyncTimer->logicClock, 0);

  return 0;
}
993

994
// Start (or re-arm) the heartbeat timer `pSyncTimer` of `pSyncNode`.
// The timer's heartbeat data record is looked up by hbDataRid and allocated and
// registered if it does not exist. The record is refreshed with the node rid,
// destination, logic clock and expected execution time before the timer is reset.
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the record cannot be allocated,
// or TSDB_CODE_SYN_INTERNAL_ERROR if the sync environment is not initialized or
// taosTmrReset() returns true.
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int32_t code = 0;
  int64_t tsNow = taosGetTimestampMs();
  if (syncIsInit()) {
    SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
    if (pData == NULL) {
      pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
      if (pData == NULL) {
        // without this check the assignments below would dereference NULL
        sError("vgId:%d, failed to allocate hb timer data", pSyncNode->vgId);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      pData->rid = syncHbTimerDataAdd(pData);
    }
    pSyncTimer->hbDataRid = pData->rid;
    pSyncTimer->timeStamp = tsNow;

    pData->syncNodeRid = pSyncNode->rid;
    pData->pTimer = pSyncTimer;
    pData->destId = pSyncTimer->destId;
    pData->logicClock = pSyncTimer->logicClock;
    pData->execTime = tsNow + pSyncTimer->timerMS;

    sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64 " at %d", pSyncNode->vgId, pData->rid,
           pData->destId.addr, pSyncTimer->timerMS);

    // the record's rid, not its address, is handed to the timer callback
    bool stopped = taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid), syncEnv()->pTimerManager,
                                &pSyncTimer->pTimer);
    if (stopped) {
      sError("vgId:%d, failed to reset hb timer success", pSyncNode->vgId);
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  } else {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
  }
  return code;
}
1027

1028
// Stop the heartbeat timer `pSyncTimer` and drop its heartbeat data record.
// The logic clock is incremented first, then the timer is stopped, its handle
// cleared, and the record identified by hbDataRid removed. Always returns 0.
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  (void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  bool wasStopped = taosTmrStop(pSyncTimer->pTimer);
  sDebug("vgId:%d, stop hb timer stop:%d", pSyncNode->vgId, wasStopped);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}
1038

1039
// Restore the log store from the FSM snapshot when the two are out of step.
// A restore is triggered when the log ends before the snapshot's lastApplyIndex,
// when the log begins after lastApplyIndex + 1, or when the FSM state is
// SYNC_FSM_STATE_INCOMPLETE.
// Returns 0 when nothing had to be done or the restore succeeded,
// TSDB_CODE_SYN_INTERNAL_ERROR when the log store, the FSM, or its
// FpGetSnapshotInfo callback is missing, or the error returned by
// syncLogRestoreFromSnapshot().
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  int32_t code = 0;

  if (pNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm == NULL) {
    sError("vgId:%d, pFsm not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
    sError("vgId:%d, FpGetSnapshotInfo not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // TODO check return value
  SSnapshot snapshot = {0};
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);

  SyncIndex commitIndex = snapshot.lastApplyIndex;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  // the log is usable as-is when it reaches the snapshot and starts no later than right after it
  bool logCoversSnapshot = (lastVer >= commitIndex) && (firstVer <= commitIndex + 1);
  if (logCoversSnapshot && pNode->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    TAOS_RETURN(code);
  }

  code = pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, commitIndex);
  if (code != 0) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
           pNode->vgId, terrstr(), lastVer, commitIndex);
  }
  TAOS_RETURN(code);
}
1069

1070
// open/close --------------
1071
// Allocate and initialize a sync node from `pSyncInfo`.
//
// Steps, in order:
//   1. create the sync directory if needed and derive the raft store/config paths;
//   2. create raft_config.json from `pSyncInfo`, or load it and reconcile it with
//      `pSyncInfo->syncCfg` depending on `vnodeVersion`;
//   3. refresh dnode info of every replica and persist the config if it changed;
//   4. build replica/peer tables, arbitration token, and take ownership of
//      `pSyncInfo->pFsm` (the field is set to NULL once transferred);
//   5. open the raft store and create vote managers, index managers, log store;
//   6. derive the initial commit index from the FSM snapshot and restore the log
//      store when needed;
//   7. initialize timers, response manager, snapshot senders/receiver, replication
//      managers, peer state and the log buffer.
//
// Returns the opened node, or NULL on failure. Every failure goes through
// `_error`, which frees a not-yet-transferred `pSyncInfo->pFsm` and closes the
// partially built node with syncNodeClose().
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("vgId:%d, failed to create dir:%s since %s", pSyncInfo->vgId, pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId);
    pSyncNode->vgId = pSyncInfo->vgId;
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
    pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
    pSyncNode->raftCfg.configIndexCount = 1;
    pSyncNode->raftCfg.configIndexArr[0] = -1;

    if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
      terrno = code;
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
      goto _error;
    }
  } else {
    // update syncCfg by raft_config.json
    // pSyncNode->vgId is only assigned by this function after this branch, so the
    // logs in this branch take the vgId from pSyncInfo.
    if ((code = syncReadCfgFile(pSyncNode)) != 0) {
      terrno = code;
      sError("vgId:%d, failed to read sync cfg file", pSyncInfo->vgId);
      goto _error;
    }

    if (vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion) {
      if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
        sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncInfo->vgId);
        pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
        if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
          terrno = code;
          sError("vgId:%d, failed to write sync cfg file", pSyncInfo->vgId);
          goto _error;
        }
      } else {
        sInfo("vgId:%d, use sync config from sync cfg file", pSyncInfo->vgId);
        pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
      }
    } else {
      // "file ver" is the version loaded from raft_config.json, i.e. the value compared above
      sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d", pSyncInfo->vgId, vnodeVersion,
            pSyncNode->raftCfg.cfg.changeVersion);
    }
  }

  // init by SSyncInfo
  pSyncNode->vgId = pSyncInfo->vgId;
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  bool      updated = false;
  sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
        pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
      updated = true;
    }
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  if (vnodeVersion > pSyncInfo->syncCfg.changeVersion) {
    if (updated) {
      sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
      if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
        terrno = code;
        sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
        goto _error;
      }
    }
  }

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  code = syncLogBufferCreate(&pSyncNode->pLogBuf);
  if (pSyncNode->pLogBuf == NULL) {
    sError("failed to init sync log buffer since %s. vgId:%d", tstrerror(code), pSyncNode->vgId);
    goto _error;
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
  pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
        false) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  pSyncNode->myRaftId = pSyncNode->replicasId[pSyncNode->raftCfg.cfg.myIndex];

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
      pSyncNode->peersId[j] = pSyncNode->replicasId[i];
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
      j++;
    }
  }

  pSyncNode->arbTerm = -1;
  (void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
  syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
  sInfo("vgId:%d, generate arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);

  // init raft algorithm
  // ownership of the FSM moves to the node here; _error must not free it afterwards
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  if ((code = raftStoreOpen(pSyncNode)) != 0) {
    terrno = code;
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    // TODO check return value
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
    pSyncNode->fsmState = snapshot.state;
    if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
      sError("vgId:%d, fsm state is incomplete.", pSyncNode->vgId);
      // an incomplete FSM is only fatal when this is the sole replica
      if (pSyncNode->replicaNum == 1) {
        terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
        goto _error;
      }
    }
  }
  pSyncNode->commitIndex = commitIndex;
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  // restore log store on need
  if ((code = syncNodeLogStoreRestoreOnNeed(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
#ifdef BUILD_NO_CALL
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
#endif
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i])) != 0) {
      // record the failure in terrno like every other error path (not in the C errno)
      terrno = code;
      goto _error;
    }
  }

  // tools
  if ((code = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr)) != 0) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(pSyncNode, i, &pSender);
    if (pSender == NULL) {
      // go through _error so the node and everything created so far are released;
      // earlier failures already reach _error with all sender slots still NULL
      if (code != 0) terrno = code;
      sError("vgId:%d, failed to create snapshot sender, index:%d", pSyncNode->vgId, i);
      goto _error;
    }

    pSyncNode->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  // snapshot receivers
  code = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID, &pSyncNode->pNewNodeReceiver);
  if (pSyncNode->pNewNodeReceiver == NULL) {
    // same as above: release the partially built node instead of leaking it
    if (code != 0) terrno = code;
    sError("vgId:%d, failed to create snapshot receiver", pSyncNode->vgId);
    goto _error;
  }
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
          pSyncNode->pNewNodeReceiver);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  if ((code = syncNodeLogReplInit(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // peer state
  if ((code = syncNodePeerStateInit(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  //
  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // start in syncNodeStart
  // start raft

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if ((code = syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->becomeAssignedLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync node opened, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
  return pSyncNode;

_error:
  // the FSM is still owned by pSyncInfo only if the failure happened before the transfer
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}
1434

1435
#ifdef BUILD_NO_CALL
// Advance pSyncNode->commitIndex to the FSM snapshot's lastApplyIndex when the
// snapshot is ahead. Does nothing if no FSM or no FpGetSnapshotInfo callback is
// registered. The callback's return value is ignored.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snap = {0};
  (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);

  if (pSyncNode->commitIndex < snap.lastApplyIndex) {
    pSyncNode->commitIndex = snap.lastApplyIndex;
  }
}
#endif
1446

1447
// Restore the sync node: read the log store's last and commit indexes, raise
// commitIndex, and (unless the FSM state is incomplete) commit the log buffer
// up to that index.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the log store or the log buffer has
// not been created. Otherwise returns the result of syncLogBufferCommit(); when
// that step is skipped (fsmState is SYNC_FSM_STATE_INCOMPLETE) the result is 0,
// or TSDB_CODE_WAL_LOG_INCOMPLETE if the log buffer end did not match the log store.
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pSyncNode->pLogBuf == NULL) {
    sError("vgId:%d, ring log buffer not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // Sample the indexes while holding the log buffer mutex.
  (void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex);
  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;
  (void)taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex);

  if (lastVer != -1 && endIndex != lastVer + 1) {
    code = TSDB_CODE_WAL_LOG_INCOMPLETE;
    // Report the code just assigned; terrno is not set here, so terrstr() would
    // print whatever error happened to be recorded last.
    sWarn("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64 "",
          pSyncNode->vgId, tstrerror(code), endIndex - 1, lastVer);
    // Deliberately not returning here: the mismatch is only warned about.
  }

  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
  sInfo("vgId:%d, restore began, and keep syncing until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex);
  }

  TAOS_RETURN(code);
}
1482

1483
// Start raft on an opened node and arm the ping timer.
//   - configured as learner      -> become learner
//   - single voting replica      -> bump the term, become leader, append a noop
//   - otherwise                  -> become follower with an empty leader id
// Returns the error from appending the noop, otherwise the result of
// syncNodeStartPingTimer().
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  sInfo("vgId:%d, begin to start sync node", pSyncNode->vgId);

  if (pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
    syncNodeBecomeLearner(pSyncNode, "first start");
  } else if (pSyncNode->replicaNum == 1) {
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    TAOS_CHECK_RETURN(syncNodeAppendNoop(pSyncNode));
  } else {
    SRaftId noLeader = {0};
    syncNodeBecomeFollower(pSyncNode, noLeader, "first start");
  }

  int32_t ret = syncNodeStartPingTimer(pSyncNode);
  if (ret != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(ret));
  }

  sInfo("vgId:%d, sync node started", pSyncNode->vgId);
  return ret;
}
1509

1510
#ifdef BUILD_NO_CALL
1511
// Put the node into follower state as a stand-by: stop the heartbeat timers,
// restart the elect timer with TIMER_MAX_MS, and start the ping timer.
// Returns the heartbeat-stop error if any, -1 when the elect or ping timer
// cannot be (re)started, otherwise the ping timer result.
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  int32_t code = 0;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));

  // reset elect timer, long enough
  int32_t electMS = TIMER_MAX_MS;
  code = syncNodeRestartElectTimer(pSyncNode, electMS);
  if (code < 0) {
    // The failure reason is carried in code, not in terrno.
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return -1;
  }

  code = syncNodeStartPingTimer(pSyncNode);
  if (code < 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(code));
    return -1;
  }
  return code;
}
1534
#endif
1535

1536
// First stage of closing a node: stop the elect, heartbeat and ping timers (in
// that order) and clean pending responses. Logs and returns early when the
// node, its FSM, or the FSM's FpApplyQueueItems callback is missing, or when a
// timer cannot be stopped.
void syncNodePreClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    sError("failed to pre close sync node since sync node is null");
    return;
  }
  if (pSyncNode->pFsm == NULL) {
    sError("failed to pre close sync node since fsm is null");
    return;
  }
  if (pSyncNode->pFsm->FpApplyQueueItems == NULL) {
    sError("failed to pre close sync node since FpApplyQueueItems is null");
    return;
  }

  int32_t code = syncNodeStopElectTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  code = syncNodeStopHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  code = syncNodeStopPingTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // clean rsp
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
1572

1573
// Stage of closing a node that tears down the snapshot receiver: stop it if it
// is running, destroy it, and clear the pointer. A NULL node is ignored, the
// same way syncNodeClose() ignores it.
void syncNodePostClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return;
  if (pSyncNode->pNewNodeReceiver == NULL) return;

  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // The message names this function's stage (post close), not pre close.
  sDebug("vgId:%d, snapshot receiver destroy while postclose sync node, data:%p", pSyncNode->vgId,
         pSyncNode->pNewNodeReceiver);
  snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
  pSyncNode->pNewNodeReceiver = NULL;
}
11,600✔
1585

1586
// Release a heartbeat timer data object.
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
2,327!
1587

1588
// Tear down a sync node and free it. A NULL node is ignored.
// Order: clean pending responses, stop the ping/elect/heartbeat timers, destroy
// the replication state and managers, destroy the arb token mutex, destroy all
// snapshot senders and the receiver, free the FSM, close the raft store, and
// finally free the node itself.
// NOTE: if any timer fails to stop, the function logs and returns without
// releasing anything.
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return;

  int32_t code = 0;
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);

  code = syncNodeStopPingTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  code = syncNodeStopElectTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  code = syncNodeStopHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  syncNodeLogReplDestroy(pSyncNode);

  // Destroy each owned component and clear its pointer right away.
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;

  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;

  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;

  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;

  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  (void)taosThreadMutexDestroy(&pSyncNode->arbTokenMutex);

  // snapshot senders: stop the running ones, then destroy every slot in use
  for (int32_t slot = 0; slot < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++slot) {
    if (pSyncNode->senders[slot] == NULL) continue;

    sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[slot]);

    if (snapshotSenderIsStart(pSyncNode->senders[slot])) {
      snapshotSenderStop(pSyncNode->senders[slot], false);
    }

    snapshotSenderDestroy(pSyncNode->senders[slot]);
    pSyncNode->senders[slot] = NULL;
  }

  // snapshot receiver
  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  raftStoreClose(pSyncNode);

  taosMemoryFree(pSyncNode);
}
1657

1658
// Return the snapshot strategy stored in the node's raft configuration.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->raftCfg.snapshotStrategy;
}
×
1659

1660
// timer control --------------
1661
// (Re)arm the ping timer with pingTimerMS and publish the user logic clock as
// the active ping logic clock.
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when taosTmrReset() reports true,
// otherwise 0 -- including when the sync environment is not initialised, which
// is only logged.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  bool stopped = taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
                              syncEnv()->pTimerManager, &pSyncNode->pPingTimer);
  if (stopped) {
    sError("vgId:%d, failed to reset ping timer, ms:%d", pSyncNode->vgId, pSyncNode->pingTimerMS);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}
1676

1677
// Stop the ping timer: advance the user logic clock, stop the timer handle and
// clear it. Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  bool stop = taosTmrStop(pSyncNode->pPingTimer);
  sDebug("vgId:%d, stop ping timer, stop:%d", pSyncNode->vgId, stop);
  pSyncNode->pPingTimer = NULL;

  return 0;
}
1685

1686
// Arm the elect timer to fire after ms milliseconds. Records the interval, the
// expected execute time and the current elect logic clock in electTimerParam
// before resetting the timer. Always returns 0; failures are only logged.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  int64_t fireAt = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), fireAt);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  bool stopped = taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid),
                              syncEnv()->pTimerManager, &pSyncNode->pElectTimer);
  if (stopped) {
    sError("vgId:%d, failed to reset elect timer, ms:%d", pSyncNode->vgId, ms);
  }

  return 0;
}
1705

1706
// Stop the elect timer: advance the elect logic clock, stop the timer handle
// and clear it. Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  bool stop = taosTmrStop(pSyncNode->pElectTimer);
  sDebug("vgId:%d, stop elect timer, stop:%d", pSyncNode->vgId, stop);
  pSyncNode->pElectTimer = NULL;

  return 0;
}
1715

1716
// Stop the elect timer and start it again with a timeout of ms milliseconds.
// Returns the first non-zero code from either step, otherwise 0.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  TAOS_CHECK_RETURN(syncNodeStopElectTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartElectTimer(pSyncNode, ms));
  return 0;
}
1722

1723
// Restart the elect timer with a fresh timeout: TIMER_MAX_MS when the node is
// a stand-by, otherwise a random value drawn via syncUtilElectRandomMS() from
// electBaseLine and 2 * electBaseLine. A restart failure is logged as a warning.
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = pSyncNode->raftCfg.isStandBy
                        ? TIMER_MAX_MS
                        : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  int32_t code = syncNodeRestartElectTimer(pSyncNode, electMS);
  if (code != 0) {
    sWarn("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
}
1741

1742
#ifdef BUILD_NO_CALL
1743
// (Re)arm the node-wide heartbeat timer with heartbeatTimerMS and publish the
// user logic clock as the active heartbeat logic clock.
// taosTmrReset() yields a bool, not an error code, so its result is handled the
// same way syncNodeStartPingTimer() handles it: true is logged and reported as
// TSDB_CODE_SYN_INTERNAL_ERROR. Returns 0 otherwise, including when the sync
// environment is not initialised (only logged).
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (syncIsInit()) {
    bool stopped = taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
                                syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer);
    if (stopped) {
      sError("vgId:%d, failed to reset heartbeat timer, ms:%d", pSyncNode->vgId, pSyncNode->heartbeatTimerMS);
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  } else {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return code;
}
1756
#endif
1757

1758
// Start the per-peer heartbeat timer of every peer that has one.
// Returns the first non-zero code from syncHbTimerStart(), otherwise 0.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
#if 0
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) continue;

    TAOS_CHECK_RETURN(syncHbTimerStart(pSyncNode, pHbTimer));
  }

  return 0;
}
1775

1776
// Stop the per-peer heartbeat timer of every peer that has one.
// Returns the first non-zero code from syncHbTimerStop(), otherwise 0.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
#if 0
  TAOS_CHECK_RETURN(atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1));
  bool stop = taosTmrStop(pSyncNode->pHeartbeatTimer);
  sDebug("vgId:%d, stop heartbeat timer, stop:%d", pSyncNode->vgId, stop);
  pSyncNode->pHeartbeatTimer = NULL;
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) continue;

    TAOS_CHECK_RETURN(syncHbTimerStop(pSyncNode, pHbTimer));
  }

  return 0;
}
1795

1796
#ifdef BUILD_NO_CALL
1797
// Stop all peer heartbeat timers and start them again.
// Returns the first non-zero code from either step, otherwise 0.
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
  return 0;
}
1804
#endif
1805

1806
// Send pMsg to the peer whose raft address equals destRaftId->addr.
// The peer's endpoint set is looked up in pNode->peersId/peersEpset. The message
// content is converted with syncUtilMsgHtoN() and marked noResp before it is
// handed to pNode->syncSendMSg.
//
// Returns the result of syncSendMSg, or -1 when the peer is unknown or no send
// callback is installed. On a negative result the message content is released
// with rpcFreeCont() and pMsg->pCont is cleared so the caller is not left
// holding a dangling pointer.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  SEpSet* epSet = NULL;
  for (int32_t i = 0; i < pNode->peersNum; ++i) {
    if (destRaftId->addr == pNode->peersId[i].addr) {
      epSet = &pNode->peersEpset[i];
      break;
    }
  }

  int32_t code = -1;
  if (pNode->syncSendMSg != NULL && epSet != NULL) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    code = pNode->syncSendMSg(epSet, pMsg);
  }

  if (code < 0) {
    sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:%" PRId64, pNode->vgId, tstrerror(code),
           epSet, DID(destRaftId), destRaftId->addr);
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;  // content is gone; guard against use-after-free / double free
  }

  TAOS_RETURN(code);
}
1830

1831
// Tell whether this node is a member of pCfg.
// Membership is checked twice -- once by fqdn + port against myNodeInfo, once
// by raft id against myRaftId. When the two checks disagree, terrno is set to
// TSDB_CODE_SYN_INTERNAL_ERROR and false is returned.
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  bool foundByEndpoint = false;
  bool foundByRaftId = false;

  // lookup by fqdn + port
  for (int32_t idx = 0; idx < pCfg->totalReplicaNum; ++idx) {
    if (strcmp(pCfg->nodeInfo[idx].nodeFqdn, pNode->myNodeInfo.nodeFqdn) != 0) continue;
    if (pCfg->nodeInfo[idx].nodePort != pNode->myNodeInfo.nodePort) continue;

    foundByEndpoint = true;
    break;
  }

  // lookup by raft id
  for (int32_t idx = 0; idx < pCfg->totalReplicaNum; ++idx) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[idx]),
        .vgId = pNode->vgId,
    };

    if (syncUtilSameId(&candidate, &pNode->myRaftId)) {
      foundByRaftId = true;
      break;
    }
  }

  if (foundByEndpoint == foundByRaftId) {
    return foundByEndpoint;
  }

  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  return false;
}
1861

1862
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
3,038✔
1863
  if (pOldCfg->totalReplicaNum != pNewCfg->totalReplicaNum) return true;
3,038✔
1864
  if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
2,059✔
1865
  for (int32_t i = 0; i < pOldCfg->totalReplicaNum; ++i) {
4,954✔
1866
    const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
3,602✔
1867
    const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
3,602✔
1868
    if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
3,602!
1869
    if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
3,602✔
1870
    if (pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
3,600✔
1871
  }
1872

1873
  return false;
1,352✔
1874
}
1875

1876
// Apply a new raft configuration to the node.
//
// Nothing is done (returns 0) when pNewConfig equals the current configuration.
// Otherwise the new configuration and lastConfigChangeIndex are stored, the
// index is appended to configIndexArr, and:
//   - when this node is part of the new configuration, the node's own info,
//     peers, replica ids, quorum, index/vote managers and snapshot senders are
//     rebuilt (senders of replicas that survive the change are reused), and the
//     configuration file is written;
//   - otherwise only the configuration file is written.
// The node is flagged stand-by when it was in the old configuration but is not
// in the new one.
//
// Returns 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_RANGE when the config
// index array is full; terrno when a raft id cannot be derived from node info;
// or the error of snapshotSenderCreate() / syncWriteCfgFile().
// NOTE(review): the error returns happen after raftCfg has already been
// overwritten in memory -- confirm callers tolerate a partially applied change.
int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change; every argument matches its conversion specifier
  sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
         oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, (int64_t)oldConfig.lastIndex,
         (int64_t)pNewConfig->lastIndex, pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  SRaftCfg* pCfg = &pSyncNode->raftCfg;
  if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
    sNError(pSyncNode, "failed to add cfg index:%d since out of range", pCfg->configIndexCount);
    terrno = TSDB_CODE_OUT_OF_RANGE;
    return -1;
  }

  pCfg->configIndexArr[pCfg->configIndexCount] = lastConfigChangeIndex;
  pCfg->configIndexCount++;

  if (IamInNew) {
    // save snapshot senders together with the replica ids they belonged to
    SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    if (syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId) == false) return terrno;

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
    int32_t j = 0;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
        j++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]) == false)
        return terrno;
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
          false)
        return terrno;
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // reset new: move the old sender of each surviving replica to its new slot
    for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
      bool reset = false;
      for (int32_t k = 0; k < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++k) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[k]) && oldSenders[k] != NULL) {
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[k]);

          pSyncNode->senders[i] = oldSenders[k];
          oldSenders[k] = NULL;  // ownership moved; must not be destroyed below
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
          pSyncNode->senders[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], reset);

          break;
        }
      }
    }

    // create new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (pSyncNode->senders[i] == NULL) {
        TAOS_CHECK_RETURN(snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]));
        if (pSyncNode->senders[i] == NULL) {
          // will be created later while send snapshot
          sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
        } else {
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
        }
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
      }
    }

    // free old: whatever was not moved to a new slot
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
  } else {
    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  return 0;
}
2048

2049
// raft state change --------------
2050
// Store term as the node's current term when it is larger than the stored one.
// The node's role is left untouched.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= raftStoreGetTerm(pSyncNode)) {
    return;
  }
  raftStoreSetTerm(pSyncNode, term);
}
6,438✔
2055

2056
// Step down in response to newTerm observed from node id.
//   - newTerm older than the current term: ignored.
//   - node is an assigned leader: a new arb token is generated under
//     arbTokenMutex.
//   - newTerm newer: the term is stored, the node becomes follower of id and
//     its vote is cleared.
//   - same term: the node becomes follower of id unless it already is one.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm, SRaftId id) {
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
  if (newTerm < currentTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);

  if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
    syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
    sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken);
    (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
  }

  if (newTerm > currentTerm) {
    raftStoreSetTerm(pSyncNode, newTerm);

    char reason[64];
    snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
    syncNodeBecomeFollower(pSyncNode, id, reason);

    raftStoreClearVote(pSyncNode);
    return;
  }

  // same term: only a non-follower has anything to do
  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    syncNodeBecomeFollower(pSyncNode, id, "step down");
  }
}
2086

2087
// On a leader change, clean the responses tracked by the node's response manager.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
4,936✔
2088

2089
// Switch the node to follower state with leaderId as the cached leader.
// Steps, in order: reset hbSlowNum, update leader cache / state / role time,
// stop heartbeat timers, clean pending client responses, invoke the FSM's
// FpBecomeFollowerCb (if any), invalidate minMatchIndex, reset the log buffer,
// and reset the elect timer. Returns early (after logging) when the heartbeat
// timers cannot be stopped or the log buffer cannot be reset.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, SRaftId leaderId, const char* debugStr) {
  int32_t code = 0;

  // maybe clear leader cache
  // NOTE(review): this store is overwritten unconditionally a few lines below.
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  pSyncNode->hbSlowNum = 0;

  // state change
  pSyncNode->leaderCache = leaderId;
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  code = syncNodeStopHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNTrace(pSyncNode, "become follower %s", debugStr);

  // send rsp to client
  syncNodeLeaderChangeRsp(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // reset elect timer
  syncNodeResetElectTimer(pSyncNode);

  sInfo("vgId:%d, become follower. %s", pSyncNode->vgId, debugStr);
}
2130

2131
// Switch the node to learner state: reset hbSlowNum, update state and role
// time, invoke the FSM's FpBecomeLearnerCb (if any), invalidate minMatchIndex
// and reset the log buffer. A log buffer reset failure is logged.
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->hbSlowNum = 0;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // trace log
  sNTrace(pSyncNode, "become learner %s", debugStr);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLearnerCb != NULL) {
    pSyncNode->pFsm->FpBecomeLearnerCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  int32_t code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
  }
}
2156

2157
// TLA+ Spec
2158
// \* Candidate i transitions to leader.
2159
// BecomeLeader(i) ==
2160
//     /\ state[i] = Candidate
2161
//     /\ votesGranted[i] \in Quorum
2162
//     /\ state'      = [state EXCEPT ![i] = Leader]
2163
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
2164
//                          [j \in Server |-> Len(log[i]) + 1]]
2165
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
2166
//                          [j \in Server |-> 0]]
2167
//     /\ elections'  = elections \cup
2168
//                          {[eterm     |-> currentTerm[i],
2169
//                            eleader   |-> i,
2170
//                            elog      |-> log[i],
2171
//                            evotes    |-> votesGranted[i],
2172
//                            evoterLog |-> voterLog[i]]}
2173
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
2174
//
2175
// Switch this node into the TAOS_SYNC_STATE_LEADER role.
//
// Steps, in order: bump the role counters, clear restoreFinish, record the new
// state and role timestamp, point the leader cache at ourselves, re-seed the
// next/match index tables, reset the peer send state, stop a running snapshot
// receiver, stop the election timer, start the heartbeat timer, send one
// heartbeat immediately, invoke the FSM become-leader callback, invalidate
// minMatchIndex and reset the log buffer.
//
// pSyncNode: node to promote.
// debugStr : free-form text appended to the final "become leader" log line.
//
// The function returns nothing; when a step fails it logs the error and returns
// early, leaving the fields written so far as they are.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  int32_t code = 0;

  pSyncNode->becomeLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // every replica's next index starts right after our last index
  for (int32_t idx = 0; idx < pSyncNode->pNextIndex->replicaNum; ++idx) {
    SyncIndex lastIndex;
    SyncTerm  lastTerm;
    code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
    if (code != 0) {
      sError("vgId:%d, failed to become leader since %s", pSyncNode->vgId, tstrerror(code));
      return;
    }
    pSyncNode->pNextIndex->index[idx] = lastIndex + 1;
  }

  // nothing is known to be matched yet; our own slot may be overwritten too, no harm
  for (int32_t idx = 0; idx < pSyncNode->pMatchIndex->replicaNum; ++idx) {
    pSyncNode->pMatchIndex->index[idx] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  code = syncNodePeerStateInit(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

#if 0
  // update sender private term
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
  if (pMySender != NULL) {
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
      if (pSyncNode->senders[i]->privateTerm > pMySender->privateTerm) {
        pMySender->privateTerm = pSyncNode->senders[i]->privateTerm;
      }
    }
    (pMySender->privateTerm) += 100;
  }
#endif

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  code = syncNodeStopElectTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // start heartbeat timer
  code = syncNodeStartHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // send heartbeat right now
  code = syncNodeHeartbeatPeers(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNInfo(pSyncNode, "become leader %s", debugStr);
}
2266

2267
// Switch this node into the TAOS_SYNC_STATE_ASSIGNED_LEADER role.
//
// Performs the same sequence as syncNodeBecomeLeader with three differences
// visible here: it bumps becomeAssignedLeaderNum, it does not touch
// restoreFinish, and it invokes FpBecomeAssignedLeaderCb.
//
// When a step fails the error is logged and the function returns early without
// undoing the fields already written.
void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
  int32_t code = 0;

  pSyncNode->becomeAssignedLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // NOTE: restoreFinish is not reset on this path
  // pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_ASSIGNED_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // every replica's next index starts right after our last index
  for (int32_t idx = 0; idx < pSyncNode->pNextIndex->replicaNum; ++idx) {
    SyncIndex lastIndex;
    SyncTerm  lastTerm;
    code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
    if (code != 0) {
      sError("vgId:%d, failed to become assigned leader since %s", pSyncNode->vgId, tstrerror(code));
      return;
    }
    pSyncNode->pNextIndex->index[idx] = lastIndex + 1;
  }

  // nothing is known to be matched yet; our own slot may be overwritten too, no harm
  for (int32_t idx = 0; idx < pSyncNode->pMatchIndex->replicaNum; ++idx) {
    pSyncNode->pMatchIndex->index[idx] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  code = syncNodePeerStateInit(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  code = syncNodeStopElectTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // start heartbeat timer
  code = syncNodeStartHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // send heartbeat right now
  code = syncNodeHeartbeatPeers(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeAssignedLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNInfo(pSyncNode, "become assigned leader");
}
2345

2346
// Promote a candidate to leader.
//
// Does nothing (besides logging an error) unless the node is currently a
// candidate and its granted votes form a majority. Otherwise it calls
// syncNodeBecomeLeader, appends a no-op entry and logs the resulting term,
// commit index and last log index. A failure to append the no-op entry is
// logged but does not abort the transition.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
    sError("vgId:%d, failed leader from candidate since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }

  if (!voteGrantedMajority(pSyncNode->pVotesGranted)) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  int32_t noopCode = syncNodeAppendNoop(pSyncNode);
  if (noopCode < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
  }

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, pSyncNode->vgId,
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLastIndex);
}
2370

2371
// Returns true when this node's vgId equals 1.
// NOTE(review): presumably vgId 1 is reserved for the mnode - confirm.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  const bool isMnode = (pSyncNode->vgId == 1);
  return isMnode;
}
432,347✔
2372

2373
// Reset the send bookkeeping of every peer-state slot
// (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA of them): lastSendIndex becomes
// SYNC_INDEX_INVALID and lastSendTime becomes 0. Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  int32_t slot = 0;
  while (slot < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) {
    pSyncNode->peerStates[slot].lastSendTime = 0;
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    ++slot;
  }

  return 0;
}
2381

2382
// Move a follower into the candidate state.
//
// Logs an error and leaves the node untouched when it is not currently a
// follower. Otherwise records the new state and role timestamp, then logs the
// current term, commit index and last log index.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    sError("vgId:%d, failed candidate from follower since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }

  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLastIndex);

  sNTrace(pSyncNode, "follower to candidate");
}
2395

2396
// Turn an assigned leader into a regular leader.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the node is not currently in
// TAOS_SYNC_STATE_ASSIGNED_LEADER; otherwise calls syncNodeBecomeLeader,
// appends a no-op entry (a failure there is only logged) and returns 0.
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) return TSDB_CODE_SYN_INTERNAL_ERROR;
  syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");

  sNTrace(pSyncNode, "assigned leader to leader");

  int32_t ret = syncNodeAppendNoop(pSyncNode);
  if (ret < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  // the ", " before "assigned commit index" keeps the two numeric fields apart in the log line
  sInfo("vgId:%d, become leader from assigned leader. term:%" PRId64 ", commit index:%" PRId64
        ", assigned commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, pSyncNode->assignedCommitIndex,
        lastIndex);
  return 0;
}
2414

2415
// just called by syncNodeVoteForSelf
2416
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
1,252✔
2417
  SyncTerm storeTerm = raftStoreGetTerm(pSyncNode);
1,252✔
2418
  if (term != storeTerm) {
1,252!
2419
    sError("vgId:%d, failed to vote for term, term:%" PRId64 ", storeTerm:%" PRId64, pSyncNode->vgId, term, storeTerm);
×
2420
    return;
×
2421
  }
2422
  bool voted = raftStoreHasVoted(pSyncNode);
1,252✔
2423
  if (voted) {
1,252!
2424
    sError("vgId:%d, failed to vote for term since not voted", pSyncNode->vgId);
×
2425
    return;
×
2426
  }
2427

2428
  raftStoreVote(pSyncNode, pRaftId);
1,252✔
2429
}
2430

2431
// simulate get vote from outside
2432
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
1,252✔
2433
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);
1,252✔
2434

2435
  SRpcMsg rpcMsg = {0};
1,252✔
2436
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
1,252✔
2437
  if (ret != 0) return;
1,252!
2438

2439
  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
1,252✔
2440
  pMsg->srcId = pSyncNode->myRaftId;
1,252✔
2441
  pMsg->destId = pSyncNode->myRaftId;
1,252✔
2442
  pMsg->term = currentTerm;
1,252✔
2443
  pMsg->voteGranted = true;
1,252✔
2444

2445
  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
1,252✔
2446
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
1,252✔
2447
  rpcFreeCont(rpcMsg.pCont);
1,252✔
2448
}
2449

2450
// return if has a snapshot
2451
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
17,557✔
2452
  bool      ret = false;
17,557✔
2453
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
17,557✔
2454
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
17,557!
2455
    // TODO check return value
2456
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
17,557✔
2457
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
17,557✔
2458
      ret = true;
2,132✔
2459
    }
2460
  }
2461
  return ret;
17,557✔
2462
}
2463

2464
// return max(logLastIndex, snapshotLastIndex)
2465
// if no snapshot and log, return -1
2466
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
17,559✔
2467
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
17,559✔
2468
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
17,559!
2469
    // TODO check return value
2470
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
17,559✔
2471
  }
2472
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
17,559✔
2473

2474
  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
17,559✔
2475
  return lastIndex;
17,559✔
2476
}
2477

2478
// return the last term of snapshot and log
2479
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
2480
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
17,557✔
2481
  SyncTerm lastTerm = 0;
17,557✔
2482
  if (syncNodeHasSnapshot(pSyncNode)) {
17,557✔
2483
    // has snapshot
2484
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
2,132✔
2485
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
2,132!
2486
      // TODO check return value
2487
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
2,132✔
2488
    }
2489

2490
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
2,132✔
2491
    if (logLastIndex > snapshot.lastApplyIndex) {
2,132✔
2492
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1,335✔
2493
    } else {
2494
      lastTerm = snapshot.lastApplyTerm;
797✔
2495
    }
2496

2497
  } else {
2498
    // no snapshot
2499
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
15,425✔
2500
  }
2501

2502
  return lastTerm;
17,557✔
2503
}
2504

2505
// get last index and term along with snapshot
2506
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
15,467✔
2507
  *pLastIndex = syncNodeGetLastIndex(pSyncNode);
15,467✔
2508
  *pLastTerm = syncNodeGetLastTerm(pSyncNode);
15,467✔
2509
  return 0;
15,466✔
2510
}
2511

2512
#ifdef BUILD_NO_CALL
2513
// return append-entries first try index
2514
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
2515
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
2516
  return syncStartIndex;
2517
}
2518

2519
// if index > 0, return index - 1
2520
// else, return -1
2521
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
2522
  SyncIndex preIndex = index - 1;
2523
  if (preIndex < SYNC_INDEX_INVALID) {
2524
    preIndex = SYNC_INDEX_INVALID;
2525
  }
2526

2527
  return preIndex;
2528
}
2529

2530
// if index < 0, return SYNC_TERM_INVALID
2531
// if index == 0, return 0
2532
// if index > 0, return preTerm
2533
// if error, return SYNC_TERM_INVALID
2534
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
2535
  if (index < SYNC_INDEX_BEGIN) {
2536
    return SYNC_TERM_INVALID;
2537
  }
2538

2539
  if (index == SYNC_INDEX_BEGIN) {
2540
    return 0;
2541
  }
2542

2543
  SyncTerm  preTerm = 0;
2544
  SyncIndex preIndex = index - 1;
2545

2546
  SSyncRaftEntry* pPreEntry = NULL;
2547
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
2548
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
2549
  int32_t         code = 0;
2550
  if (h) {
2551
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
2552
    code = 0;
2553

2554
    pSyncNode->pLogStore->cacheHit++;
2555
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);
2556

2557
  } else {
2558
    pSyncNode->pLogStore->cacheMiss++;
2559
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);
2560

2561
    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
2562
  }
2563

2564
  SSnapshot snapshot = {.data = NULL,
2565
                        .lastApplyIndex = SYNC_INDEX_INVALID,
2566
                        .lastApplyTerm = SYNC_TERM_INVALID,
2567
                        .lastConfigIndex = SYNC_INDEX_INVALID};
2568

2569
  if (code == 0) {
2570
    if (pPreEntry == NULL) return -1;
2571
    preTerm = pPreEntry->term;
2572

2573
    if (h) {
2574
      taosLRUCacheRelease(pCache, h, false);
2575
    } else {
2576
      syncEntryDestroy(pPreEntry);
2577
    }
2578

2579
    return preTerm;
2580
  } else {
2581
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
2582
      // TODO check return value
2583
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
2584
      if (snapshot.lastApplyIndex == preIndex) {
2585
        return snapshot.lastApplyTerm;
2586
      }
2587
    }
2588
  }
2589

2590
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
2591
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
2592
  return SYNC_TERM_INVALID;
2593
}
2594

2595
// get pre index and term of "index"
2596
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
2597
  *pPreIndex = syncNodeGetPreIndex(pSyncNode, index);
2598
  *pPreTerm = syncNodeGetPreTerm(pSyncNode, index);
2599
  return 0;
2600
}
2601
#endif
2602

2603
static void syncNodeEqPingTimer(void* param, void* tmrId) {
201,140✔
2604
  if (!syncIsInit()) return;
201,140!
2605

2606
  int64_t    rid = (int64_t)param;
201,140✔
2607
  SSyncNode* pNode = syncNodeAcquire(rid);
201,140✔
2608

2609
  if (pNode == NULL) return;
201,140!
2610

2611
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
201,140!
2612
    SRpcMsg rpcMsg = {0};
201,140✔
2613
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
201,140✔
2614
                                    pNode->pingTimerMS, pNode);
2615
    if (code != 0) {
201,140!
2616
      sError("failed to build ping msg");
×
2617
      rpcFreeCont(rpcMsg.pCont);
×
2618
      goto _out;
×
2619
    }
2620

2621
    // sTrace("enqueue ping msg");
2622
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
201,140✔
2623
    if (code != 0) {
201,140!
2624
      sError("failed to sync enqueue ping msg since %s", terrstr());
×
2625
      rpcFreeCont(rpcMsg.pCont);
×
2626
      goto _out;
×
2627
    }
2628

2629
  _out:
201,140✔
2630
    if (taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
201,140!
2631
                     &pNode->pPingTimer))
2632
      sError("failed to reset ping timer");
×
2633
  }
2634
  syncNodeRelease(pNode);
201,140✔
2635
}
2636

2637
static void syncNodeEqElectTimer(void* param, void* tmrId) {
1,254✔
2638
  if (!syncIsInit()) return;
1,256!
2639

2640
  int64_t    rid = (int64_t)param;
1,254✔
2641
  SSyncNode* pNode = syncNodeAcquire(rid);
1,254✔
2642

2643
  if (pNode == NULL) return;
1,254✔
2644

2645
  if (pNode->syncEqMsg == NULL) {
1,253!
2646
    syncNodeRelease(pNode);
×
2647
    return;
×
2648
  }
2649

2650
  int64_t tsNow = taosGetTimestampMs();
1,253✔
2651
  if (tsNow < pNode->electTimerParam.executeTime) {
1,253✔
2652
    syncNodeRelease(pNode);
1✔
2653
    return;
1✔
2654
  }
2655

2656
  SRpcMsg rpcMsg = {0};
1,252✔
2657
  int32_t code =
2658
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
1,252✔
2659

2660
  if (code != 0) {
1,252!
2661
    sError("failed to build elect msg");
×
2662
    syncNodeRelease(pNode);
×
2663
    return;
×
2664
  }
2665

2666
  SyncTimeout* pTimeout = rpcMsg.pCont;
1,252✔
2667
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
1,252!
2668

2669
  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
1,252✔
2670
  if (code != 0) {
1,252!
2671
    sError("failed to sync enqueue elect msg since %s", terrstr());
×
2672
    rpcFreeCont(rpcMsg.pCont);
×
2673
    syncNodeRelease(pNode);
×
2674
    return;
×
2675
  }
2676

2677
  syncNodeRelease(pNode);
1,252✔
2678
}
2679

2680
#ifdef BUILD_NO_CALL
2681
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
2682
  if (!syncIsInit()) return;
2683

2684
  int64_t    rid = (int64_t)param;
2685
  SSyncNode* pNode = syncNodeAcquire(rid);
2686

2687
  if (pNode == NULL) return;
2688

2689
  if (pNode->totalReplicaNum > 1) {
2690
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
2691
      SRpcMsg rpcMsg = {0};
2692
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
2693
                                      pNode->heartbeatTimerMS, pNode);
2694

2695
      if (code != 0) {
2696
        sError("failed to build heartbeat msg");
2697
        goto _out;
2698
      }
2699

2700
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
2701
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
2702
      if (code != 0) {
2703
        sError("failed to enqueue heartbeat msg since %s", terrstr());
2704
        rpcFreeCont(rpcMsg.pCont);
2705
        goto _out;
2706
      }
2707

2708
    _out:
2709
      if (taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
2710
                       &pNode->pHeartbeatTimer) != 0)
2711
        return;
2712

2713
    } else {
2714
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
2715
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2716
    }
2717
  }
2718
}
2719
#endif
2720

2721
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
39,512✔
2722
  int32_t code = 0;
39,512✔
2723
  int64_t hbDataRid = (int64_t)param;
39,512✔
2724
  int64_t tsNow = taosGetTimestampMs();
39,512✔
2725

2726
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
39,512✔
2727
  if (pData == NULL) {
39,512!
2728
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
×
2729
    return;
×
2730
  }
2731

2732
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
39,512✔
2733
  if (pSyncNode == NULL) {
39,512✔
2734
    syncHbTimerDataRelease(pData);
2✔
2735
    sError("hb timer get pSyncNode NULL");
2!
2736
    return;
2✔
2737
  }
2738

2739
  SSyncTimer* pSyncTimer = pData->pTimer;
39,510✔
2740

2741
  if (!pSyncNode->isStart) {
39,510!
2742
    syncNodeRelease(pSyncNode);
×
2743
    syncHbTimerDataRelease(pData);
×
2744
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
×
2745
    return;
×
2746
  }
2747

2748
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
39,510!
2749
    syncNodeRelease(pSyncNode);
×
2750
    syncHbTimerDataRelease(pData);
×
2751
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
×
2752
    return;
×
2753
  }
2754

2755
  sTrace("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, hbDataRid,
39,510!
2756
         pData->destId.addr);
2757

2758
  if (pSyncNode->totalReplicaNum > 1) {
39,510✔
2759
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
39,508✔
2760
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);
39,508✔
2761

2762
    if (timerLogicClock == msgLogicClock) {
39,508✔
2763
      if (tsNow > pData->execTime) {
39,506✔
2764
        pData->execTime += pSyncTimer->timerMS;
39,432✔
2765

2766
        SRpcMsg rpcMsg = {0};
39,432✔
2767
        if ((code = syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId)) != 0) {
39,432!
2768
          sError("vgId:%d, failed to build heartbeat msg since %s", pSyncNode->vgId, tstrerror(code));
×
2769
          syncNodeRelease(pSyncNode);
×
2770
          syncHbTimerDataRelease(pData);
×
2771
          return;
×
2772
        }
2773

2774
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
39,432✔
2775

2776
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
39,432✔
2777
        pSyncMsg->srcId = pSyncNode->myRaftId;
39,432✔
2778
        pSyncMsg->destId = pData->destId;
39,432✔
2779
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
39,432✔
2780
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
39,432✔
2781
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
39,432✔
2782
        pSyncMsg->privateTerm = 0;
39,432✔
2783
        pSyncMsg->timeStamp = tsNow;
39,432✔
2784

2785
        // update reset time
2786
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
39,432✔
2787
        pSyncTimer->timeStamp = tsNow;
39,432✔
2788

2789
        // send msg
2790
        TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
39,432✔
2791
        STraceId* trace = &(rpcMsg.info.traceId);
39,432✔
2792
        sGTrace("vgId:%d, send sync-heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId)));
39,432!
2793
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
39,432✔
2794
        int ret = syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
39,432✔
2795
        if (ret != 0) {
39,432✔
2796
          sError("vgId:%d, failed to send heartbeat since %s", pSyncNode->vgId, tstrerror(ret));
84!
2797
        }
2798
      }
2799

2800
      if (syncIsInit()) {
39,506!
2801
        sTrace("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
39,506!
2802
        bool stopped = taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid,
39,506✔
2803
                                    syncEnv()->pTimerManager, &pSyncTimer->pTimer);
39,506✔
2804
        if (stopped) sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
39,506!
2805

2806
      } else {
2807
        sError("sync env is stop, reset peer hb timer error");
×
2808
      }
2809

2810
    } else {
2811
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
2!
2812
             timerLogicClock, msgLogicClock);
2813
    }
2814
  }
2815

2816
  syncHbTimerDataRelease(pData);
39,510✔
2817
  syncNodeRelease(pSyncNode);
39,510✔
2818
}
2819

2820
#ifdef BUILD_NO_CALL
2821
// Deleter callback passed to taosLRUCacheInsert by syncCacheEntry: frees the
// cached value with taosMemoryFree. key, keyLen and ud are not used.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) {
  taosMemoryFree(value);
  (void)ud;
}
2825

2826
// Insert a raft entry into the log store's LRU cache, keyed by its index.
//
// pLogStore: log store owning the cache.
// pEntry   : entry to cache; its charge is sizeof(*pEntry) + pEntry->dataLen.
// h        : forwarded to taosLRUCacheInsert as the handle out-parameter.
//
// Returns 0 when the insert reports TAOS_LRU_STATUS_OK, -1 otherwise.
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pData = pLogStore->data;
  sNTrace(pData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   entryLen = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, entryLen,
                                        deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW, NULL);

  return (status == TAOS_LRU_STATUS_OK) ? 0 : -1;
}
2840
#endif
2841

2842
// Fill a sync configuration from an alter-vnode-replica request.
//
// The first pReq->replica slots of cfg->nodeInfo are populated from
// pReq->replicas with role TAOS_SYNC_ROLE_VOTER; the following slots, up to
// pReq->replica + pReq->learnerReplica, are populated from
// pReq->learnerReplicas with role TAOS_SYNC_ROLE_LEARNER. Every slot is passed
// through tmsgUpdateDnodeInfo and logged.
//
// On return cfg->replicaNum is the number of voters, cfg->totalReplicaNum is
// learners + pReq->replica, cfg->changeVersion mirrors the request, and
// cfg->myIndex is set from selfIndex and/or learnerSelfIndex when they are not
// -1 (the learner index wins if both are set).
// TODO SAlterVnodeReplicaReq name is proper?
void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) {
  cfg->replicaNum = 0;
  cfg->totalReplicaNum = 0;

  // voters
  for (int slot = 0; slot < pReq->replica; ++slot) {
    SNodeInfo* pNode = &cfg->nodeInfo[slot];
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
    pNode->nodeId = pReq->replicas[slot].id;
    pNode->nodePort = pReq->replicas[slot].port;
    tstrncpy(pNode->nodeFqdn, pReq->replicas[slot].fqdn, sizeof(pNode->nodeFqdn));

    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, slot, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
    cfg->replicaNum++;
  }

  if (pReq->selfIndex != -1) {
    cfg->myIndex = pReq->selfIndex;
  }

  // learners: cfg->totalReplicaNum doubles as the index into learnerReplicas while this loop runs
  for (int slot = cfg->replicaNum; slot < pReq->replica + pReq->learnerReplica; ++slot) {
    SNodeInfo* pNode = &cfg->nodeInfo[slot];
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    pNode->nodeId = pReq->learnerReplicas[cfg->totalReplicaNum].id;
    pNode->nodePort = pReq->learnerReplicas[cfg->totalReplicaNum].port;
    tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[cfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn));

    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, slot, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
    cfg->totalReplicaNum++;
  }

  // learners counted so far, now add the voters
  cfg->totalReplicaNum += pReq->replica;

  if (pReq->learnerSelfIndex != -1) {
    cfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
  }

  cfg->changeVersion = pReq->changeVersion;
}
×
2878

2879
// Inspect a config-change entry and step down if it removes this leader.
//
// Returns:
//   TSDB_CODE_SYN_INTERNAL_ERROR  pEntry is not a TDMT_SYNC_CONFIG_CHANGE entry
//   TSDB_CODE_INVALID_MSG         the request payload cannot be deserialized
//   1                             this node is (assigned) leader, the new config
//                                 is non-empty and does not list this node by
//                                 fqdn/port; syncNodeStepDown has been called
//   0                             otherwise
int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  int32_t code = 0;
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // payload layout: SMsgHead followed by the serialized request
  SMsgHead* head = (SMsgHead*)pEntry->data;
  void*     pReq = POINTER_SHIFT(head, sizeof(SMsgHead));

  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg cfg = {0};
  syncBuildConfigFromReq(&req, &cfg);

  if (cfg.totalReplicaNum < 1) {
    return 0;
  }
  if (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    return 0;
  }

  // still part of the new config? then nothing to do
  for (int32_t k = 0; k < cfg.totalReplicaNum; ++k) {
    if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[k].nodeFqdn) == 0 &&
        ths->myNodeInfo.nodePort == cfg.nodeInfo[k].nodePort) {
      return 0;
    }
  }

  // we were removed from the config: give up leadership
  SyncTerm currentTerm = raftStoreGetTerm(ths);
  SRaftId  id = EMPTY_RAFT_ID;
  syncNodeStepDown(ths, currentTerm, id);
  return 1;
}
2917

2918
// Dump the node's current membership view to the info log.
//
// ths: node whose state is logged.
// cfg: not used by this function.
// str: caller-supplied tag that is included in every log line.
//
// Logged, in order: summary counters, this node's info, each peer's node info,
// each peer's endpoint set, each peer's raft id, every node of the raft config,
// and every replica's raft id.
void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg* cfg, char* str) {
  sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64
        ", changeVersion:%d, "
        "restoreFinish:%d",
        ths->vgId, str, ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion,
        ths->restoreFinish);

  sInfo("vgId:%d, %s, myNodeInfo, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
        ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn, ths->myNodeInfo.nodePort,
        ths->myNodeInfo.nodeRole);

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
          i, ths->peersNodeInfo[i].clusterId, ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn,
          ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole);
  }

  // one line per peer, rendering THAT peer's endpoint set as "{fqdn:port, ...}"
  for (int32_t i = 0; i < ths->peersNum; ++i) {
    char    buf[256];
    int32_t len = sizeof(buf);
    int32_t n = 0;
    n += tsnprintf(buf + n, len - n, "%s", "{");
    for (int32_t j = 0; j < ths->peersEpset[i].numOfEps; j++) {
      n += tsnprintf(buf + n, len - n, "%s:%d%s", ths->peersEpset[i].eps[j].fqdn, ths->peersEpset[i].eps[j].port,
                     (j + 1 < ths->peersEpset[i].numOfEps ? ", " : ""));
    }
    n += tsnprintf(buf + n, len - n, "%s", "}");

    sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d", ths->vgId, str, i, buf, ths->peersEpset[i].inUse);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersId%d, addr:%" PRId64, ths->vgId, str, i, ths->peersId[i].addr);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, nodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str, i,
          ths->raftCfg.cfg.nodeInfo[i].clusterId, ths->raftCfg.cfg.nodeInfo[i].nodeId,
          ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, ths->raftCfg.cfg.nodeInfo[i].nodePort,
          ths->raftCfg.cfg.nodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, replicasId%d, addr:%" PRId64, ths->vgId, str, i, ths->replicasId[i].addr);
  }
}
×
2964

2965
// Rebuild the node's peer tables and stored raft config from `cfg`.
//
// Pass 1: every cfg node whose fqdn/port differs from this node becomes a peer;
//         its node info, endpoint set and raft id are refreshed and peersNum is
//         updated. If a raft id cannot be derived the function logs the error
//         and returns terrno without updating peersNum.
// Pass 2: all cfg nodes are copied into ths->raftCfg.cfg, voters are counted
//         into replicaNum, myIndex is set to this node's slot when present, and
//         totalReplicaNum is set to the number of copied nodes.
//
// Returns 0 on success.
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg* cfg) {
  // change peersNodeInfo
  int32_t peerCnt = 0;
  for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
    bool isSelf = strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 &&
                  ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort;
    if (isSelf) {
      continue;
    }

    ths->peersNodeInfo[peerCnt].nodeRole = cfg->nodeInfo[j].nodeRole;
    ths->peersNodeInfo[peerCnt].clusterId = cfg->nodeInfo[j].clusterId;
    tstrncpy(ths->peersNodeInfo[peerCnt].nodeFqdn, cfg->nodeInfo[j].nodeFqdn, TSDB_FQDN_LEN);
    ths->peersNodeInfo[peerCnt].nodeId = cfg->nodeInfo[j].nodeId;
    ths->peersNodeInfo[peerCnt].nodePort = cfg->nodeInfo[j].nodePort;

    syncUtilNodeInfo2EpSet(&ths->peersNodeInfo[peerCnt], &ths->peersEpset[peerCnt]);

    if (!syncUtilNodeInfo2RaftId(&ths->peersNodeInfo[peerCnt], ths->vgId, &ths->peersId[peerCnt])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", ths->vgId, peerCnt);
      return terrno;
    }

    peerCnt++;
  }
  ths->peersNum = peerCnt;

  // change cfg nodeInfo
  ths->raftCfg.cfg.replicaNum = 0;
  int32_t cfgCnt = 0;
  for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
    if (cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->raftCfg.cfg.replicaNum++;
    }

    ths->raftCfg.cfg.nodeInfo[cfgCnt].nodeRole = cfg->nodeInfo[j].nodeRole;
    ths->raftCfg.cfg.nodeInfo[cfgCnt].clusterId = cfg->nodeInfo[j].clusterId;
    tstrncpy(ths->raftCfg.cfg.nodeInfo[cfgCnt].nodeFqdn, cfg->nodeInfo[j].nodeFqdn, TSDB_FQDN_LEN);
    ths->raftCfg.cfg.nodeInfo[cfgCnt].nodeId = cfg->nodeInfo[j].nodeId;
    ths->raftCfg.cfg.nodeInfo[cfgCnt].nodePort = cfg->nodeInfo[j].nodePort;

    bool isSelf = strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 &&
                  ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort;
    if (isSelf) {
      ths->raftCfg.cfg.myIndex = cfgCnt;
    }

    cfgCnt++;
  }
  ths->raftCfg.cfg.totalReplicaNum = cfgCnt;

  return 0;
}
3013

3014
// Raise to VOTER every peer and every raft-cfg member that the incoming cfg lists as a voter.
// Members are matched on (nodeFqdn, nodePort). Roles are only raised here, never lowered.
// ths->raftCfg.cfg.replicaNum is recomputed from scratch: it is incremented once per matching
// (member, voter entry) pair.
void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg* cfg) {
  // peersNodeInfo: promote matching peers
  for (int32_t peer = 0; peer < ths->peersNum; ++peer) {
    for (int32_t k = 0; k < cfg->totalReplicaNum; ++k) {
      if (strcmp(ths->peersNodeInfo[peer].nodeFqdn, cfg->nodeInfo[k].nodeFqdn) != 0) continue;
      if (ths->peersNodeInfo[peer].nodePort != cfg->nodeInfo[k].nodePort) continue;
      if (cfg->nodeInfo[k].nodeRole != TAOS_SYNC_ROLE_VOTER) continue;

      ths->peersNodeInfo[peer].nodeRole = TAOS_SYNC_ROLE_VOTER;
    }
  }

  // raftCfg.cfg.nodeInfo: promote matching members and recount the voters
  ths->raftCfg.cfg.replicaNum = 0;
  for (int32_t member = 0; member < ths->raftCfg.cfg.totalReplicaNum; ++member) {
    for (int32_t k = 0; k < cfg->totalReplicaNum; ++k) {
      if (strcmp(ths->raftCfg.cfg.nodeInfo[member].nodeFqdn, cfg->nodeInfo[k].nodeFqdn) != 0) continue;
      if (ths->raftCfg.cfg.nodeInfo[member].nodePort != cfg->nodeInfo[k].nodePort) continue;
      if (cfg->nodeInfo[k].nodeRole != TAOS_SYNC_ROLE_VOTER) continue;

      ths->raftCfg.cfg.nodeInfo[member].nodeRole = TAOS_SYNC_ROLE_VOTER;
      ths->raftCfg.cfg.replicaNum++;
    }
  }
}
×
3041

3042
// Rebuild the per-replica state of the sync node from ths->raftCfg.cfg, carrying over the state
// of replicas that exist in both the old and the new membership (matched with syncUtilSameId()).
//
// Rebuilt, in order: replicasId, pMatchIndex, pNextIndex, vote bookkeeping, logReplMgrs,
// snapshot senders, heartbeat timers and peerStates.
//
// ths                - sync node; raftCfg.cfg.{replicaNum,totalReplicaNum,nodeInfo} are read as
//                      the new membership
// oldtotalReplicaNum - number of replicas that were in use before the change
// returns            - 0 on success, otherwise the error code of the step that failed
int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum) {
  int32_t code = 0;
  // 1.rebuild replicasId, remove deleted one
  SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
  memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId));

  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]) == false) return terrno;
  }

  // 2.rebuild MatchIndex, remove deleted one
  SSyncIndexMgr* oldIndex = ths->pMatchIndex;

  ths->pMatchIndex = syncIndexMgrCreate(ths);
  if (ths->pMatchIndex == NULL) {
    // NOTE(review): oldIndex is neither restored nor destroyed on this path - confirm intended
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  syncIndexMgrCopyIfExist(ths->pMatchIndex, oldIndex, oldReplicasId);

  syncIndexMgrDestroy(oldIndex);

  // 3.rebuild NextIndex, remove deleted one
  SSyncIndexMgr* oldNextIndex = ths->pNextIndex;

  ths->pNextIndex = syncIndexMgrCreate(ths);
  if (ths->pNextIndex == NULL) {
    // NOTE(review): oldNextIndex is neither restored nor destroyed on this path - confirm intended
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  syncIndexMgrCopyIfExist(ths->pNextIndex, oldNextIndex, oldReplicasId);

  syncIndexMgrDestroy(oldNextIndex);

  // 4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // 5.rebuild logReplMgr
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // by-value snapshot of the old managers, taken before they are destroyed and re-created
  SSyncLogReplMgr* oldLogReplMgrs = NULL;
  int64_t          length = sizeof(SSyncLogReplMgr) * (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA);
  oldLogReplMgrs = taosMemoryMalloc(length);
  if (NULL == oldLogReplMgrs) return terrno;
  memset(oldLogReplMgrs, 0, length);

  for (int i = 0; i < oldtotalReplicaNum; i++) {
    oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
  }

  syncNodeLogReplDestroy(ths);
  if ((code = syncNodeLogReplInit(ths)) != 0) {
    taosMemoryFree(oldLogReplMgrs);
    TAOS_RETURN(code);
  }

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        *(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
        ths->logReplMgrs[i]->peerId = i;
      }
    }
  }

  // The snapshot is not read past this point. Releasing it here (instead of at the very end)
  // keeps every later error return leak-free, including the sender-creation failure below.
  taosMemoryFree(oldLogReplMgrs);
  oldLogReplMgrs = NULL;

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // 6.rebuild sender
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  // stop and destroy every existing sender slot
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if (ths->senders[i] != NULL) {
      sDebug("vgId:%d, snapshot sender destroy while close, data:%p", ths->vgId, ths->senders[i]);

      if (snapshotSenderIsStart(ths->senders[i])) {
        snapshotSenderStop(ths->senders[i], false);
      }

      snapshotSenderDestroy(ths->senders[i]);
      ths->senders[i] = NULL;
    }
  }

  // re-create a sender for every slot
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(ths, i, &pSender);
    if (pSender == NULL) return terrno = code;

    ths->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  // 7.rebuild synctimer
  if ((code = syncNodeStopHeartbeatTimer(ths)) != 0) {
    TAOS_RETURN(code);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(ths, &ths->peerHeartbeatTimerArr[i], ths->replicasId[i])) != 0) {
      TAOS_RETURN(code);
    }
  }

  if ((code = syncNodeStartHeartbeatTimer(ths)) != 0) {
    TAOS_RETURN(code);
  }

  // 8.rebuild peerStates
  SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
  for (int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) {
    oldState[i] = ths->peerStates[i];
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        ths->peerStates[i] = oldState[j];
      }
    }
  }

  return 0;
}
3193

3194
// Refresh the voter count everywhere it is cached after a replica-type change.
// Only replicaNum moves (on the node and on both index managers); the replica ids, the index
// arrays and the log replication managers are left untouched. Vote bookkeeping is rebuilt.
void syncNodeChangeToVoter(SSyncNode* ths) {
  // propagate the new voter count: node, match index manager, next index manager
  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->pMatchIndex->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->pNextIndex->replicaNum = ths->raftCfg.cfg.replicaNum;

  // dump the (unchanged) replica ids
  sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum);
  int32_t r = 0;
  while (r < ths->totalReplicaNum) {
    sDebug("vgId:%d, i:%d, replicaId.addr:%" PRIx64, ths->vgId, r, ths->replicasId[r].addr);
    ++r;
  }

  // dump the (unchanged) match indexes
  sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, ths->pMatchIndex->totalReplicaNum);
  int32_t m = 0;
  while (m < ths->pMatchIndex->totalReplicaNum) {
    sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, m, ths->pMatchIndex->index[m]);
    ++m;
  }

  // vote bookkeeping follows the new voter count
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // logReplMgrs need no change here
}
×
3218

3219
// Overwrite every in-use peer entry and every in-use raft-cfg node entry with a zeroed SNodeInfo.
// The counters (peersNum, totalReplicaNum, replicaNum) are not modified here.
void syncNodeResetPeerAndCfg(SSyncNode* ths) {
  SNodeInfo blank = {0};

  int32_t idx = 0;
  while (idx < ths->peersNum) {
    memcpy(&ths->peersNodeInfo[idx], &blank, sizeof(SNodeInfo));
    ++idx;
  }

  idx = 0;
  while (idx < ths->raftCfg.cfg.totalReplicaNum) {
    memcpy(&ths->raftCfg.cfg.nodeInfo[idx], &blank, sizeof(SNodeInfo));
    ++idx;
  }
}
×
3229

3230
// Apply a TDMT_SYNC_CONFIG_CHANGE raft entry to this node.
//
// The entry payload (SMsgHead + serialized alter request) is decoded into an SSyncCfg, and one of
// three paths is taken:
//   - cfg.totalReplicaNum is 1 or 2 : replica removal (this node either stays or is the one removed)
//   - current totalReplicaNum is 3  : replica type change (learner -> voter)
//   - otherwise                     : replica addition
// Afterwards quorum, lastConfigIndex and changeVersion are refreshed and the cfg file is written.
//
// ths     - sync node to reconfigure
// pEntry  - raft entry; must carry originalRpcType == TDMT_SYNC_CONFIG_CHANGE
// str     - caller tag; "Commit" selects the commit-flavoured log line, it is otherwise only logged
// returns - 0 on success or when the entry is skipped because its changeVersion is not newer;
//           TSDB_CODE_SYN_INTERNAL_ERROR for a wrong entry type, TSDB_CODE_INVALID_MSG when the
//           payload cannot be decoded, or the error code of the failing rebuild/write step
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) {
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int32_t code = 0;

  // payload layout: message head immediately followed by the serialized request
  SMsgHead* pHead = (SMsgHead*)pEntry->data;
  void*     pPayload = POINTER_SHIFT(pHead, sizeof(SMsgHead));

  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pPayload, pHead->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg cfg = {0};
  syncBuildConfigFromReq(&req, &cfg);

  // only a strictly newer changeVersion is applied
  if (ths->raftCfg.cfg.changeVersion >= cfg.changeVersion) {
    sInfo(
        "vgId:%d, skip conf change entry since lower version. "
        "this entry, index:%" PRId64 ", term:%" PRId64
        ", totalReplicaNum:%d, changeVersion:%d; "
        "current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d",
        ths->vgId, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
    return 0;
  }

  if (strcmp(str, "Commit") != 0) {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64 ", t:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
        ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum,
        ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
        ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index - 1, ths->commitIndex,
        ths->pLogBuf->commitIndex);
  } else {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
        ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex,
        ths->pLogBuf->endIndex, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
  }

  syncNodeLogConfigInfo(ths, &cfg, "before config change");

  int32_t prevTotalReplicaNum = ths->totalReplicaNum;

  if (cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2) {
    // ---- remove replica ----
    // is this node still listed in the shrunken membership?
    bool    stillMember = false;
    int32_t k = 0;
    while (k < cfg.totalReplicaNum && !stillMember) {
      stillMember = (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[k].nodeFqdn) == 0 &&
                     ths->myNodeInfo.nodePort == cfg.nodeInfo[k].nodePort);
      ++k;
    }

    // both outcomes start from wiped peer / cfg node tables
    syncNodeResetPeerAndCfg(ths);

    if (stillMember) {
      // another node is removed; myNodeInfo stays as it is
      code = syncNodeRebuildPeerAndCfg(ths, &cfg);
      if (code != 0) {
        TAOS_RETURN(code);
      }

      code = syncNodeRebuildAndCopyIfExist(ths, prevTotalReplicaNum);
      if (code != 0) {
        TAOS_RETURN(code);
      }
    } else {
      // this node is the one removed: shrink to a single learner to reduce the chance of
      // disrupting the remaining group
      ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER;

      ths->peersNum = 0;
      memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo));
      ths->raftCfg.cfg.replicaNum = 0;
      ths->raftCfg.cfg.totalReplicaNum = 1;

      code = syncNodeRebuildAndCopyIfExist(ths, prevTotalReplicaNum);
      if (code != 0) {
        TAOS_RETURN(code);
      }

      ths->state = TAOS_SYNC_STATE_LEARNER;
    }

    ths->restoreFinish = false;
  } else if (ths->totalReplicaNum == 3) {
    // ---- change replica type ----
    sInfo("vgId:%d, begin change replica type", ths->vgId);

    // myNodeInfo: become voter if the new cfg says so
    for (int32_t n = 0; n < cfg.totalReplicaNum; ++n) {
      if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[n].nodeFqdn) != 0) continue;
      if (ths->myNodeInfo.nodePort != cfg.nodeInfo[n].nodePort) continue;
      if (cfg.nodeInfo[n].nodeRole == TAOS_SYNC_ROLE_VOTER) {
        ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
    }

    // peers and cfg
    syncNodeChangePeerAndCfgToVoter(ths, &cfg);

    // cached counters and vote bookkeeping
    syncNodeChangeToVoter(ths);

    // a learner that has just been promoted starts acting as follower
    if (ths->state == TAOS_SYNC_STATE_LEARNER && ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->state = TAOS_SYNC_STATE_FOLLOWER;
    }

    ths->restoreFinish = false;
  } else {
    // ---- add replica ----
    sInfo("vgId:%d, begin add replica", ths->vgId);

    // myNodeInfo and state are left unchanged on this path
    code = syncNodeRebuildPeerAndCfg(ths, &cfg);
    if (code != 0) {
      TAOS_RETURN(code);
    }

    code = syncNodeRebuildAndCopyIfExist(ths, prevTotalReplicaNum);
    if (code != 0) {
      TAOS_RETURN(code);
    }

    if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      ths->restoreFinish = false;
    }
  }

  ths->quorum = syncUtilQuorum(ths->replicaNum);

  ths->raftCfg.lastConfigIndex = pEntry->index;
  ths->raftCfg.cfg.lastIndex = pEntry->index;
  ths->raftCfg.cfg.changeVersion = cfg.changeVersion;

  syncNodeLogConfigInfo(ths, &cfg, "after config change");

  code = syncWriteCfgFile(ths);
  if (code != 0) {
    sError("vgId:%d, failed to create sync cfg file", ths->vgId);
  }

  TAOS_RETURN(code);
}
3403

3404
// Append a raft entry to the node's log buffer and advance match/commit as far as possible.
//
// Flow:
//   1. reject entries whose payload is smaller than an SMsgHead (the entry is destroyed)
//   2. enqueue into the log buffer; on failure the FSM is invoked with the error and the entry
//      is destroyed
//   3. always proceed the log buffer's match index
//   4. as assigned leader: update the assigned commit index and commit up to it
//   5. multi replica: return here; single replica: update the commit index and commit directly
//
// ths     - sync node
// pEntry  - entry to append; destroyed here on the two failure paths above
// returns - 0 on success, a negative/error code otherwise
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  int32_t code = -1;
  if (pEntry->dataLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId,
           TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    goto _out;
  }

  // append to log buffer
  if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) {
    sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
    int32_t ret = 0;
    // let the FSM see the failure (terrno is forwarded as the applied code)
    if ((ret = syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false)) != 0) {
      sError("vgId:%d, failed to execute fsm, since %s", ths->vgId, tstrerror(ret));
    }
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    goto _out;
  }

  code = 0;
_out:;
  // proceed match index, with replicating on needed
  SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append");

  if (pEntry != NULL) {
    sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
           ", %" PRId64 ")",
           ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
           ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
  }

  if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    int64_t index = syncNodeUpdateAssignedCommitIndex(ths, matchIndex);
    sTrace("vgId:%d, update assigned commit index %" PRId64 "", ths->vgId, index);

    if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
        syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex) < 0) {
      // report the index that was actually passed to the commit call
      sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->assignedCommitIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // multi replica
  if (ths->replicaNum > 1) {
    TAOS_RETURN(code);
  }

  // single replica
  SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, matchIndex);
  sTrace("vgId:%d, update commit return index %" PRId64 "", ths->vgId, returnIndex);

  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
      (code = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex)) < 0) {
    sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
  }

  TAOS_RETURN(code);
}
3465

3466
// Report whether at least `quorum` non-learner peers have gone silent.
// A peer counts as timed out when the last heartbeat-reply receive time recorded in pMatchIndex
// is older than tsHeartbeatTimeout; peers with a recorded time of 0 or -1 are ignored.
// A node with totalReplicaNum == 1 never times out.
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->totalReplicaNum == 1) {
    return false;
  }

  int32_t silentPeers = 0;
  int64_t nowMs = taosGetTimestampMs();

  for (int32_t p = 0; p < pSyncNode->peersNum; ++p) {
    if (pSyncNode->peersNodeInfo[p].nodeRole != TAOS_SYNC_ROLE_LEARNER) {
      int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[p]));
      if (lastRecv != 0 && lastRecv != -1 && nowMs - lastRecv > tsHeartbeatTimeout) {
        silentPeers++;
      }
    }
  }

  if (silentPeers >= pSyncNode->quorum) {
    return true;
  }
  return false;
}
3491

3492
// True when any of the first totalReplicaNum snapshot sender slots holds a started sender.
// A NULL node yields false.
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return false;

  int32_t slot = 0;
  while (slot < pSyncNode->totalReplicaNum) {
    if (pSyncNode->senders[slot] != NULL && pSyncNode->senders[slot]->start) {
      return true;
    }
    ++slot;
  }

  return false;
}
3503

3504
// True when the node has a snapshot receiver and that receiver is started.
// A NULL node or a missing receiver yields false.
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  if (pSyncNode != NULL && pSyncNode->pNewNodeReceiver != NULL) {
    if (pSyncNode->pNewNodeReceiver->start) {
      return true;
    }
  }
  return false;
}
3510

3511
// Build a no-op entry at the log buffer's end index with the current term and append it.
// Returns TSDB_CODE_OUT_OF_MEMORY when the entry cannot be built, otherwise the result of
// syncNodeAppend().
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex endIndex = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  curTerm = raftStoreGetTerm(ths);

  SSyncRaftEntry* pNoop = syncEntryBuildNoop(curTerm, endIndex, ths->vgId);
  if (pNoop == NULL) {
    int32_t oom = TSDB_CODE_OUT_OF_MEMORY;
    TAOS_RETURN(oom);
  }

  int32_t code = syncNodeAppend(ths, pNoop);
  TAOS_RETURN(code);
}
3525

3526
#ifdef BUILD_NO_CALL
// Legacy no-op append that writes straight to the log store (compiled only with BUILD_NO_CALL).
// As leader the entry is appended to the log store and offered to the entry cache; when no cache
// handle is obtained the entry is destroyed here.
// Returns 0 on success, -1 when the entry cannot be built or the append fails.
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  int32_t ret = 0;

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) return -1;

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
    if (code != 0) {
      sError("append noop error");
      // the entry was never cached, so it is still owned here; release it before bailing out
      syncEntryDestroy(pEntry);
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return ret;
}
#endif
3556

3557
// Handle an incoming sync-heartbeat message.
//
//   - heartbeats from a sender that is not in this raft group are dropped (returns 0)
//   - a heartbeat reply is built up front and sent at the end
//   - a learner adopts a higher term carried by the heartbeat
//   - same term, not (assigned) leader: record the receive time, remember minMatchIndex, and as
//     follower/learner enqueue a local commit command carrying the sender's commit index
//   - term >= current while (assigned) leader: enqueue a local step-down command
//   - the election timer is reset when the heartbeat was accepted for the current term
//
// ths     - sync node
// pRpcMsg - rpc message whose pCont is a SyncHeartbeat
// returns - 0 on success, otherwise the error code of the failing build/send step
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
  bool           resetElect = false;
  int32_t        code = 0;

  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  int64_t tsMs = taosGetTimestampMs();
  int64_t timeDiff = tsMs - pMsg->timeStamp;
  syncLogRecvHeartbeat(ths, pMsg, timeDiff, tbuf);

  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
    sWarn(
        "vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
        "cluster:%d",
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
    return 0;
  }

  SRpcMsg rpcMsg = {0};
  TAOS_CHECK_RETURN(syncBuildHeartbeatReply(&rpcMsg, ths->vgId));
  SyncTerm currentTerm = raftStoreGetTerm(ths);

  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;

  sGTrace("vgId:%d, process sync-heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64,
          ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm);

  if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
    raftStoreSetTerm(ths, pMsg->term);
    currentTerm = pMsg->term;
  }

  if (pMsg->term == currentTerm &&
      (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
    resetElect = true;

    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      if ((code = syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId)) != 0) {
        // the reply has been built but not sent yet; release it before bailing out
        rpcFreeCont(rpcMsg.pCont);
        TAOS_RETURN(code);
      }

      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd =
          (ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT;
      pSyncMsg->commitIndex = pMsg->commitIndex;
      pSyncMsg->currentTerm = pMsg->term;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, failed to enqueue commit msg from heartbeat since %s, code:%d", ths->vgId, terrstr(), code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sTrace("vgId:%d, enqueue commit msg from heartbeat, commit-index:%" PRId64 ", term:%" PRId64, ths->vgId,
                 pMsg->commitIndex, pMsg->term);
        }
      } else {
        // nobody takes ownership of the local command when there is no queue callback
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  if (pMsg->term >= currentTerm &&
      (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    SRpcMsg rpcMsgLocalCmd = {0};
    if ((code = syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId)) != 0) {
      // the reply has been built but not sent yet; release it before bailing out
      rpcFreeCont(rpcMsg.pCont);
      TAOS_RETURN(code);
    }

    SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->currentTerm = pMsg->term;
    pSyncMsg->commitIndex = pMsg->commitIndex;

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, pMsg->term);
      }
    } else {
      // nobody takes ownership of the local command when there is no queue callback
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    }
  }

  // reply
  TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg));

  if (resetElect) syncNodeResetElectTimer(ths);
  return 0;
}
3654

3655
// Handle a heartbeat reply from a peer: record the receive time in pMatchIndex and hand the
// reply to that peer's log replication manager.
//
// ths     - sync node
// pRpcMsg - rpc message whose pCont is a SyncHeartbeatReply
// returns - result of syncLogReplProcessHeartbeatReply(), or an error code (terrno if set,
//           else TSDB_CODE_SYN_RETURN_VALUE_NULL) when the sender has no log replication manager
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  int32_t         code = 0;
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // address printed as a zero-padded 16-digit hex value
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64 "", ths->vgId, pMsg->srcId.addr);
    TAOS_RETURN(code);
  }

  int64_t tsMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, tbuf);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);

  return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
}
3677

3678
#ifdef BUILD_NO_CALL
// Legacy heartbeat-reply handler (compiled only with BUILD_NO_CALL): logs the reply and records
// when it arrived so the peer's liveness can be judged later. Always returns 0.
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;

  char            traceStr[40] = {0};
  const STraceId* trace = &pRpcMsg->info.traceId;
  TRACE_TO_STR(trace, traceStr);

  int64_t nowMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pReply, nowMs - pReply->timeStamp, traceStr);

  // remember the last reply time of this peer
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, nowMs);
  return 0;
}
#endif
3695

3696
// Execute a local command that was queued by the heartbeat handler.
//   SYNC_LOCAL_CMD_STEP_DOWN                  - step down to the term carried by the command
//   SYNC_LOCAL_CMD_FOLLOWER_CMT / LEARNER_CMT - adopt the carried commit index (only when the
//                                               carried term equals the last matched term) and
//                                               commit the log buffer
// Returns 0, except TSDB_CODE_SYN_INTERNAL_ERROR when the last match term is negative.
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, "");

  if (pCmd->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    SRaftId id = EMPTY_RAFT_ID;
    syncNodeStepDown(ths, pCmd->currentTerm, id);
    return 0;
  }

  if (pCmd->cmd != SYNC_LOCAL_CMD_FOLLOWER_CMT && pCmd->cmd != SYNC_LOCAL_CMD_LEARNER_CMT) {
    sError("error local cmd");
    return 0;
  }

  // commit command from here on
  if (syncLogBufferIsEmpty(ths->pLogBuf)) {
    sError("vgId:%d, sync log buffer is empty.", ths->vgId);
    return 0;
  }

  SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
  if (lastMatchTerm < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (pCmd->currentTerm == lastMatchTerm) {
    SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, pCmd->commitIndex);
    sTrace("vgId:%d, update commit return index %" PRId64 "", ths->vgId, returnIndex);
  }

  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
    sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(),
           ths->commitIndex);
  }

  return 0;
}
3727

3728
// TLA+ Spec
3729
// ClientRequest(i, v) ==
3730
//     /\ state[i] = Leader
3731
//     /\ LET entry == [term  |-> currentTerm[i],
3732
//                      value |-> v]
3733
//            newLog == Append(log[i], entry)
3734
//        IN  log' = [log EXCEPT ![i] = newLog]
3735
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
3736
//                    leaderVars, commitIndex>>
3737
//
3738

3739
// Turn a client request into a raft entry and append it when this node is (assigned) leader.
//
// A TDMT_SYNC_CONFIG_CHANGE request additionally gets a response stub registered, because the
// change continues in the sync thread and the original message is needed to answer it. For such
// entries syncNodeCheckChangeConfig() is consulted first: a negative result aborts, a positive
// result answers the request immediately; in both cases the entry is dropped.
//
// ths       - sync node
// pMsg      - incoming rpc message
// pRetIndex - optional out parameter; receives the log index chosen for the entry
// returns   - result of syncNodeAppend(), the code from syncNodeCheckChangeConfig() when it is
//             non-zero, or TSDB_CODE_SYN_INTERNAL_ERROR when the entry cannot be built or this
//             node is not leader
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  int32_t code = 0;

  SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  term = raftStoreGetTerm(ths);

  SSyncRaftEntry* pEntry = (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST)
                               ? syncEntryBuildFromClientRequest(pMsg->pCont, term, index)
                               : syncEntryBuildFromRpcMsg(pMsg, term, index);
  if (pEntry == NULL) {
    sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr());
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // 1->2, config change is add in write thread, and will continue in sync thread
  // need save message for it
  if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) {
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
    uint64_t  stubSeq = syncRespMgrAdd(ths->pSyncRespMgr, &stub);
    pEntry->seqNum = stubSeq;
  }

  // only an (assigned) leader may append client requests
  if (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    syncEntryDestroy(pEntry);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (pRetIndex) {
    (*pRetIndex) = index;
  }

  if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
    code = syncNodeCheckChangeConfig(ths, pEntry);

    if (code < 0) {
      sError("vgId:%d, failed to check change config since %s.", ths->vgId, terrstr());
      syncEntryDestroy(pEntry);
      TAOS_RETURN(code);
    }

    if (code > 0) {
      // answer right away using the stub registered above, then drop the entry
      SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
      int32_t num = syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
      sDebug("vgId:%d, get response stub for config change, seqNum:%" PRIu64 ", num:%d", ths->vgId, pEntry->seqNum,
             num);
      if (rsp.info.handle != NULL) {
        tmsgSendRsp(&rsp);
      }
      syncEntryDestroy(pEntry);
      TAOS_RETURN(code);
    }
  }

  code = syncNodeAppend(ths, pEntry);
  return code;
}
3802

3803
// Map a sync state enum value to a static, human-readable name.
// Any value not explicitly handled yields "unknown".
// The returned pointer refers to a string literal and must not be freed.
const char* syncStr(ESyncState state) {
  const char* name = "unknown";

  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      name = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      name = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      name = "leader";
      break;
    case TAOS_SYNC_STATE_ERROR:
      name = "error";
      break;
    case TAOS_SYNC_STATE_OFFLINE:
      name = "offline";
      break;
    case TAOS_SYNC_STATE_LEARNER:
      name = "learner";
      break;
    case TAOS_SYNC_STATE_ASSIGNED_LEADER:
      name = "assigned leader";
      break;
    default:
      // keep the "unknown" fallback assigned above
      break;
  }

  return name;
}
3823

3824
// Locate this node inside a new configuration and record its position.
//
// Walks pNewCfg->nodeInfo[0 .. totalReplicaNum), builds a raft id from each
// entry's address plus this node's vgId, and compares it with ths->myRaftId.
// On the first match pNewCfg->myIndex is set to that position.
//
// Returns 0 when the node is found, TSDB_CODE_SYN_INTERNAL_ERROR otherwise
// (pNewCfg->myIndex is left untouched in that case).
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t pos = 0;

  while (pos < pNewCfg->totalReplicaNum) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pNewCfg->nodeInfo[pos]),
        .vgId = ths->vgId,
    };

    if (syncUtilSameId(&(ths->myRaftId), &candidate)) {
      pNewCfg->myIndex = pos;
      return 0;
    }

    ++pos;
  }

  return TSDB_CODE_SYN_INTERNAL_ERROR;
}
3839

3840
// Decide whether a message qualifies for the "optimized one replica" path.
//
// All three conditions must hold, checked in this order:
//   1. the node has exactly one replica,
//   2. syncUtilUserCommit() accepts the message type,
//   3. the vgroup id is not 1.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) {
    return false;
  }

  if (!syncUtilUserCommit(pMsg->msgType)) {
    return false;
  }

  return ths->vgId != 1;
}
3843

3844
// Report whether pRaftId matches one of this node's known replica ids.
//
// Scans ths->replicasId[0 .. totalReplicaNum) and stops at the first entry
// for which syncUtilSameId() reports a match.
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool    isMember = false;
  int32_t pos = 0;

  while (!isMember && pos < ths->totalReplicaNum) {
    if (syncUtilSameId(&ths->replicasId[pos], pRaftId)) {
      isMember = true;
    }
    ++pos;
  }

  return isMember;
}
3852

3853
// Look up the snapshot sender associated with the replica pDestId.
//
// Every slot in ths->replicasId[0 .. totalReplicaNum) is examined; the scan
// does not stop at the first hit, so if several slots match, the sender of
// the last matching slot is returned. Returns NULL when no slot matches.
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  SSyncSnapshotSender* pFound = NULL;
  int32_t              slot = 0;

  while (slot < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      pFound = ths->senders[slot];
    }
    ++slot;
  }

  return pFound;
}
3862

3863
// Look up the per-peer heartbeat timer for the replica pDestId.
//
// Every slot in ths->replicasId[0 .. totalReplicaNum) is examined; the scan
// does not stop at the first hit, so if several slots match, the address of
// the last matching entry in ths->peerHeartbeatTimerArr is returned.
// Returns NULL when no slot matches.
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  SSyncTimer* pFound = NULL;
  int32_t     slot = 0;

  while (slot < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      pFound = &ths->peerHeartbeatTimerArr[slot];
    }
    ++slot;
  }

  return pFound;
}
3872

3873
// Look up the peer-state record for the replica pDestId.
//
// Every slot in ths->replicasId[0 .. totalReplicaNum) is examined; the scan
// does not stop at the first hit, so if several slots match, the address of
// the last matching entry in ths->peerStates is returned.
// Returns NULL when no slot matches.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  SPeerState* pFound = NULL;
  int32_t     slot = 0;

  while (slot < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      pFound = &ths->peerStates[slot];
    }
    ++slot;
  }

  return pFound;
}
3882

3883
#ifdef BUILD_NO_CALL
3884
// Decide whether an append-entries message should be sent to pDestId.
//
// Returns false when no peer state exists for pDestId (an error is logged),
// or when the index about to be sent (pMsg->prevLogIndex + 1) equals the
// peer's lastSendIndex and less than SYNC_APPEND_ENTRIES_TIMEOUT_MS has
// elapsed since its lastSendTime. Returns true otherwise.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (NULL == pPeer) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextIndex = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();

  // Same index was sent recently: suppress the duplicate send.
  bool recentlySent =
      (pPeer->lastSendIndex == nextIndex) && (nowMs - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);

  return !recentlySent;
}
3900

3901
// Check whether the sync node is currently allowed to change.
//
// The change is refused (false, with an error log) when any of these holds:
//   - pSyncNode->changing is already set;
//   - commitIndex is at or beyond SYNC_INDEX_BEGIN but differs from the
//     last index reported by syncNodeGetLastIndex();
//   - some peer has a snapshot sender whose `start` flag is set.
// Returns true when none of the above applies.
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  if (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN) {
    SyncIndex tailIndex = syncNodeGetLastIndex(pSyncNode);
    if (tailIndex != pSyncNode->commitIndex) {
      sError("sync cannot change2");
      return false;
    }
  }

  int32_t peer = 0;
  while (peer < pSyncNode->peersNum) {
    SSyncSnapshotSender* pPeerSender = syncNodeGetSnapshotSender(pSyncNode, &pSyncNode->peersId[peer]);
    if (pPeerSender != NULL && pPeerSender->start) {
      sError("sync cannot change3");
      return false;
    }
    ++peer;
  }

  return true;
}
3925
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc