• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3546

03 Dec 2024 10:02AM UTC coverage: 60.691% (-0.1%) from 60.839%
#3546

push

travis-ci

web-flow
Merge pull request #29015 from taosdata/fix/TS-5668

[TS-5668] fix(keeper): fix endpoint value too long for column/tag and eliminate warnings

120577 of 253823 branches covered (47.5%)

Branch coverage included in aggregate %.

201666 of 277134 relevant lines covered (72.77%)

18719900.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.35
/source/libs/sync/src/syncMain.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "syncAppendEntries.h"
19
#include "syncAppendEntriesReply.h"
20
#include "syncCommit.h"
21
#include "syncElection.h"
22
#include "syncEnv.h"
23
#include "syncIndexMgr.h"
24
#include "syncInt.h"
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
27
#include "syncRaftCfg.h"
28
#include "syncRaftLog.h"
29
#include "syncRaftStore.h"
30
#include "syncReplication.h"
31
#include "syncRequestVote.h"
32
#include "syncRequestVoteReply.h"
33
#include "syncRespMgr.h"
34
#include "syncSnapshot.h"
35
#include "syncTimeout.h"
36
#include "syncUtil.h"
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
39
#include "tmisce.h"
40
#include "tref.h"
41

42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
43
static void    syncNodeEqElectTimer(void* param, void* tmrId);
44
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
45
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
48
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
49
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
50
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
51
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
52
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
53
static int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
54
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
55

56
static bool    syncNodeCanChange(SSyncNode* pSyncNode);
57
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
58
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
59
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
60

61
static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
62

63
/*
 * Open a sync node from pSyncInfo and register it with syncNodeAdd().
 *
 * The timer base lines and the message callback are copied from pSyncInfo
 * once the node has been registered.
 *
 * Returns the id produced by syncNodeAdd() on success, or -1 when the node
 * cannot be opened or registered (the node is closed again in the latter case).
 */
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  sInfo("vgId:%d, start to open sync", pSyncInfo->vgId);

  SSyncNode* pNode = syncNodeOpen(pSyncInfo, vnodeVersion);
  if (NULL == pNode) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    // registration failed: tear the freshly opened node down again
    syncNodeClose(pNode);
    return -1;
  }

  // ping timer settings
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;

  // election timer settings
  pNode->electBaseLine = pSyncInfo->electMs;

  // heartbeat timer settings
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;

  pNode->msgcb = pSyncInfo->msgcb;

  sInfo("vgId:%d, sync opened", pSyncInfo->vgId);
  return pNode->rid;
}
86

87
int32_t syncStart(int64_t rid) {
15,905✔
88
  int32_t    code = 0;
15,905✔
89
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
15,905✔
90
  if (pSyncNode == NULL) {
15,904!
91
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
92
    if (terrno != 0) code = terrno;
×
93
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
94
    TAOS_RETURN(code);
×
95
  }
96
  sInfo("vgId:%d, begin to start sync", pSyncNode->vgId);
15,904✔
97

98
  if ((code = syncNodeRestore(pSyncNode)) < 0) {
15,906!
99
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
100
    goto _err;
×
101
  }
102

103
  if ((code = syncNodeStart(pSyncNode)) < 0) {
15,905!
104
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, tstrerror(code));
×
105
    goto _err;
×
106
  }
107

108
  syncNodeRelease(pSyncNode);
15,905✔
109

110
  sInfo("vgId:%d, sync started", pSyncNode->vgId);
15,905!
111

112
  TAOS_RETURN(code);
15,905✔
113

114
_err:
×
115
  syncNodeRelease(pSyncNode);
×
116
  TAOS_RETURN(code);
×
117
}
118

119
/*
 * Copy the current raft configuration of the node identified by rid into *cfg.
 *
 * Returns 0 on success. When the node cannot be acquired, returns terrno if it
 * is set, otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL, and leaves *cfg untouched.
 */
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg) {
  SSyncNode* pNode = syncNodeAcquire(rid);

  if (NULL == pNode) {
    int32_t code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) {
      code = terrno;
    }
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
    TAOS_RETURN(code);
  }

  // struct copy, done while the reference is held
  *cfg = pNode->raftCfg.cfg;
  syncNodeRelease(pNode);

  return 0;
}
136

137
void syncStop(int64_t rid) {
15,905✔
138
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
15,905✔
139
  if (pSyncNode != NULL) {
15,905!
140
    pSyncNode->isStart = false;
15,905✔
141
    syncNodeRelease(pSyncNode);
15,905✔
142
    syncNodeRemove(rid);
15,905✔
143
  }
144
}
15,904✔
145

146
void syncPreStop(int64_t rid) {
15,905✔
147
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
15,905✔
148
  if (pSyncNode != NULL) {
15,905!
149
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
15,905!
150
      sInfo("vgId:%d, stop snapshot receiver", pSyncNode->vgId);
×
151
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
152
    }
153
    syncNodePreClose(pSyncNode);
15,905✔
154
    syncNodeRelease(pSyncNode);
15,904✔
155
  }
156
}
15,905✔
157

158
void syncPostStop(int64_t rid) {
13,892✔
159
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,892✔
160
  if (pSyncNode != NULL) {
13,891!
161
    syncNodePostClose(pSyncNode);
13,891✔
162
    syncNodeRelease(pSyncNode);
13,892✔
163
  }
164
}
13,890✔
165

166
/*
 * Validate a proposed configuration: syncNodeInConfig() must accept it for
 * this node and its replica count may differ from the current one by at
 * most one.
 */
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (syncNodeInConfig(pSyncNode, pCfg)) {
    int replicaDelta = abs(pCfg->replicaNum - pSyncNode->replicaNum);
    return replicaDelta <= 1;
  }
  return false;
}
170

171
/*
 * Apply a new raft configuration to the node identified by rid.
 *
 * Steps:
 *   1. skip (return 0) when the node's lastConfigIndex is already at or
 *      beyond pNewCfg->lastIndex;
 *   2. reject the config with TSDB_CODE_SYN_NEW_CONFIG_ERROR when
 *      syncNodeCheckNewConfig() refuses it;
 *   3. record the new config index and perform the config change;
 *   4. when this node is a leader or assigned leader, stop the heartbeat
 *      timer, re-initialise every per-peer heartbeat timer and start the
 *      heartbeat timer again.
 *
 * The node reference taken here is released on every path, including all
 * failures after acquisition, and the node is only dereferenced while the
 * reference is held.
 *
 * Returns 0 on success or the first non-zero code encountered.
 */
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
    // log before releasing: the node must not be read once the reference is dropped
    sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
          pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
    syncNodeRelease(pSyncNode);
    return 0;
  }

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    goto _over;
  }

  if ((code = syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg)) != TSDB_CODE_SUCCESS) {
    goto _over;
  }

  if (syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex) != 0) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since do change error", pSyncNode->vgId);
    goto _over;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != TSDB_CODE_SUCCESS) {
      goto _over;
    }

    // rebuild one heartbeat timer per replica slot (voters and learners)
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      code = syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
      if (code != TSDB_CODE_SUCCESS) {
        goto _over;
      }
    }

    if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != TSDB_CODE_SUCCESS) {
      goto _over;
    }
  }

_over:
  syncNodeRelease(pSyncNode);
  TAOS_RETURN(code);
}
217

218
/*
 * Dispatch an incoming sync RPC message to the handler for its msgType.
 *
 * Returns the handler's result; TSDB_CODE_MSG_NOT_PROCESSED for an unknown
 * message type; terrno (or TSDB_CODE_SYN_RETURN_VALUE_NULL when terrno is 0)
 * when the sync module is not initialised or the node cannot be acquired.
 *
 * The failure is logged while the node reference is still held, so the node
 * is never dereferenced after syncNodeRelease().
 */
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT_ELECTION:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_FORCE_FOLLOWER:
      code = syncForceBecomeFollower(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SET_ASSIGNED_LEADER:
      code = syncBecomeAssignedLeader(pSyncNode, pMsg);
      break;
    default:
      code = TSDB_CODE_MSG_NOT_PROCESSED;
      break;
  }

  // report the failure before releasing: vgId lives inside the node
  if (code != 0) {
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since %s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
           tstrerror(code));
  }
  syncNodeRelease(pSyncNode);

  TAOS_RETURN(code);
}
287

288
int32_t syncLeaderTransfer(int64_t rid) {
15,905✔
289
  int32_t    code = 0;
15,905✔
290
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
15,905✔
291
  if (pSyncNode == NULL) {
15,905!
292
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
293
    if (terrno != 0) code = terrno;
×
294
    TAOS_RETURN(code);
×
295
  }
296

297
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
15,905✔
298
  syncNodeRelease(pSyncNode);
15,904✔
299
  return ret;
15,905✔
300
}
301

302
/*
 * Handle a force-follower request: make this node a follower (with a zeroed
 * raft id passed to syncNodeBecomeFollower) and send back a success response
 * that reuses the response buffer carried in the request's info.
 *
 * Always returns 0.
 */
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SRaftId zeroId = {0};
  syncNodeBecomeFollower(ths, zeroId, "force election");

  SRpcMsg reply = {0};
  reply.code = 0;
  reply.pCont = pRpcMsg->info.rsp;
  reply.contLen = pRpcMsg->info.rspLen;
  reply.info = pRpcMsg->info;
  tmsgSendRsp(&reply);

  return 0;
}
316

317
/*
 * Handle a "set assigned leader" request.
 *
 * The request is rejected with TSDB_CODE_MND_ARB_TOKEN_MISMATCH when its
 * arbTerm is lower than the local one or when its memberToken does not match
 * the local arbToken. Otherwise, unless the node is already an assigned
 * leader, the term is advanced, the node becomes assigned leader and a noop
 * entry is appended. A response is always sent to the requester.
 *
 * Response buffer handling: the response carries a payload only when it was
 * fully serialized. On every failure path the response is sent with
 * pCont == NULL and contLen == 0, so a buffer that has been freed with
 * rpcFreeCont() is never handed to tmsgSendRsp().
 *
 * Returns the code that was placed into the response.
 */
int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
  int32_t code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  void*   pHead = NULL;
  int32_t contLen = 0;

  SVArbSetAssignedLeaderReq req = {0};
  // NOTE(review): the payload pointer skips SMsgHead but the full contLen is passed as the
  // buffer length — confirm whether the decoder expects contLen - sizeof(SMsgHead).
  if (tDeserializeSVArbSetAssignedLeaderReq((char*)pRpcMsg->pCont + sizeof(SMsgHead), pRpcMsg->contLen, &req) != 0) {
    sError("vgId:%d, failed to deserialize SVArbSetAssignedLeaderReq", ths->vgId);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  if (ths->arbTerm > req.arbTerm) {
    sInfo("vgId:%d, skip to set assigned leader, msg with lower term, local:%" PRId64 ", msg:%" PRId64, ths->vgId,
          ths->arbTerm, req.arbTerm);
    goto _OVER;
  }

  ths->arbTerm = TMAX(req.arbTerm, ths->arbTerm);

  if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) {
    sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken,
          req.memberToken);
    goto _OVER;
  }

  if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SUCCESS;
    raftStoreNextTerm(ths);
    if (terrno != TSDB_CODE_SUCCESS) {
      code = terrno;
      sError("vgId:%d, failed to set next term since:%s", ths->vgId, tstrerror(code));
      goto _OVER;
    }
    syncNodeBecomeAssignedLeader(ths);

    // a failed noop append is only logged; the response below is still built
    if ((code = syncNodeAppendNoop(ths)) < 0) {
      sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, tstrerror(code));
    }
  }

  SVArbSetAssignedLeaderRsp rsp = {0};
  rsp.arbToken = req.arbToken;
  rsp.memberToken = req.memberToken;
  rsp.vgId = ths->vgId;

  // first pass with a NULL buffer yields the encoded size
  contLen = tSerializeSVArbSetAssignedLeaderRsp(NULL, 0, &rsp);
  if (contLen <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    contLen = 0;  // never send a non-positive length with a NULL payload
    goto _OVER;
  }
  pHead = rpcMallocCont(contLen);
  if (!pHead) {
    code = terrno;
    sError("vgId:%d, failed to malloc memory for SVArbSetAssignedLeaderRsp", ths->vgId);
    contLen = 0;
    goto _OVER;
  }
  if (tSerializeSVArbSetAssignedLeaderRsp(pHead, contLen, &rsp) <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    rpcFreeCont(pHead);
    pHead = NULL;  // the buffer is gone; do not attach it to the response
    contLen = 0;
    goto _OVER;
  }

  code = TSDB_CODE_SUCCESS;

_OVER:;
  SRpcMsg rspMsg = {
      .code = code,
      .pCont = pHead,
      .contLen = contLen,
      .info = pRpcMsg->info,
  };

  tmsgSendRsp(&rspMsg);

  tFreeSVArbSetAssignedLeaderReq(&req);
  TAOS_RETURN(code);
}
397

398
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
×
399
  int32_t    code = 0;
×
400
  SSyncNode* pNode = syncNodeAcquire(rid);
×
401
  if (pNode == NULL) {
×
402
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
403
    if (terrno != 0) code = terrno;
×
404
    TAOS_RETURN(code);
×
405
  }
406

407
  SRpcMsg rpcMsg = {0, .info.notFreeAhandle = 1};
×
408
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
×
409
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;
×
410

411
  syncNodeRelease(pNode);
×
412
  if (ret == 1) {
×
413
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
×
414
    code = rpcSendResponse(&rpcMsg);
×
415
    return code;
×
416
  } else {
417
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
×
418
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
419
  }
420
}
421

422
/*
 * Return the smallest match index among this node's peers, as reported by
 * syncIndexMgrGetIndex(). Returns SYNC_INDEX_INVALID when there are no peers.
 */
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum <= 0) {
    return SYNC_INDEX_INVALID;
  }

  // seed with the first peer, then fold in the rest
  SyncIndex lowest = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
  for (int32_t peer = 1; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex current = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    lowest = (current < lowest) ? current : lowest;
  }

  return lowest;
}
437

438
/*
 * Forward to the log store's syncLogIndexRetention callback with the given
 * byte budget and return the index it reports.
 */
static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) {
  SyncIndex retentionIndex = pSyncNode->pLogStore->syncLogIndexRetention(pSyncNode->pLogStore, bytes);
  return retentionIndex;
}
441

442
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
35,246✔
443
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
35,246✔
444
  int32_t    code = 0;
35,246✔
445
  if (pSyncNode == NULL) {
35,246✔
446
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
17✔
447
    if (terrno != 0) code = terrno;
17!
448
    sError("sync begin snapshot error");
17!
449
    TAOS_RETURN(code);
17✔
450
  }
451

452
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
35,229✔
453
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
35,228✔
454
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
35,228✔
455

456
  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
35,227!
457
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
101!
458
    syncNodeRelease(pSyncNode);
101✔
459
    return 0;
101✔
460
  }
461

462
  int64_t logRetention = 0;
35,126✔
463

464
  if (syncNodeIsMnode(pSyncNode)) {
35,126✔
465
    // mnode
466
    logRetention = tsMndLogRetention;
3,504✔
467
  } else {
468
    // vnode
469
    if (pSyncNode->replicaNum > 1) {
31,622✔
470
      logRetention = SYNC_VNODE_LOG_RETENTION;
1,438✔
471
    }
472
  }
473

474
  if (pSyncNode->totalReplicaNum > 1) {
35,126✔
475
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER &&
1,988✔
476
        pSyncNode->state != TAOS_SYNC_STATE_LEARNER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
343!
477
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
19!
478
              lastApplyIndex);
479
      syncNodeRelease(pSyncNode);
19✔
480
      return 0;
19✔
481
    }
482
    SyncIndex retentionIndex =
1,969✔
483
        TMAX(pSyncNode->minMatchIndex, syncLogRetentionIndex(pSyncNode, SYNC_WAL_LOG_RETENTION_SIZE));
1,969✔
484
    logRetention += TMAX(0, lastApplyIndex - retentionIndex);
1,969✔
485
  }
486

487
_DEL_WAL:
33,138✔
488

489
  do {
490
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
35,107✔
491
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
35,107✔
492
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
35,107✔
493
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
35,107✔
494
    if (lastApplyIndex <= walCommitVer) {
35,107!
495
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);
35,107✔
496

497
      if (snapshottingIndex == SYNC_INDEX_INVALID) {
35,107!
498
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
35,107✔
499
        pSyncNode->snapshottingTime = taosGetTimestampMs();
35,109✔
500

501
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
35,109✔
502
        if (code == 0) {
35,109!
503
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
35,109✔
504
                  pSyncNode->snapshottingIndex, lastApplyIndex);
505
        } else {
506
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
×
507
                  terrstr(), pSyncNode->snapshottingIndex, lastApplyIndex);
508
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
×
509
        }
510

511
      } else {
512
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
×
513
                snapshottingIndex, lastApplyIndex);
514
      }
515
    }
516
  } while (0);
517

518
  syncNodeRelease(pSyncNode);
35,109✔
519
  TAOS_RETURN(code);
35,109✔
520
}
521

522
int32_t syncEndSnapshot(int64_t rid) {
35,227✔
523
  int32_t    code = 0;
35,227✔
524
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
35,227✔
525
  if (pSyncNode == NULL) {
35,228!
526
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
527
    if (terrno != 0) code = terrno;
×
528
    sError("sync end snapshot error");
×
529
    TAOS_RETURN(code);
×
530
  }
531

532
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
35,228✔
533
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
35,108✔
534
    code = walEndSnapshot(pData->pWal);
35,108✔
535
    if (code != 0) {
35,108!
536
      sNError(pSyncNode, "wal snapshot end error since:%s", tstrerror(code));
×
537
      syncNodeRelease(pSyncNode);
×
538
      TAOS_RETURN(code);
×
539
    } else {
540
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
35,108✔
541
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
35,108✔
542
    }
543
  }
544

545
  syncNodeRelease(pSyncNode);
35,228✔
546
  TAOS_RETURN(code);
35,228✔
547
}
548

549
/*
 * Tell whether reads may be served from this node: it must be a leader or an
 * assigned leader and must have finished restoring.
 *
 * On a negative answer terrno is set to the reason:
 * TSDB_CODE_SYN_INTERNAL_ERROR (NULL node), TSDB_CODE_SYN_NOT_LEADER or
 * TSDB_CODE_SYN_RESTORING.
 */
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (NULL == pSyncNode) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  bool actsAsLeader =
      (pSyncNode->state == TAOS_SYNC_STATE_LEADER) || (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
  if (!actsAsLeader) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pSyncNode->restoreFinish) {
    return true;
  }

  terrno = TSDB_CODE_SYN_RESTORING;
  return false;
}
568

569
bool syncIsReadyForRead(int64_t rid) {
13,249,089✔
570
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,249,089✔
571
  if (pSyncNode == NULL) {
13,255,723!
572
    sError("sync ready for read error");
×
573
    return false;
×
574
  }
575

576
  bool ready = syncNodeIsReadyForRead(pSyncNode);
13,255,723✔
577

578
  syncNodeRelease(pSyncNode);
13,256,223✔
579
  return ready;
13,252,864✔
580
}
581

582
#ifdef BUILD_NO_CALL
583
bool syncSnapshotSending(int64_t rid) {
584
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
585
  if (pSyncNode == NULL) {
586
    return false;
587
  }
588

589
  bool b = syncNodeSnapshotSending(pSyncNode);
590
  syncNodeRelease(pSyncNode);
591
  return b;
592
}
593

594
bool syncSnapshotRecving(int64_t rid) {
595
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
596
  if (pSyncNode == NULL) {
597
    return false;
598
  }
599

600
  bool b = syncNodeSnapshotRecving(pSyncNode);
601
  syncNodeRelease(pSyncNode);
602
  return b;
603
}
604
#endif
605

606
/*
 * Try to hand leadership to a peer.
 *
 * Returns 0 without doing anything when the node has no peers, is not the
 * leader, or has a single replica. Otherwise the first peer is chosen as the
 * target, except that with exactly two peers the second one is preferred when
 * its match index is strictly greater. The result of
 * syncNodeLeaderTransferTo() is returned.
 */
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return 0;
  }

  bool eligible = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && (pSyncNode->replicaNum > 1);
  if (!eligible) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex firstMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
    SyncIndex secondMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[1]));
    if (secondMatch > firstMatch) {
      target = 1;
    }
  }

  SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[target];
  return syncNodeLeaderTransferTo(pSyncNode, newLeader);
}
627

628
/*
 * Propose a leader-transfer entry that names newLeader as the next leader.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR when the node has a single replica,
 * the error from syncBuildLeaderTransfer() when the message cannot be built,
 * and otherwise the result of syncNodePropose(). The message content is freed
 * here after the proposal has been made.
 */
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg transferMsg = {0};
  TAOS_CHECK_RETURN(syncBuildLeaderTransfer(&transferMsg, pSyncNode->vgId));

  // fill in who the new leader should be
  SyncLeaderTransfer* pTransfer = transferMsg.pCont;
  pTransfer->newNodeInfo = newLeader;
  pTransfer->newLeaderId.vgId = pSyncNode->vgId;
  pTransfer->newLeaderId.addr = SYNC_ADDR(&newLeader);

  int32_t result = syncNodePropose(pSyncNode, &transferMsg, false, NULL);
  rpcFreeCont(transferMsg.pCont);

  return result;
}
648

649
SSyncState syncGetState(int64_t rid) {
5,651,031✔
650
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
5,651,031✔
651

652
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
5,651,031✔
653
  if (pSyncNode != NULL) {
5,651,879✔
654
    state.state = pSyncNode->state;
5,651,825✔
655
    state.roleTimeMs = pSyncNode->roleTimeMs;
5,651,825✔
656
    state.startTimeMs = pSyncNode->startTime;
5,651,825✔
657
    state.restored = pSyncNode->restoreFinish;
5,651,825✔
658
    if (pSyncNode->vgId != 1) {
5,651,825✔
659
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
2,063,540✔
660
    } else {
661
      state.canRead = state.restored;
3,588,285✔
662
    }
663
    /*
664
    double progress = 0;
665
    if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){
666
      progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex;
667
      state.progress = (int32_t)(progress * 100);
668
    }
669
    else{
670
      state.progress = -1;
671
    }
672
    sDebug("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 ", "
673
            "progress:%lf, progress:%d",
674
          pSyncNode->vgId,
675
         pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
676
    */
677
    state.term = raftStoreGetTerm(pSyncNode);
5,651,730✔
678
    syncNodeRelease(pSyncNode);
5,651,897✔
679
  }
680

681
  return state;
5,651,861✔
682
}
683

684
int32_t syncGetArbToken(int64_t rid, char* outToken) {
17,587✔
685
  int32_t    code = 0;
17,587✔
686
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
17,587✔
687
  if (pSyncNode == NULL) {
17,587!
688
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
689
    if (terrno != 0) code = terrno;
×
690
    TAOS_RETURN(code);
×
691
  }
692

693
  memset(outToken, 0, TSDB_ARB_TOKEN_SIZE);
17,587✔
694
  (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
17,587✔
695
  strncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE);
17,587✔
696
  (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
17,587✔
697

698
  syncNodeRelease(pSyncNode);
17,587✔
699
  TAOS_RETURN(code);
17,587✔
700
}
701

702
int32_t syncGetAssignedLogSynced(int64_t rid) {
×
703
  int32_t    code = 0;
×
704
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
705
  if (pSyncNode == NULL) {
×
706
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
707
    if (terrno != 0) code = terrno;
×
708
    TAOS_RETURN(code);
×
709
  }
710

711
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
×
712
    code = TSDB_CODE_VND_ARB_NOT_SYNCED;
×
713
    syncNodeRelease(pSyncNode);
×
714
    TAOS_RETURN(code);
×
715
  }
716

717
  bool isSync = pSyncNode->commitIndex >= pSyncNode->assignedCommitIndex;
×
718
  code = (isSync ? TSDB_CODE_SUCCESS : TSDB_CODE_VND_ARB_NOT_SYNCED);
×
719

720
  syncNodeRelease(pSyncNode);
×
721
  TAOS_RETURN(code);
×
722
}
723

724
/*
 * Raise the arbitration term of the node identified by rid to arbTerm; the
 * stored term never decreases because the larger of the two values is kept.
 *
 * Returns 0 on success; terrno / TSDB_CODE_SYN_RETURN_VALUE_NULL when the
 * node cannot be acquired.
 */
int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) {
    int32_t err = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) {
      err = terrno;
    }
    TAOS_RETURN(err);
  }

  pNode->arbTerm = TMAX(arbTerm, pNode->arbTerm);
  syncNodeRelease(pNode);

  int32_t code = 0;
  TAOS_RETURN(code);
}
737

738
/*
 * Pick the config index that applies to a snapshot taken at
 * snapshotLastApplyIndex.
 *
 * Starting from the first recorded config index, the scan moves to any
 * recorded index that is larger than the current pick and not beyond
 * snapshotLastApplyIndex.
 *
 * Returns the chosen index, or -2 with terrno set to
 * TSDB_CODE_SYN_INTERNAL_ERROR when no config index has been recorded.
 */
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  if (pSyncNode->raftCfg.configIndexCount < 1) {
    sError("vgId:%d, failed get snapshot config index, configIndexCount:%d", pSyncNode->vgId,
           pSyncNode->raftCfg.configIndexCount);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -2;
  }

  SyncIndex picked = (pSyncNode->raftCfg.configIndexArr)[0];
  for (int32_t slot = 0; slot < pSyncNode->raftCfg.configIndexCount; ++slot) {
    if ((pSyncNode->raftCfg.configIndexArr)[slot] <= picked) {
      continue;  // not newer than what we already have
    }
    if ((pSyncNode->raftCfg.configIndexArr)[slot] > snapshotLastApplyIndex) {
      continue;  // config change happened after the snapshot point
    }
    picked = (pSyncNode->raftCfg.configIndexArr)[slot];
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, picked);

  return picked;
}
758

759
/*
 * Look up the raft id of the peer whose fqdn and port match pEp.
 * Returns EMPTY_RAFT_ID when no peer matches.
 */
static SRaftId syncGetRaftIdByEp(SSyncNode* pSyncNode, const SEp* pEp) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    if (strcmp(pEp->fqdn, (pSyncNode->peersNodeInfo)[peer].nodeFqdn) != 0) {
      continue;
    }
    if (pEp->port != (pSyncNode->peersNodeInfo)[peer].nodePort) {
      continue;
    }
    return pSyncNode->peersId[peer];
  }

  return EMPTY_RAFT_ID;
}
768

769
// Fill `pEpSet` with the endpoints of every non-learner replica of the sync
// node registered under `rid`, and choose which endpoint is marked in use.
//
// inUse selection, applied only when at least one endpoint was emitted:
//   1. the endpoint whose peer raft id equals the cached leader (and is
//      non-zero), if any;
//   2. otherwise cfg.myIndex when this node itself is the cached leader;
//   3. otherwise (cfg.myIndex + 1) % numOfEps.
//
// If the node cannot be acquired, only numOfEps is reset to 0.
// NOTE(review): cfg.myIndex indexes the full replica list (learners
// included) while eps[] skips learners -- confirm case 2 cannot produce an
// index outside eps[].
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return;

  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  int       leaderSlot = -1;  // position in eps[] of the cached leader, if found
  int       slot = 0;         // next free position in eps[]

  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo* pInfo = &pCfg->nodeInfo[i];
    if (pInfo->nodeRole == TAOS_SYNC_ROLE_LEARNER) continue;

    SEp* pEp = &pEpSet->eps[slot];
    tstrncpy(pEp->fqdn, pInfo->nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = pInfo->nodePort;
    pEpSet->numOfEps++;
    sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port);

    SRaftId id = syncGetRaftIdByEp(pSyncNode, pEp);
    bool    matchesLeader = (id.addr == pSyncNode->leaderCache.addr && id.vgId == pSyncNode->leaderCache.vgId);
    bool    nonEmptyId = (id.addr != 0 && id.vgId != 0);
    if (matchesLeader && nonEmptyId) {
      leaderSlot = slot;
    }
    slot++;
  }

  if (pEpSet->numOfEps > 0) {
    bool selfIsLeader = (pSyncNode->myRaftId.addr == pSyncNode->leaderCache.addr &&
                         pSyncNode->myRaftId.vgId == pSyncNode->leaderCache.vgId);
    if (leaderSlot != -1) {
      pEpSet->inUse = leaderSlot;
    } else if (selfIsLeader) {
      pEpSet->inUse = pCfg->myIndex;
    } else {
      pEpSet->inUse = (pCfg->myIndex + 1) % pEpSet->numOfEps;
    }
  }
  epsetSort(pEpSet);

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pSyncNode);
}
809

810
// Public entry point for proposing `pMsg` on the sync node registered under
// `rid`. Acquires the node, forwards to syncNodePropose(), then releases it.
//
// Returns whatever syncNodePropose() returns. When the node cannot be
// acquired the result is terrno if it is non-zero, otherwise
// TSDB_CODE_SYN_RETURN_VALUE_NULL.
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    int32_t errCode = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    sError("sync propose error");
    TAOS_RETURN(errCode);
  }

  int32_t result = syncNodePropose(pNode, pMsg, isWeak, seq);
  syncNodeRelease(pNode);
  return result;
}
824

825
int32_t syncCheckMember(int64_t rid) {
×
826
  int32_t    code = 0;
×
827
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
828
  if (pSyncNode == NULL) {
×
829
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
830
    if (terrno != 0) code = terrno;
×
831
    sError("sync propose error");
×
832
    TAOS_RETURN(code);
×
833
  }
834

835
  if (pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
×
836
    syncNodeRelease(pSyncNode);
×
837
    return TSDB_CODE_SYN_WRONG_ROLE;
×
838
  }
839

840
  syncNodeRelease(pSyncNode);
×
841
  return 0;
×
842
}
843

844
int32_t syncIsCatchUp(int64_t rid) {
6,798✔
845
  int32_t    code = 0;
6,798✔
846
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
6,798✔
847
  if (pSyncNode == NULL) {
6,798!
848
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
849
    if (terrno != 0) code = terrno;
×
850
    sError("sync Node Acquire error since %d", errno);
×
851
    TAOS_RETURN(code);
×
852
  }
853

854
  int32_t isCatchUp = 0;
6,798✔
855
  if (pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
6,798!
856
      pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
1,888✔
857
      pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP) {
1,874✔
858
    sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
6,486!
859
          pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
860
          pSyncNode->pLogBuf->matchIndex);
861
    isCatchUp = 0;
6,486✔
862
  } else {
863
    sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, pSyncNode->vgId,
312!
864
          pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->matchIndex);
865
    isCatchUp = 1;
312✔
866
  }
867

868
  syncNodeRelease(pSyncNode);
6,798✔
869
  return isCatchUp;
6,798✔
870
}
871

872
ESyncRole syncGetRole(int64_t rid) {
6,798✔
873
  int32_t    code = 0;
6,798✔
874
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
6,798✔
875
  if (pSyncNode == NULL) {
6,798!
876
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
877
    if (terrno != 0) code = terrno;
×
878
    sError("sync Node Acquire error since %d", errno);
×
879
    TAOS_RETURN(code);
×
880
  }
881

882
  ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole;
6,798✔
883

884
  syncNodeRelease(pSyncNode);
6,798✔
885
  return role;
6,798✔
886
}
887

888
int64_t syncGetTerm(int64_t rid) {
5,667✔
889
  int32_t    code = 0;
5,667✔
890
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
5,667✔
891
  if (pSyncNode == NULL) {
5,667!
892
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
893
    if (terrno != 0) code = terrno;
×
894
    sError("sync Node Acquire error since %d", errno);
×
895
    TAOS_RETURN(code);
×
896
  }
897

898
  int64_t term = raftStoreGetTerm(pSyncNode);
5,667✔
899

900
  syncNodeRelease(pSyncNode);
5,667✔
901
  return term;
5,667✔
902
}
903

904
// Propose `pMsg` on an already-acquired sync node.
//
// Rejections (returned through TAOS_RETURN):
//   - TSDB_CODE_SYN_NOT_LEADER        node is neither leader nor assigned leader
//   - TSDB_CODE_SYN_PROPOSE_NOT_READY restore not finished, or (for a regular
//                                     leader) heartbeat replies timed out
//
// Optimized single-replica path (syncNodeIsOptimizedOneReplica): the request
// is handled inline via syncNodeOnClientRequest(). On success applyIndex and
// applyTerm are stored into pMsg->info.conn and the function returns 1 when
// replicaNum == 1, otherwise 0. On failure it returns
// TSDB_CODE_SYN_INTERNAL_ERROR.
//
// Normal path: a response stub is registered, a client request message is
// built and enqueued through pSyncNode->syncEqMsg. `*seq` (when non-NULL)
// receives the stub's sequence number. Returns 0 on success or the error
// code of the failing step.
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  int32_t code = 0;
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SYN_NOT_LEADER;
    sNWarn(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    TAOS_RETURN(code);
  }

  if (!pSyncNode->restoreFinish) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNWarn(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
           TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // heartbeat timeout
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER && syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since heartbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // Initialized so the failure log below never reads an indeterminate
    // value when the callee fails before writing the index.
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code < 0) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      TAOS_RETURN(code);
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);

    // after raft member change, need to handle 1->2 switching point
    // at this point, need to switch entry handling thread
    if (pSyncNode->replicaNum == 1) {
      sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return 1;
    }

    sTrace("vgId:%d, propose optimized msg, return to normal, index:%" PRId64
           " type:%s, "
           "handle:%p",
           pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle);
    return 0;
  }

  // normal path: register a response stub, then build and enqueue the request
  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {0};

  code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    // Remove the stub, but keep reporting the serialize failure: returning
    // the result of the removal would turn this failed propose into a
    // success whenever the removal itself succeeds.
    int32_t delCode = syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    if (delCode != 0) {
      sError("vgId:%d, failed to remove resp stub, seq:%" PRIu64 " since %s", pSyncNode->vgId, seqNum,
             tstrerror(delCode));
    }
    TAOS_RETURN(code);
  }

  sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    TAOS_CHECK_RETURN(syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum));
  }

  if (seq != NULL) *seq = seqNum;
  TAOS_RETURN(code);
}
976

977
// Reset a per-peer heartbeat timer slot to its idle state.
//
// The slot is bound to `destId`, uses the node's heartbeat base line as its
// interval and syncNodeEqPeerHeartbeatTimer as its callback. No timer handle
// is attached here (pTimer is set to NULL). Always returns 0.
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  // what the timer targets and how it fires
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;

  // runtime state starts from scratch
  pSyncTimer->pTimer = NULL;
  pSyncTimer->counter = 0;
  pSyncTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pSyncTimer->logicClock, 0);

  return 0;
}
987

988
// Arm the per-peer heartbeat timer `pSyncTimer`.
//
// The timer's SSyncHbTimerData is looked up by pSyncTimer->hbDataRid and
// allocated and registered when that lookup yields nothing. The data is
// filled from the timer and the node, then the timer is (re)started with the
// data's rid as callback parameter.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the timer data cannot
// be allocated, and TSDB_CODE_SYN_INTERNAL_ERROR when the sync environment is
// not initialized or taosTmrReset() reports `true`.
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int64_t tsNow = taosGetTimestampMs();

  if (!syncIsInit()) {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
  if (pData == NULL) {
    pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
    if (pData == NULL) {
      // Without this check the assignment below dereferences NULL.
      sError("vgId:%d, failed to start hb timer since out of memory", pSyncNode->vgId);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    // NOTE(review): the result of syncHbTimerDataAdd() is not validated --
    // confirm whether it can fail.
    pData->rid = syncHbTimerDataAdd(pData);
  }
  pSyncTimer->hbDataRid = pData->rid;
  pSyncTimer->timeStamp = tsNow;

  pData->syncNodeRid = pSyncNode->rid;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;
  pData->execTime = tsNow + pSyncTimer->timerMS;

  sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64 " at %d", pSyncNode->vgId, pData->rid,
         pData->destId.addr, pSyncTimer->timerMS);

  bool stopped = taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid), syncEnv()->pTimerManager,
                              &pSyncTimer->pTimer);
  if (stopped) {
    sError("vgId:%d, failed to reset hb timer success", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  return 0;
}
1021

1022
// Disarm the per-peer heartbeat timer `pSyncTimer`.
//
// Bumps the timer's logic clock, stops the timer handle, clears it, and
// removes the associated timer data (hbDataRid is reset to -1).
// Always returns 0.
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  (void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  bool wasStopped = taosTmrStop(pSyncTimer->pTimer);
  sDebug("vgId:%d, stop hb timer stop:%d", pSyncNode->vgId, wasStopped);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}
1032

1033
// Restore the log store from the FSM snapshot when the two disagree.
//
// A restore is triggered when the log's last index is behind the snapshot's
// lastApplyIndex, when the log's first index is beyond lastApplyIndex + 1,
// or when the FSM state is SYNC_FSM_STATE_INCOMPLETE.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the log store, the FSM or its
// FpGetSnapshotInfo callback is missing; otherwise 0 or the error code of
// syncLogRestoreFromSnapshot().
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  if (pNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm == NULL) {
    sError("vgId:%d, pFsm not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
    sError("vgId:%d, FpGetSnapshotInfo not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SSnapshot snapshot = {0};
  // TODO check return value
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);

  SyncIndex snapshotVer = snapshot.lastApplyIndex;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  bool logOutOfRange = (lastVer < snapshotVer) || (firstVer > snapshotVer + 1);
  if (!logOutOfRange && pNode->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    // log and snapshot are consistent, nothing to restore
    TAOS_RETURN(0);
  }

  int32_t code = pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, snapshotVer);
  if (code != 0) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
           pNode->vgId, terrstr(), lastVer, snapshotVer);
  }
  TAOS_RETURN(code);
}
1063

1064
// open/close --------------
1065
// Allocate and initialize a sync node from `pSyncInfo`.
//
// Steps, in order:
//   - create the sync directory if needed and derive the raft store / raft
//     config file paths;
//   - create raft_config.json from the input options when it does not exist,
//     otherwise load it and reconcile it with the input options depending on
//     `vnodeVersion` versus the stored changeVersion;
//   - refresh dnode info of every replica and persist it when it changed;
//   - build replica / peer id tables, arb token, raft store, vote managers,
//     index managers, log store and log buffer;
//   - initialize timers, response manager, snapshot senders/receiver,
//     replication managers and peer state.
//
// Ownership: pSyncInfo->pFsm is moved into the node (and set to NULL in
// pSyncInfo) once the raft algorithm state is initialized; if opening fails
// before that point the FSM is freed here.
//
// Returns the opened node, or NULL on failure. Every failure path goes
// through `_error`, which closes the partially built node.
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("vgId:%d, failed to create dir:%s since %s", pSyncInfo->vgId, pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId);
    pSyncNode->vgId = pSyncInfo->vgId;
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
    pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
    pSyncNode->raftCfg.configIndexCount = 1;
    pSyncNode->raftCfg.configIndexArr[0] = -1;

    if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
      terrno = code;
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
      goto _error;
    }
  } else {
    // update syncCfg by raft_config.json
    if ((code = syncReadCfgFile(pSyncNode)) != 0) {
      terrno = code;
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
      goto _error;
    }

    if (vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion) {
      if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
        sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
        pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
        if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
          terrno = code;
          sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
          goto _error;
        }
      } else {
        sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
        pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
      }
    } else {
      // "file ver" is the version loaded from raft_config.json, i.e. the
      // value the comparison above was made against.
      sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d", pSyncNode->vgId, vnodeVersion,
            pSyncNode->raftCfg.cfg.changeVersion);
    }
  }

  // init by SSyncInfo
  pSyncNode->vgId = pSyncInfo->vgId;
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  bool      updated = false;
  sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
        pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
      updated = true;
    }
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  if (vnodeVersion > pSyncInfo->syncCfg.changeVersion) {
    if (updated) {
      sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
      if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
        terrno = code;
        sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
        goto _error;
      }
    }
  }

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  code = syncLogBufferCreate(&pSyncNode->pLogBuf);
  if (pSyncNode->pLogBuf == NULL) {
    sError("failed to init sync log buffer since %s. vgId:%d", tstrerror(code), pSyncNode->vgId);
    goto _error;
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
  pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
        false) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  pSyncNode->myRaftId = pSyncNode->replicasId[pSyncNode->raftCfg.cfg.myIndex];

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
      pSyncNode->peersId[j] = pSyncNode->replicasId[i];
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
      j++;
    }
  }

  pSyncNode->arbTerm = -1;
  (void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
  syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
  sInfo("vgId:%d, generate arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);

  // init raft algorithm
  // From here on the node owns the FSM; pSyncInfo no longer references it.
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  if ((code = raftStoreOpen(pSyncNode)) != 0) {
    terrno = code;
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    // TODO check return value
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
    pSyncNode->fsmState = snapshot.state;
    if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
      sError("vgId:%d, fsm state is incomplete.", pSyncNode->vgId);
      // An incomplete FSM is fatal only for a single replica.
      if (pSyncNode->replicaNum == 1) {
        terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
        goto _error;
      }
    }
  }
  pSyncNode->commitIndex = commitIndex;
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  // restore log store on need
  if ((code = syncNodeLogStoreRestoreOnNeed(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
#ifdef BUILD_NO_CALL
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
#endif
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i])) != 0) {
      // Record the failure in terrno like every other error path here.
      terrno = code;
      goto _error;
    }
  }

  // tools
  if ((code = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr)) != 0) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(pSyncNode, i, &pSender);
    if (pSender == NULL) {
      // Go through _error so the partially built node is closed instead of
      // being leaked by a bare return.
      if (code != 0) terrno = code;
      sError("vgId:%d, failed to create snapshot sender, index:%d", pSyncNode->vgId, i);
      goto _error;
    }

    pSyncNode->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  // snapshot receivers
  code = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID, &pSyncNode->pNewNodeReceiver);
  if (pSyncNode->pNewNodeReceiver == NULL) {
    // Same as above: release everything built so far.
    if (code != 0) terrno = code;
    sError("vgId:%d, failed to create snapshot receiver", pSyncNode->vgId);
    goto _error;
  }
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
          pSyncNode->pNewNodeReceiver);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  if ((code = syncNodeLogReplInit(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // peer state
  if ((code = syncNodePeerStateInit(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  //
  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // start in syncNodeStart
  // start raft

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if ((code = syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->becomeAssignedLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync node opened, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
  return pSyncNode;

_error:
  // The FSM is still referenced by pSyncInfo only if ownership was never
  // moved into the node; in that case it is freed here.
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}
1428

1429
#ifdef BUILD_NO_CALL
1430
// Advance the node's commitIndex to the FSM snapshot's lastApplyIndex when
// the snapshot is ahead. Does nothing when no FSM or no FpGetSnapshotInfo
// callback is registered. The callback's return value is ignored.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snap = {0};
  (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);

  if (snap.lastApplyIndex <= pSyncNode->commitIndex) {
    return;
  }
  pSyncNode->commitIndex = snap.lastApplyIndex;
}
1439
#endif
1440

1441
// Restore the commit progress of a sync node from its log store.
//
// The last index and commit index of the log store and the end index of the
// log buffer are sampled while holding the log buffer mutex. commitIndex is
// raised to at least the stored commit index and, unless the FSM state is
// SYNC_FSM_STATE_INCOMPLETE, the log buffer is committed up to that index.
//
// Returns (through TAOS_RETURN):
//   - TSDB_CODE_SYN_INTERNAL_ERROR (plain return) if the log store or the log
//     buffer has not been created;
//   - the result of syncLogBufferCommit() when the commit is attempted;
//   - otherwise 0, or TSDB_CODE_WAL_LOG_INCOMPLETE if the log buffer end index
//     did not match the log store's last index.
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pSyncNode->pLogBuf == NULL) {
    sError("vgId:%d, ring log buffer not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  (void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex);
  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;
  (void)taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex);

  if (lastVer != -1 && endIndex != lastVer + 1) {
    code = TSDB_CODE_WAL_LOG_INCOMPLETE;
    // Print the code chosen here: terrno is not set on this path, so terrstr()
    // would report whatever error happened to be recorded earlier.
    sWarn("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64 "",
          pSyncNode->vgId, tstrerror(code), endIndex - 1, lastVer);
    // only warned about; the restore carries on
  }

  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
  sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex);
  }

  TAOS_RETURN(code);
}
1476

1477
// Put a freshly opened sync node into its initial raft role and start the
// ping timer.
//
//   - configured as learner: become learner;
//   - single replica: advance the term, become leader and append a noop entry
//     (a failure of the append is returned immediately);
//   - otherwise: become follower with an empty leader id.
//
// Returns the result of syncNodeStartPingTimer(), which is logged when non-zero.
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  sInfo("vgId:%d, begin to start sync node", pSyncNode->vgId);

  if (pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
    syncNodeBecomeLearner(pSyncNode, "first start");
  } else if (pSyncNode->replicaNum == 1) {
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    TAOS_CHECK_RETURN(syncNodeAppendNoop(pSyncNode));
  } else {
    SRaftId noLeader = {0};
    syncNodeBecomeFollower(pSyncNode, noLeader, "first start");
  }

  int32_t timerCode = syncNodeStartPingTimer(pSyncNode);
  if (timerCode != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(timerCode));
  }
  sInfo("vgId:%d, sync node started", pSyncNode->vgId);
  return timerCode;
}
1503

1504
#ifdef BUILD_NO_CALL
1505
// Start the node as a stand-by follower: mark it follower, stop the heartbeat
// timers, arm the elect timer with TIMER_MAX_MS and start the ping timer.
//
// Returns the heartbeat-stop error if that step fails, -1 if restarting the
// elect timer or starting the ping timer reports a negative code, otherwise
// the ping timer result.
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));

  // reset elect timer, long enough
  int32_t rc = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS);
  if (rc < 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  rc = syncNodeStartPingTimer(pSyncNode);
  if (rc < 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }
  return rc;
}
1528
#endif
1529

1530
// First stage of closing a sync node: stop the elect, heartbeat and ping
// timers (in that order) and clean the pending responses.
//
// The function logs and returns early when the node, its FSM or the FSM's
// FpApplyQueueItems callback is NULL, or as soon as one timer fails to stop.
void syncNodePreClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    sError("failed to pre close sync node since sync node is null");
    return;
  }
  if (pSyncNode->pFsm == NULL) {
    sError("failed to pre close sync node since fsm is null");
    return;
  }
  if (pSyncNode->pFsm->FpApplyQueueItems == NULL) {
    sError("failed to pre close sync node since FpApplyQueueItems is null");
    return;
  }

  // stop elect timer
  int32_t rc = syncNodeStopElectTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  // stop heartbeat timer
  rc = syncNodeStopHeartbeatTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  // stop ping timer
  rc = syncNodeStopPingTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  // clean rsp
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
1566

1567
// Release the node's snapshot receiver, if it has one: stop it when it is
// started, destroy it and clear the pointer.
void syncNodePostClose(SSyncNode* pSyncNode) {
  if (pSyncNode->pNewNodeReceiver == NULL) {
    return;
  }

  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
         pSyncNode->pNewNodeReceiver);
  snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
  pSyncNode->pNewNodeReceiver = NULL;
}
13,890✔
1579

1580
// Free a heartbeat timer data object with taosMemoryFree().
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
2,488✔
1581

1582
// Tear down a sync node and free it. NULL is accepted and ignored.
//
// Order: clean pending responses, stop the ping / elect / heartbeat timers,
// destroy the log replication managers, response manager, vote managers, index
// managers, log store and log buffer, destroy the arb token mutex, destroy all
// snapshot senders and the snapshot receiver, free the FSM, close the raft
// store and finally free the node itself.
//
// If any timer fails to stop the function logs the error and returns without
// releasing anything.
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);

  int32_t rc = syncNodeStopPingTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }
  rc = syncNodeStopElectTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }
  rc = syncNodeStopHeartbeatTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }
  syncNodeLogReplDestroy(pSyncNode);

  // managers and log storage; every pointer is cleared right after its destroy call
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;

  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;

  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;

  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;

  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  (void)taosThreadMutexDestroy(&pSyncNode->arbTokenMutex);

  // snapshot senders: stop the running ones before destroying them
  for (int32_t idx = 0; idx < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++idx) {
    if (pSyncNode->senders[idx] == NULL) {
      continue;
    }
    sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[idx]);

    if (snapshotSenderIsStart(pSyncNode->senders[idx])) {
      snapshotSenderStop(pSyncNode->senders[idx], false);
    }

    snapshotSenderDestroy(pSyncNode->senders[idx]);
    pSyncNode->senders[idx] = NULL;
  }

  // snapshot receiver
  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  raftStoreClose(pSyncNode);

  taosMemoryFree(pSyncNode);
}
1651

1652
// Return the snapshot strategy stored in the node's raft configuration.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->raftCfg.snapshotStrategy;
}
×
1653

1654
// timer control --------------
1655
// Arm the ping timer and publish the user logic clock as the active one.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when taosTmrReset() reports true,
// otherwise 0. When the sync environment is not initialised the timer is not
// started; this is logged but 0 is still returned.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  bool stopped = taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
                              syncEnv()->pTimerManager, &pSyncNode->pPingTimer);
  if (stopped) {
    sError("vgId:%d, failed to reset ping timer, ms:%d", pSyncNode->vgId, pSyncNode->pingTimerMS);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}
1670

1671
// Stop the ping timer: bump the user logic clock, stop the timer handle and
// clear it. Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  bool wasStopped = taosTmrStop(pSyncNode->pPingTimer);
  sDebug("vgId:%d, stop ping timer, stop:%d", pSyncNode->vgId, wasStopped);

  pSyncNode->pPingTimer = NULL;
  return 0;
}
1679

1680
// Arm the election timer to fire after `ms` milliseconds.
//
// Records the timeout and fills electTimerParam (execute time, current logic
// clock, node pointer, NULL data) before resetting the timer. A true result
// from taosTmrReset() is only logged. Always returns 0, including when the sync
// environment is not initialised (logged, timer not started).
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  int64_t fireAt = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), fireAt);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  bool stopped = taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid),
                              syncEnv()->pTimerManager, &pSyncNode->pElectTimer);
  if (stopped) {
    sError("vgId:%d, failed to reset elect timer, ms:%d", pSyncNode->vgId, ms);
  }
  return 0;
}
1699

1700
// Stop the election timer: bump its logic clock, stop the timer handle and
// clear it. Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  bool wasStopped = taosTmrStop(pSyncNode->pElectTimer);
  sDebug("vgId:%d, stop elect timer, stop:%d", pSyncNode->vgId, wasStopped);

  pSyncNode->pElectTimer = NULL;
  return 0;
}
1709

1710
// Stop the election timer, then start it again with a timeout of `ms`
// milliseconds. The first failing step is returned through TAOS_CHECK_RETURN;
// returns 0 when both steps succeed.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  TAOS_CHECK_RETURN(syncNodeStopElectTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartElectTimer(pSyncNode, ms));

  return 0;
}
1716

1717
// Restart the election timer with a fresh timeout: TIMER_MAX_MS for a stand-by
// node, otherwise the value syncUtilElectRandomMS() returns for the range
// [electBaseLine, 2 * electBaseLine]. A restart failure is logged and ends the
// function before the trace line.
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS;

  if (!pSyncNode->raftCfg.isStandBy) {
    electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  } else {
    electMS = TIMER_MAX_MS;
  }

  int32_t rc = syncNodeRestartElectTimer(pSyncNode, electMS);
  if (rc != 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
}
1736

1737
#ifdef BUILD_NO_CALL
1738
// Arm the node-wide heartbeat timer and publish the user logic clock as the
// active one. A non-zero result from taosTmrReset() is returned through
// TAOS_CHECK_RETURN. When the sync environment is not initialised the timer is
// not started (logged). Returns 0 otherwise.
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    TAOS_CHECK_RETURN(taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
                                   syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer));
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return 0;
}
1751
#endif
1752

1753
// Start the per-peer heartbeat timer of every peer that has one. The first
// failing syncHbTimerStart() is returned through TAOS_CHECK_RETURN; returns 0
// when all of them start.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;

#if 0
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) {
      continue;
    }
    TAOS_CHECK_RETURN(syncHbTimerStart(pSyncNode, pHbTimer));
  }

  return ret;
}
1770

1771
// Stop the per-peer heartbeat timer of every peer that has one. The first
// failing syncHbTimerStop() is returned through TAOS_CHECK_RETURN; returns 0
// when all of them stop.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
#if 0
  TAOS_CHECK_RETURN(atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1));
  bool stop = taosTmrStop(pSyncNode->pHeartbeatTimer);
  sDebug("vgId:%d, stop heartbeat timer, stop:%d", pSyncNode->vgId, stop);
  pSyncNode->pHeartbeatTimer = NULL;
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) {
      continue;
    }
    TAOS_CHECK_RETURN(syncHbTimerStop(pSyncNode, pHbTimer));
  }

  return 0;
}
1790

1791
#ifdef BUILD_NO_CALL
1792
// Stop, then start, the per-peer heartbeat timers. The first failing step is
// returned through TAOS_CHECK_RETURN; returns 0 when both succeed.
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));

  return 0;
}
1799
#endif
1800

1801
// Send a sync message to the peer whose raft address equals destRaftId->addr.
//
// The peer's endpoint set is looked up among the node's peers. When it is found
// and a send callback is installed, the message content is converted with
// syncUtilMsgHtoN(), marked as needing no response and handed to the callback.
//
// Returns (through TAOS_RETURN) the callback's result, or -1 when no peer
// matches or no callback is installed. On a negative code the failure is logged
// and the message content is released with rpcFreeCont().
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  SEpSet* pPeerEpSet = NULL;
  for (int32_t peer = 0; peer < pNode->peersNum && pPeerEpSet == NULL; ++peer) {
    if (pNode->peersId[peer].addr == destRaftId->addr) {
      pPeerEpSet = &pNode->peersEpset[peer];
    }
  }

  int32_t code = -1;
  if (pPeerEpSet != NULL && pNode->syncSendMSg != NULL) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    code = pNode->syncSendMSg(pPeerEpSet, pMsg);
  }

  if (code < 0) {
    sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:%" PRId64, pNode->vgId, tstrerror(code),
           pPeerEpSet, DID(destRaftId), destRaftId->addr);
    rpcFreeCont(pMsg->pCont);
  }

  TAOS_RETURN(code);
}
1825

1826
// Tell whether this node is a member of the configuration pCfg.
//
// Membership is checked twice: by fqdn + port against the node's own node info,
// and by raft id (built from each entry's SYNC_ADDR and the node's vgId)
// against the node's own raft id. If the two checks disagree, terrno is set to
// TSDB_CODE_SYN_INTERNAL_ERROR and false is returned.
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  bool matchByEndpoint = false;
  bool matchByRaftId = false;

  for (int32_t i = 0; i < pCfg->totalReplicaNum && !matchByEndpoint; ++i) {
    if (strcmp(pCfg->nodeInfo[i].nodeFqdn, pNode->myNodeInfo.nodeFqdn) != 0) {
      continue;
    }
    if (pCfg->nodeInfo[i].nodePort == pNode->myNodeInfo.nodePort) {
      matchByEndpoint = true;
    }
  }

  for (int32_t i = 0; i < pCfg->totalReplicaNum && !matchByRaftId; ++i) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[i]),
        .vgId = pNode->vgId,
    };

    if (syncUtilSameId(&candidate, &pNode->myRaftId)) {
      matchByRaftId = true;
    }
  }

  if (matchByEndpoint != matchByRaftId) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return false;
  }
  return matchByEndpoint;
}
1856

1857
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
3,554✔
1858
  if (pOldCfg->totalReplicaNum != pNewCfg->totalReplicaNum) return true;
3,554✔
1859
  if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
2,398✔
1860
  for (int32_t i = 0; i < pOldCfg->totalReplicaNum; ++i) {
5,618✔
1861
    const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
4,027✔
1862
    const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
4,027✔
1863
    if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
4,027!
1864
    if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
4,027✔
1865
    if (pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
4,025✔
1866
  }
1867

1868
  return false;
1,591✔
1869
}
1870

1871
// Apply a new raft configuration to the sync node.
//
// Parameters:
//   pSyncNode             - node to reconfigure
//   pNewConfig            - configuration to install (copied into raftCfg.cfg)
//   lastConfigChangeIndex - index recorded as raftCfg.lastConfigIndex and
//                           appended to raftCfg.configIndexArr
//
// If the configuration is unchanged (syncIsConfigChanged) nothing is done.
// Otherwise the configuration is stored, the stand-by flag is updated and, when
// this node is part of the new configuration, the derived state is rebuilt:
// own node info / raft id, peers, replica ids, quorum, index and vote managers,
// and the snapshot senders (senders of replicas that survive the change are
// re-used at their new index, the rest are re-created). Finally the
// configuration file is written.
//
// Returns 0 on success or when nothing changed; -1 with terrno set to
// TSDB_CODE_OUT_OF_RANGE when configIndexArr is full; terrno when a node info
// cannot be converted to a raft id; or the error of snapshotSenderCreate() /
// syncWriteCfgFile().
//
// NOTE(review): the error returns below happen after raftCfg has already been
// modified, and a failure of snapshotSenderCreate() returns before the
// remaining entries of oldSenders are destroyed - confirm whether callers
// treat these failures as fatal.
int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change
  // arguments are listed in the order of the conversion specifiers:
  // total replicas old/new, lastIndex old/new, replicaNum of the new config
  sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
         oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, oldConfig.lastIndex, pNewConfig->lastIndex,
         pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  SRaftCfg* pCfg = &pSyncNode->raftCfg;
  if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
    sNError(pSyncNode, "failed to add cfg index:%d since out of range", pCfg->configIndexCount);
    terrno = TSDB_CODE_OUT_OF_RANGE;
    return -1;
  }

  pCfg->configIndexArr[pCfg->configIndexCount] = lastConfigChangeIndex;
  pCfg->configIndexCount++;

  if (IamInNew) {
    // save snapshot senders together with the replica ids they belonged to
    SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    if (syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId) == false) return terrno;

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
    int32_t j = 0;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
        j++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]) == false)
        return terrno;
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
          false)
        return terrno;
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // reset new: move each surviving replica's old sender to its new index
    for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
      // reset sender
      bool reset = false;
      for (int32_t k = 0; k < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++k) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[k]) && oldSenders[k] != NULL) {
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[k]);

          pSyncNode->senders[i] = oldSenders[k];
          oldSenders[k] = NULL;
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
          pSyncNode->senders[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], reset);

          break;
        }
      }
    }

    // create new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (pSyncNode->senders[i] == NULL) {
        TAOS_CHECK_RETURN(snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]));
        if (pSyncNode->senders[i] == NULL) {
          // will be created later while send snapshot
          sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
        } else {
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
        }
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
      }
    }

    // free old: whatever was not moved above belongs to a replica that is gone
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
  } else {
    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  return 0;
}
2043

2044
// raft state change --------------
2045
// Store `term` as the node's raft term when it is greater than the current
// one; the node's role is not touched.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (raftStoreGetTerm(pSyncNode) >= term) {
    return;
  }
  raftStoreSetTerm(pSyncNode, term);
}
56,311✔
2050

2051
// Step down in response to a message carrying `newTerm` from node `id`.
//
//   - newTerm older than the current term: ignored;
//   - node is an assigned leader: a new arb token is generated under
//     arbTokenMutex;
//   - newTerm newer: the term is stored, the node becomes follower of `id` and
//     its vote is cleared;
//   - same term: the node becomes follower of `id` unless it already is one.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm, SRaftId id) {
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
  if (newTerm < currentTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);

  if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
    syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
    sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken);
    (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
  }

  if (newTerm > currentTerm) {
    raftStoreSetTerm(pSyncNode, newTerm);

    char reason[64];
    snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
    syncNodeBecomeFollower(pSyncNode, id, reason);

    raftStoreClearVote(pSyncNode);
  } else if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    syncNodeBecomeFollower(pSyncNode, id, "step down");
  }
}
2081

2082
// Clean the pending responses held by the node's response manager; called when
// the node becomes follower.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
5,327✔
2083

2084
// Switch the node to the follower role with `leaderId` as its cached leader.
//
// Steps, in order: reset hbSlowNum, set leader cache / state / role time, stop
// the heartbeat timers, clean pending responses, invoke the FSM's
// FpBecomeFollowerCb (when set), invalidate minMatchIndex, reset the log buffer
// and reset the election timer. `debugStr` is only used in log lines.
//
// A failure to stop the heartbeat timers or to reset the log buffer is logged
// and ends the function at that point.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, SRaftId leaderId, const char* debugStr) {
  // maybe clear leader cache
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  pSyncNode->hbSlowNum = 0;

  // state change
  pSyncNode->leaderCache = leaderId;
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  int32_t rc = syncNodeStopHeartbeatTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  // trace log
  sNTrace(pSyncNode, "become follower %s", debugStr);

  // send rsp to client
  syncNodeLeaderChangeRsp(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  rc = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  // reset elect timer
  syncNodeResetElectTimer(pSyncNode);

  sInfo("vgId:%d, become follower. %s", pSyncNode->vgId, debugStr);
}
2125

2126
// Switch the node to the learner role: reset hbSlowNum, set state and role
// time, invoke the FSM's FpBecomeLearnerCb (when set), invalidate minMatchIndex
// and reset the log buffer (a failure is only logged). `debugStr` is only used
// in the trace line.
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->hbSlowNum = 0;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // trace log
  sNTrace(pSyncNode, "become learner %s", debugStr);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLearnerCb != NULL) {
    pSyncNode->pFsm->FpBecomeLearnerCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  int32_t rc = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(rc));
  }
}
2151

2152
// TLA+ Spec
2153
// \* Candidate i transitions to leader.
2154
// BecomeLeader(i) ==
2155
//     /\ state[i] = Candidate
2156
//     /\ votesGranted[i] \in Quorum
2157
//     /\ state'      = [state EXCEPT ![i] = Leader]
2158
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
2159
//                          [j \in Server |-> Len(log[i]) + 1]]
2160
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
2161
//                          [j \in Server |-> 0]]
2162
//     /\ elections'  = elections \cup
2163
//                          {[eterm     |-> currentTerm[i],
2164
//                            eleader   |-> i,
2165
//                            elog      |-> log[i],
2166
//                            evotes    |-> votesGranted[i],
2167
//                            evoterLog |-> voterLog[i]]}
2168
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
2169
//
2170
// Move this node into the leader role.
//
// Steps, in order: bump the role counters, clear restoreFinish, set the state
// and role timestamp, point the leader cache at this node, seed every
// replica's next index with (last index + 1), invalidate every match index,
// re-init the peer send state, stop a started snapshot receiver, stop the
// election timer, start the heartbeat timer, heartbeat the peers once,
// invoke the FSM's FpBecomeLeaderCb (when set), invalidate minMatchIndex and
// reset the log buffer.
//
// Nothing is returned. On the first failing step an error is logged and the
// function returns immediately, leaving the remaining steps undone.
//
// debugStr is appended to the final "become leader" log line.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  int32_t code = 0;

  pSyncNode->becomeLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // clear the restore-finished flag
  pSyncNode->restoreFinish = false;

  // role switch
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // this node is the leader it knows about
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // next index of every replica starts right behind the last local index
  for (int32_t replica = 0; replica < pSyncNode->pNextIndex->replicaNum; ++replica) {
    SyncIndex lastIndex;
    SyncTerm  lastTerm;

    code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
    if (code != 0) {
      sError("vgId:%d, failed to become leader since %s", pSyncNode->vgId, tstrerror(code));
      return;
    }
    pSyncNode->pNextIndex->index[replica] = lastIndex + 1;
  }

  // match index of every replica is unknown again; the slot of this node may
  // be overwritten too, which is harmless
  for (int32_t replica = 0; replica < pSyncNode->pMatchIndex->replicaNum; ++replica) {
    pSyncNode->pMatchIndex->index[replica] = SYNC_INDEX_INVALID;
  }

  // per-peer send state
  code = syncNodePeerStateInit(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // a started snapshot receiver is stopped
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // election timer off
  code = syncNodeStopElectTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // heartbeat timer on
  code = syncNodeStartHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // first heartbeat goes out immediately
  code = syncNodeHeartbeatPeers(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // tell the state machine
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // log buffer starts over
  code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  sNInfo(pSyncNode, "become leader %s", debugStr);
}
2261

2262
// Move this node into the assigned-leader role.
//
// Same sequence as becoming a regular leader, with three differences visible
// here: the state becomes TAOS_SYNC_STATE_ASSIGNED_LEADER, restoreFinish is
// left untouched, and the FSM callback invoked is FpBecomeAssignedLeaderCb.
//
// Nothing is returned. On the first failing step an error is logged and the
// function returns immediately, leaving the remaining steps undone.
void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
  int32_t code = 0;

  pSyncNode->becomeAssignedLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // role switch (restoreFinish is deliberately not reset here)
  pSyncNode->state = TAOS_SYNC_STATE_ASSIGNED_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // this node is the leader it knows about
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // next index of every replica starts right behind the last local index
  for (int32_t replica = 0; replica < pSyncNode->pNextIndex->replicaNum; ++replica) {
    SyncIndex lastIndex;
    SyncTerm  lastTerm;

    code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
    if (code != 0) {
      sError("vgId:%d, failed to become assigned leader since %s", pSyncNode->vgId, tstrerror(code));
      return;
    }
    pSyncNode->pNextIndex->index[replica] = lastIndex + 1;
  }

  // match index of every replica is unknown again; the slot of this node may
  // be overwritten too, which is harmless
  for (int32_t replica = 0; replica < pSyncNode->pMatchIndex->replicaNum; ++replica) {
    pSyncNode->pMatchIndex->index[replica] = SYNC_INDEX_INVALID;
  }

  // per-peer send state
  code = syncNodePeerStateInit(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // a started snapshot receiver is stopped
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // election timer off
  code = syncNodeStopElectTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // heartbeat timer on
  code = syncNodeStartHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // first heartbeat goes out immediately
  code = syncNodeHeartbeatPeers(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // tell the state machine
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeAssignedLeaderCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // log buffer starts over
  code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  sNInfo(pSyncNode, "become assigned leader");
}
2340

2341
// Promote a candidate to leader.
//
// Does nothing except log an error when the node is not in candidate state
// or when the granted votes do not form a majority. Otherwise the node
// becomes leader and a noop entry is appended; a failed append is logged
// and not treated as fatal.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
    sError("vgId:%d, failed leader from candidate since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }

  if (!voteGrantedMajority(pSyncNode->pVotesGranted)) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  int32_t appendCode = syncNodeAppendNoop(pSyncNode);
  if (appendCode < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
  }

  SyncIndex lastLogIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "", pSyncNode->vgId,
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastLogIndex);
}
2365

2366
// A sync node counts as the mnode exactly when its vgroup id is 1.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  const bool isMnode = (pSyncNode->vgId == 1);
  return isMnode;
}
444,557✔
2367

2368
// Reset the send bookkeeping of every peer slot: the last sent index becomes
// SYNC_INDEX_INVALID and the last send time becomes 0. Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  const int32_t slotNum = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;

  int32_t slot = 0;
  while (slot < slotNum) {
    pSyncNode->peerStates[slot].lastSendTime = 0;
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    ++slot;
  }

  return 0;
}
2376

2377
// Turn a follower into a candidate.
//
// Only the state and the role timestamp are changed here. A node that is not
// currently a follower is left untouched and an error is logged.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  const bool isFollower = (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  if (!isFollower) {
    sError("vgId:%d, failed candidate from follower since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }

  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  SyncIndex lastLogIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastLogIndex);

  sNTrace(pSyncNode, "follower to candidate");
}
2390

2391
// Promote an assigned leader to a regular leader.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the node is not currently an
// assigned leader, 0 otherwise. After the role switch a noop entry is
// appended; a failed append is logged and does not change the return value.
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) return TSDB_CODE_SYN_INTERNAL_ERROR;
  syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");

  sNTrace(pSyncNode, "assigned leader to leader");

  int32_t ret = syncNodeAppendNoop(pSyncNode);
  if (ret < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  // the ", " in front of "assigned commit index" keeps the two numbers from
  // running into each other in the log line
  sInfo("vgId:%d, become leader from assigned leader. term:%" PRId64 ", commit index:%" PRId64
        ", assigned commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, pSyncNode->assignedCommitIndex,
        lastIndex);
  return 0;
}
2409

2410
// just called by syncNodeVoteForSelf
2411
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
1,329✔
2412
  SyncTerm storeTerm = raftStoreGetTerm(pSyncNode);
1,329✔
2413
  if (term != storeTerm) {
1,329!
2414
    sError("vgId:%d, failed to vote for term, term:%" PRId64 ", storeTerm:%" PRId64, pSyncNode->vgId, term, storeTerm);
×
2415
    return;
×
2416
  }
2417
  bool voted = raftStoreHasVoted(pSyncNode);
1,329✔
2418
  if (voted) {
1,329!
2419
    sError("vgId:%d, failed to vote for term since not voted", pSyncNode->vgId);
×
2420
    return;
×
2421
  }
2422

2423
  raftStoreVote(pSyncNode, pRaftId);
1,329✔
2424
}
2425

2426
// simulate get vote from outside
2427
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
1,329✔
2428
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);
1,329✔
2429

2430
  SRpcMsg rpcMsg = {0};
1,329✔
2431
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
1,329✔
2432
  if (ret != 0) return;
1,329!
2433

2434
  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
1,329✔
2435
  pMsg->srcId = pSyncNode->myRaftId;
1,329✔
2436
  pMsg->destId = pSyncNode->myRaftId;
1,329✔
2437
  pMsg->term = currentTerm;
1,329✔
2438
  pMsg->voteGranted = true;
1,329✔
2439

2440
  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
1,329✔
2441
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
1,329✔
2442
  rpcFreeCont(rpcMsg.pCont);
1,329✔
2443
}
2444

2445
// return if has a snapshot
2446
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
20,168✔
2447
  bool      ret = false;
20,168✔
2448
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
20,168✔
2449
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
20,168!
2450
    // TODO check return value
2451
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
20,168✔
2452
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
20,168✔
2453
      ret = true;
2,741✔
2454
    }
2455
  }
2456
  return ret;
20,168✔
2457
}
2458

2459
// return max(logLastIndex, snapshotLastIndex)
2460
// if no snapshot and log, return -1
2461
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
20,171✔
2462
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
20,171✔
2463
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
20,171!
2464
    // TODO check return value
2465
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
20,171✔
2466
  }
2467
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
20,171✔
2468

2469
  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
20,171✔
2470
  return lastIndex;
20,171✔
2471
}
2472

2473
// return the last term of snapshot and log
2474
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
2475
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
20,168✔
2476
  SyncTerm lastTerm = 0;
20,168✔
2477
  if (syncNodeHasSnapshot(pSyncNode)) {
20,168✔
2478
    // has snapshot
2479
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
2,741✔
2480
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
2,741!
2481
      // TODO check return value
2482
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
2,741✔
2483
    }
2484

2485
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
2,741✔
2486
    if (logLastIndex > snapshot.lastApplyIndex) {
2,741✔
2487
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1,723✔
2488
    } else {
2489
      lastTerm = snapshot.lastApplyTerm;
1,018✔
2490
    }
2491

2492
  } else {
2493
    // no snapshot
2494
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
17,427✔
2495
  }
2496

2497
  return lastTerm;
20,168✔
2498
}
2499

2500
// get last index and term along with snapshot
2501
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
17,932✔
2502
  *pLastIndex = syncNodeGetLastIndex(pSyncNode);
17,932✔
2503
  *pLastTerm = syncNodeGetLastTerm(pSyncNode);
17,932✔
2504
  return 0;
17,932✔
2505
}
2506

2507
#ifdef BUILD_NO_CALL
2508
// return append-entries first try index
2509
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
2510
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
2511
  return syncStartIndex;
2512
}
2513

2514
// if index > 0, return index - 1
2515
// else, return -1
2516
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
2517
  SyncIndex preIndex = index - 1;
2518
  if (preIndex < SYNC_INDEX_INVALID) {
2519
    preIndex = SYNC_INDEX_INVALID;
2520
  }
2521

2522
  return preIndex;
2523
}
2524

2525
// if index < 0, return SYNC_TERM_INVALID
2526
// if index == 0, return 0
2527
// if index > 0, return preTerm
2528
// if error, return SYNC_TERM_INVALID
2529
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
2530
  if (index < SYNC_INDEX_BEGIN) {
2531
    return SYNC_TERM_INVALID;
2532
  }
2533

2534
  if (index == SYNC_INDEX_BEGIN) {
2535
    return 0;
2536
  }
2537

2538
  SyncTerm  preTerm = 0;
2539
  SyncIndex preIndex = index - 1;
2540

2541
  SSyncRaftEntry* pPreEntry = NULL;
2542
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
2543
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
2544
  int32_t         code = 0;
2545
  if (h) {
2546
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
2547
    code = 0;
2548

2549
    pSyncNode->pLogStore->cacheHit++;
2550
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);
2551

2552
  } else {
2553
    pSyncNode->pLogStore->cacheMiss++;
2554
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);
2555

2556
    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
2557
  }
2558

2559
  SSnapshot snapshot = {.data = NULL,
2560
                        .lastApplyIndex = SYNC_INDEX_INVALID,
2561
                        .lastApplyTerm = SYNC_TERM_INVALID,
2562
                        .lastConfigIndex = SYNC_INDEX_INVALID};
2563

2564
  if (code == 0) {
2565
    if (pPreEntry == NULL) return -1;
2566
    preTerm = pPreEntry->term;
2567

2568
    if (h) {
2569
      taosLRUCacheRelease(pCache, h, false);
2570
    } else {
2571
      syncEntryDestroy(pPreEntry);
2572
    }
2573

2574
    return preTerm;
2575
  } else {
2576
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
2577
      // TODO check return value
2578
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
2579
      if (snapshot.lastApplyIndex == preIndex) {
2580
        return snapshot.lastApplyTerm;
2581
      }
2582
    }
2583
  }
2584

2585
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
2586
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
2587
  return SYNC_TERM_INVALID;
2588
}
2589

2590
// get pre index and term of "index"
2591
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
2592
  *pPreIndex = syncNodeGetPreIndex(pSyncNode, index);
2593
  *pPreTerm = syncNodeGetPreTerm(pSyncNode, index);
2594
  return 0;
2595
}
2596
#endif
2597

2598
static void syncNodeEqPingTimer(void* param, void* tmrId) {
205,150✔
2599
  if (!syncIsInit()) return;
205,150!
2600

2601
  int64_t    rid = (int64_t)param;
205,150✔
2602
  SSyncNode* pNode = syncNodeAcquire(rid);
205,150✔
2603

2604
  if (pNode == NULL) return;
205,150!
2605

2606
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
205,150!
2607
    SRpcMsg rpcMsg = {0};
205,150✔
2608
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
205,150✔
2609
                                    pNode->pingTimerMS, pNode);
2610
    if (code != 0) {
205,150!
2611
      sError("failed to build ping msg");
×
2612
      rpcFreeCont(rpcMsg.pCont);
×
2613
      goto _out;
×
2614
    }
2615

2616
    // sTrace("enqueue ping msg");
2617
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
205,150✔
2618
    if (code != 0) {
205,150✔
2619
      sError("failed to sync enqueue ping msg since %s", terrstr());
1!
2620
      rpcFreeCont(rpcMsg.pCont);
1✔
2621
      goto _out;
1✔
2622
    }
2623

2624
  _out:
205,149✔
2625
    if (taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
205,150!
2626
                     &pNode->pPingTimer))
2627
      sError("failed to reset ping timer");
×
2628
  }
2629
  syncNodeRelease(pNode);
205,150✔
2630
}
2631

2632
static void syncNodeEqElectTimer(void* param, void* tmrId) {
1,333✔
2633
  if (!syncIsInit()) return;
1,336!
2634

2635
  int64_t    rid = (int64_t)param;
1,333✔
2636
  SSyncNode* pNode = syncNodeAcquire(rid);
1,333✔
2637

2638
  if (pNode == NULL) return;
1,333✔
2639

2640
  if (pNode->syncEqMsg == NULL) {
1,331!
2641
    syncNodeRelease(pNode);
×
2642
    return;
×
2643
  }
2644

2645
  int64_t tsNow = taosGetTimestampMs();
1,331✔
2646
  if (tsNow < pNode->electTimerParam.executeTime) {
1,331✔
2647
    syncNodeRelease(pNode);
1✔
2648
    return;
1✔
2649
  }
2650

2651
  SRpcMsg rpcMsg = {0};
1,330✔
2652
  int32_t code =
2653
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
1,330✔
2654

2655
  if (code != 0) {
1,330!
2656
    sError("failed to build elect msg");
×
2657
    syncNodeRelease(pNode);
×
2658
    return;
×
2659
  }
2660

2661
  SyncTimeout* pTimeout = rpcMsg.pCont;
1,330✔
2662
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
1,330!
2663

2664
  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
1,330✔
2665
  if (code != 0) {
1,330!
2666
    sError("failed to sync enqueue elect msg since %s", terrstr());
×
2667
    rpcFreeCont(rpcMsg.pCont);
×
2668
    syncNodeRelease(pNode);
×
2669
    return;
×
2670
  }
2671

2672
  syncNodeRelease(pNode);
1,330✔
2673
}
2674

2675
#ifdef BUILD_NO_CALL
2676
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
2677
  if (!syncIsInit()) return;
2678

2679
  int64_t    rid = (int64_t)param;
2680
  SSyncNode* pNode = syncNodeAcquire(rid);
2681

2682
  if (pNode == NULL) return;
2683

2684
  if (pNode->totalReplicaNum > 1) {
2685
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
2686
      SRpcMsg rpcMsg = {0};
2687
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
2688
                                      pNode->heartbeatTimerMS, pNode);
2689

2690
      if (code != 0) {
2691
        sError("failed to build heartbeat msg");
2692
        goto _out;
2693
      }
2694

2695
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
2696
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
2697
      if (code != 0) {
2698
        sError("failed to enqueue heartbeat msg since %s", terrstr());
2699
        rpcFreeCont(rpcMsg.pCont);
2700
        goto _out;
2701
      }
2702

2703
    _out:
2704
      if (taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
2705
                       &pNode->pHeartbeatTimer) != 0)
2706
        return;
2707

2708
    } else {
2709
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
2710
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2711
    }
2712
  }
2713
}
2714
#endif
2715

2716
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
45,326✔
2717
  int32_t code = 0;
45,326✔
2718
  int64_t hbDataRid = (int64_t)param;
45,326✔
2719
  int64_t tsNow = taosGetTimestampMs();
45,326✔
2720

2721
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
45,326✔
2722
  if (pData == NULL) {
45,326!
2723
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
×
2724
    return;
×
2725
  }
2726

2727
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
45,326✔
2728
  if (pSyncNode == NULL) {
45,326✔
2729
    syncHbTimerDataRelease(pData);
2✔
2730
    sError("hb timer get pSyncNode NULL");
2!
2731
    return;
2✔
2732
  }
2733

2734
  SSyncTimer* pSyncTimer = pData->pTimer;
45,324✔
2735

2736
  if (!pSyncNode->isStart) {
45,324!
2737
    syncNodeRelease(pSyncNode);
×
2738
    syncHbTimerDataRelease(pData);
×
2739
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
×
2740
    return;
×
2741
  }
2742

2743
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
45,324!
2744
    syncNodeRelease(pSyncNode);
×
2745
    syncHbTimerDataRelease(pData);
×
2746
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
×
2747
    return;
×
2748
  }
2749

2750
  sTrace("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, hbDataRid,
45,324!
2751
         pData->destId.addr);
2752

2753
  if (pSyncNode->totalReplicaNum > 1) {
45,324✔
2754
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
45,322✔
2755
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);
45,322✔
2756

2757
    if (timerLogicClock == msgLogicClock) {
45,322✔
2758
      if (tsNow > pData->execTime) {
45,320✔
2759
        pData->execTime += pSyncTimer->timerMS;
45,279✔
2760

2761
        SRpcMsg rpcMsg = {0};
45,279✔
2762
        if ((code = syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId)) != 0) {
45,279!
2763
          sError("vgId:%d, failed to build heartbeat msg since %s", pSyncNode->vgId, tstrerror(code));
×
2764
          syncNodeRelease(pSyncNode);
×
2765
          syncHbTimerDataRelease(pData);
×
2766
          return;
×
2767
        }
2768

2769
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
45,279✔
2770

2771
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
45,279✔
2772
        pSyncMsg->srcId = pSyncNode->myRaftId;
45,279✔
2773
        pSyncMsg->destId = pData->destId;
45,279✔
2774
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
45,279✔
2775
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
45,279✔
2776
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
45,279✔
2777
        pSyncMsg->privateTerm = 0;
45,279✔
2778
        pSyncMsg->timeStamp = tsNow;
45,279✔
2779

2780
        // update reset time
2781
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
45,279✔
2782
        pSyncTimer->timeStamp = tsNow;
45,279✔
2783

2784
        // send msg
2785
        TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
45,279✔
2786
        STraceId* trace = &(rpcMsg.info.traceId);
45,279✔
2787
        sGTrace("vgId:%d, send sync-heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId)));
45,279!
2788
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
45,279✔
2789
        int ret = syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
45,279✔
2790
        if (ret != 0) {
45,279✔
2791
          sError("vgId:%d, failed to send heartbeat since %s", pSyncNode->vgId, tstrerror(ret));
86!
2792
        }
2793
      }
2794

2795
      if (syncIsInit()) {
45,320!
2796
        sTrace("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
45,320!
2797
        bool stopped = taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid,
45,320✔
2798
                                    syncEnv()->pTimerManager, &pSyncTimer->pTimer);
45,320✔
2799
        if (stopped) sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
45,320!
2800

2801
      } else {
2802
        sError("sync env is stop, reset peer hb timer error");
×
2803
      }
2804

2805
    } else {
2806
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
2!
2807
             timerLogicClock, msgLogicClock);
2808
    }
2809
  }
2810

2811
  syncHbTimerDataRelease(pData);
45,324✔
2812
  syncNodeRelease(pSyncNode);
45,324✔
2813
}
2814

2815
#ifdef BUILD_NO_CALL
2816
// Deleter handed to the LRU cache: frees the cached value. The key, its
// length and the user data pointer are not used.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) {
  (void)key;
  (void)keyLen;
  (void)ud;

  taosMemoryFree(value);
}
2820

2821
// Insert a raft entry into the log store's LRU cache, keyed by the entry's
// index, with low priority and deleteCacheEntry as deleter. The charge passed
// to the cache is sizeof(*pEntry) + dataLen. `h` is forwarded to the cache
// insert call. Returns 0 when the cache reports TAOS_LRU_STATUS_OK, -1
// otherwise.
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  sNTrace(pStoreData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus insertStatus = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, charge,
                                              deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW, NULL);

  return (insertStatus == TAOS_LRU_STATUS_OK) ? 0 : -1;
}
2835
#endif
2836

2837
// Fill `cfg` from an alter-replica request.
//
// Voters from pReq->replicas come first in cfg->nodeInfo, learners from
// pReq->learnerReplicas follow. Each node's id/port/fqdn is passed through
// tmsgUpdateDnodeInfo after being copied. On return replicaNum is the number
// of voters and totalReplicaNum the number of voters plus learners.
// cfg->myIndex is written only when selfIndex or learnerSelfIndex differs
// from -1 (the learner index wins when both are set). No bounds checks are
// made against the capacity of cfg->nodeInfo.
void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) {  // TODO SAlterVnodeReplicaReq name is proper?
  cfg->replicaNum = 0;
  cfg->totalReplicaNum = 0;

  // voters
  for (int slot = 0; slot < pReq->replica; ++slot) {
    SNodeInfo* pInfo = &cfg->nodeInfo[slot];

    pInfo->nodeRole = TAOS_SYNC_ROLE_VOTER;
    pInfo->nodeId = pReq->replicas[slot].id;
    pInfo->nodePort = pReq->replicas[slot].port;
    tstrncpy(pInfo->nodeFqdn, pReq->replicas[slot].fqdn, sizeof(pInfo->nodeFqdn));

    bool update = tmsgUpdateDnodeInfo(&pInfo->nodeId, &pInfo->clusterId, pInfo->nodeFqdn, &pInfo->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, slot, pInfo->nodeFqdn,
          pInfo->nodePort, pInfo->nodeId, pInfo->nodeRole, update);

    cfg->replicaNum++;
  }

  if (pReq->selfIndex != -1) {
    cfg->myIndex = pReq->selfIndex;
  }

  // learners; while this loop runs, totalReplicaNum doubles as the index into
  // pReq->learnerReplicas
  for (int slot = cfg->replicaNum; slot < pReq->replica + pReq->learnerReplica; ++slot) {
    SNodeInfo* pInfo = &cfg->nodeInfo[slot];

    pInfo->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    pInfo->nodeId = pReq->learnerReplicas[cfg->totalReplicaNum].id;
    pInfo->nodePort = pReq->learnerReplicas[cfg->totalReplicaNum].port;
    tstrncpy(pInfo->nodeFqdn, pReq->learnerReplicas[cfg->totalReplicaNum].fqdn, sizeof(pInfo->nodeFqdn));

    bool update = tmsgUpdateDnodeInfo(&pInfo->nodeId, &pInfo->clusterId, pInfo->nodeFqdn, &pInfo->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, slot, pInfo->nodeFqdn,
          pInfo->nodePort, pInfo->nodeId, pInfo->nodeRole, update);

    cfg->totalReplicaNum++;
  }

  // learners counted so far, voters added on top
  cfg->totalReplicaNum += pReq->replica;

  if (pReq->learnerSelfIndex != -1) {
    cfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
  }

  cfg->changeVersion = pReq->changeVersion;
}
×
2873

2874
// Inspect a config-change entry and step down when this node, currently
// leader or assigned leader, is not part of the new configuration.
//
// Returns
//   TSDB_CODE_SYN_INTERNAL_ERROR  the entry is not a TDMT_SYNC_CONFIG_CHANGE
//   TSDB_CODE_INVALID_MSG         the request payload cannot be deserialized
//   1                             the node stepped down
//   0                             nothing had to be done
// Membership is decided by comparing fqdn and port of this node with every
// node of the new configuration.
int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  int32_t code = 0;

  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // the request body sits right behind the message head
  SMsgHead* pHead = (SMsgHead*)pEntry->data;
  void*     pBody = POINTER_SHIFT(pHead, sizeof(SMsgHead));

  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pBody, pHead->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg newCfg = {0};
  syncBuildConfigFromReq(&req, &newCfg);

  if (newCfg.totalReplicaNum < 1) {
    return 0;
  }
  if (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    return 0;
  }

  // still a member of the new configuration: keep the role
  for (int32_t k = 0; k < newCfg.totalReplicaNum; ++k) {
    if (strcmp(ths->myNodeInfo.nodeFqdn, newCfg.nodeInfo[k].nodeFqdn) == 0 &&
        ths->myNodeInfo.nodePort == newCfg.nodeInfo[k].nodePort) {
      return 0;
    }
  }

  // removed from the configuration: give up leadership in the current term
  SyncTerm termNow = raftStoreGetTerm(ths);
  SRaftId  noLeader = EMPTY_RAFT_ID;
  syncNodeStepDown(ths, termNow, noLeader);
  return 1;
}
2912

2913
// Dump the node's membership view to the info log, every line prefixed with
// the vgroup id and `str`: summary counters, this node's info, then per peer
// its node info, endpoint set and raft id, then per configured replica its
// node info and raft id. `cfg` is not used.
void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg* cfg, char* str) {
  sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64
        ", changeVersion:%d, "
        "restoreFinish:%d",
        ths->vgId, str, ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion,
        ths->restoreFinish);

  sInfo("vgId:%d, %s, myNodeInfo, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
        ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn, ths->myNodeInfo.nodePort,
        ths->myNodeInfo.nodeRole);

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
          i, ths->peersNodeInfo[i].clusterId, ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn,
          ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    char    buf[256];
    int32_t len = (int32_t)sizeof(buf);
    int32_t n = 0;

    // render the endpoint set of peer i (not always the first peer's) as
    // "{fqdn:port, fqdn:port}"; the inner index is distinct from the peer
    // index so the peer number in the log line stays correct
    n += tsnprintf(buf + n, len - n, "%s", "{");
    for (int32_t j = 0; j < ths->peersEpset[i].numOfEps; j++) {
      n += tsnprintf(buf + n, len - n, "%s:%d%s", ths->peersEpset[i].eps[j].fqdn, ths->peersEpset[i].eps[j].port,
                     (j + 1 < ths->peersEpset[i].numOfEps ? ", " : ""));
    }
    n += tsnprintf(buf + n, len - n, "%s", "}");

    sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d", ths->vgId, str, i, buf, ths->peersEpset[i].inUse);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersId%d, addr:%" PRId64, ths->vgId, str, i, ths->peersId[i].addr);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, nodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str, i,
          ths->raftCfg.cfg.nodeInfo[i].clusterId, ths->raftCfg.cfg.nodeInfo[i].nodeId,
          ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, ths->raftCfg.cfg.nodeInfo[i].nodePort,
          ths->raftCfg.cfg.nodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, replicasId%d, addr:%" PRId64, ths->vgId, str, i, ths->replicasId[i].addr);
  }
}
×
2959

2960
// Rebuild the peer tables and the stored raft configuration from `cfg`.
//
// Pass 1: every node of `cfg` that is not this node (compared by fqdn and
//         port) becomes a peer; its endpoint set and raft id are derived.
// Pass 2: every node of `cfg` is copied into ths->raftCfg.cfg.nodeInfo,
//         voters are counted into replicaNum and myIndex is set to the slot
//         that matches this node.
// Returns 0 on success, or terrno when a peer's raft id cannot be
// determined (the rebuild stops at that point).
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg* cfg) {
  // pass 1: peers
  int32_t peerCnt = 0;
  for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
    const bool isSelf = strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 &&
                        ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort;
    if (isSelf) {
      continue;
    }

    ths->peersNodeInfo[peerCnt].nodeRole = cfg->nodeInfo[j].nodeRole;
    ths->peersNodeInfo[peerCnt].clusterId = cfg->nodeInfo[j].clusterId;
    tstrncpy(ths->peersNodeInfo[peerCnt].nodeFqdn, cfg->nodeInfo[j].nodeFqdn, TSDB_FQDN_LEN);
    ths->peersNodeInfo[peerCnt].nodeId = cfg->nodeInfo[j].nodeId;
    ths->peersNodeInfo[peerCnt].nodePort = cfg->nodeInfo[j].nodePort;

    syncUtilNodeInfo2EpSet(&ths->peersNodeInfo[peerCnt], &ths->peersEpset[peerCnt]);

    if (syncUtilNodeInfo2RaftId(&ths->peersNodeInfo[peerCnt], ths->vgId, &ths->peersId[peerCnt]) == false) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", ths->vgId, peerCnt);
      return terrno;
    }

    peerCnt++;
  }
  ths->peersNum = peerCnt;

  // pass 2: stored configuration
  ths->raftCfg.cfg.replicaNum = 0;
  int32_t nodeCnt = 0;
  for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
    if (cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->raftCfg.cfg.replicaNum++;
    }

    ths->raftCfg.cfg.nodeInfo[nodeCnt].nodeRole = cfg->nodeInfo[j].nodeRole;
    ths->raftCfg.cfg.nodeInfo[nodeCnt].clusterId = cfg->nodeInfo[j].clusterId;
    tstrncpy(ths->raftCfg.cfg.nodeInfo[nodeCnt].nodeFqdn, cfg->nodeInfo[j].nodeFqdn, TSDB_FQDN_LEN);
    ths->raftCfg.cfg.nodeInfo[nodeCnt].nodeId = cfg->nodeInfo[j].nodeId;
    ths->raftCfg.cfg.nodeInfo[nodeCnt].nodePort = cfg->nodeInfo[j].nodePort;

    if (strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 &&
        ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort) {
      ths->raftCfg.cfg.myIndex = nodeCnt;
    }

    nodeCnt++;
  }
  ths->raftCfg.cfg.totalReplicaNum = nodeCnt;

  return 0;
}
3008

3009
// Promote nodes to voter according to `cfg`.
//
// A node is matched by fqdn + port. Every peer, and every entry of the local
// raft config, that matches a voter entry of `cfg` gets the voter role.
// ths->raftCfg.cfg.replicaNum is reset and recounted: it is incremented once
// for every (local entry, cfg entry) pair that matches a voter.
void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg* cfg) {
  // change peersNodeInfo
  for (int32_t p = 0; p < ths->peersNum; ++p) {
    SNodeInfo* pPeer = &ths->peersNodeInfo[p];
    for (int32_t c = 0; c < cfg->totalReplicaNum; ++c) {
      const SNodeInfo* pNew = &cfg->nodeInfo[c];
      if (strcmp(pPeer->nodeFqdn, pNew->nodeFqdn) != 0) continue;
      if (pPeer->nodePort != pNew->nodePort) continue;
      if (pNew->nodeRole != TAOS_SYNC_ROLE_VOTER) continue;

      pPeer->nodeRole = TAOS_SYNC_ROLE_VOTER;
    }
  }

  // change cfg nodeInfo
  ths->raftCfg.cfg.replicaNum = 0;
  for (int32_t r = 0; r < ths->raftCfg.cfg.totalReplicaNum; ++r) {
    SNodeInfo* pLocal = &ths->raftCfg.cfg.nodeInfo[r];
    for (int32_t c = 0; c < cfg->totalReplicaNum; ++c) {
      const SNodeInfo* pNew = &cfg->nodeInfo[c];
      if (strcmp(pLocal->nodeFqdn, pNew->nodeFqdn) != 0) continue;
      if (pLocal->nodePort != pNew->nodePort) continue;
      if (pNew->nodeRole != TAOS_SYNC_ROLE_VOTER) continue;

      pLocal->nodeRole = TAOS_SYNC_ROLE_VOTER;
      ths->raftCfg.cfg.replicaNum++;
    }
  }
}
×
3036

3037
// Rebuild the per-replica state of `ths` from ths->raftCfg.cfg, carrying over
// the state of replicas that were already present before the change.
//
// Replicas are matched between the old and the new layout by raft id
// (syncUtilSameId). Rebuilt in order: replicasId, pMatchIndex, pNextIndex,
// vote managers, logReplMgrs, snapshot senders, heartbeat timers, peerStates.
//
// @param ths                 sync node whose raftCfg.cfg already holds the new config
// @param oldtotalReplicaNum  number of replicas before the change
// @return 0 on success, otherwise an error code. On failure the node is left
//         partially rebuilt; no rollback is attempted.
//
// NOTE(review): when syncIndexMgrCreate() fails, the previous index manager is
// no longer referenced by `ths` and is not destroyed here - confirm whether the
// caller tears the node down in that case.
int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum) {
  int32_t code = 0;

  // 1.rebuild replicasId, remove deleted one
  SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
  memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId));

  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]) == false) return terrno;
  }

  // 2.rebuild MatchIndex, remove deleted one
  SSyncIndexMgr* oldIndex = ths->pMatchIndex;

  ths->pMatchIndex = syncIndexMgrCreate(ths);
  if (ths->pMatchIndex == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  syncIndexMgrCopyIfExist(ths->pMatchIndex, oldIndex, oldReplicasId);

  syncIndexMgrDestroy(oldIndex);

  // 3.rebuild NextIndex, remove deleted one
  SSyncIndexMgr* oldNextIndex = ths->pNextIndex;

  ths->pNextIndex = syncIndexMgrCreate(ths);
  if (ths->pNextIndex == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  syncIndexMgrCopyIfExist(ths->pNextIndex, oldNextIndex, oldReplicasId);

  syncIndexMgrDestroy(oldNextIndex);

  // 4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // 5.rebuild logReplMgr
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // scratch copy of the old managers; it is only needed until the new
  // managers have been populated below
  SSyncLogReplMgr* oldLogReplMgrs = NULL;
  int64_t          length = sizeof(SSyncLogReplMgr) * (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA);
  oldLogReplMgrs = taosMemoryMalloc(length);
  if (NULL == oldLogReplMgrs) return terrno;
  memset(oldLogReplMgrs, 0, length);

  for (int i = 0; i < oldtotalReplicaNum; i++) {
    oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
  }

  syncNodeLogReplDestroy(ths);
  if ((code = syncNodeLogReplInit(ths)) != 0) {
    taosMemoryFree(oldLogReplMgrs);
    TAOS_RETURN(code);
  }

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        *(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
        ths->logReplMgrs[i]->peerId = i;
      }
    }
  }

  // Release the scratch copy right after its last use, so that none of the
  // error returns below can leak it.
  taosMemoryFree(oldLogReplMgrs);
  oldLogReplMgrs = NULL;

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // 6.rebuild sender
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if (ths->senders[i] != NULL) {
      sDebug("vgId:%d, snapshot sender destroy while close, data:%p", ths->vgId, ths->senders[i]);

      if (snapshotSenderIsStart(ths->senders[i])) {
        snapshotSenderStop(ths->senders[i], false);
      }

      snapshotSenderDestroy(ths->senders[i]);
      ths->senders[i] = NULL;
    }
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(ths, i, &pSender);
    if (pSender == NULL) return terrno = code;

    ths->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  // 7.rebuild synctimer
  if ((code = syncNodeStopHeartbeatTimer(ths)) != 0) {
    TAOS_RETURN(code);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(ths, &ths->peerHeartbeatTimerArr[i], ths->replicasId[i])) != 0) {
      TAOS_RETURN(code);
    }
  }

  if ((code = syncNodeStartHeartbeatTimer(ths)) != 0) {
    TAOS_RETURN(code);
  }

  // 8.rebuild peerStates
  SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
  for (int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) {
    oldState[i] = ths->peerStates[i];
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        ths->peerStates[i] = oldState[j];
      }
    }
  }

  return 0;
}
3188

3189
// Propagate the voter count of ths->raftCfg.cfg to the node itself and to both
// index managers, then refresh the two vote managers. Only replicaNum is
// changed; the replica ids and match indexes are merely dumped at debug level.
void syncNodeChangeToVoter(SSyncNode* ths) {
  // replicasId, only need to change replicaNum when 1->3
  ths->replicaNum = ths->raftCfg.cfg.replicaNum;

  sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum);
  for (int32_t r = 0; r < ths->totalReplicaNum; ++r) {
    sDebug("vgId:%d, i:%d, replicaId.addr:%" PRIx64, ths->vgId, r, ths->replicasId[r].addr);
  }

  // pMatchIndex, pNextIndex, only need to change replicaNum when 1->3
  ths->pMatchIndex->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->pNextIndex->replicaNum = ths->raftCfg.cfg.replicaNum;

  sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, ths->pMatchIndex->totalReplicaNum);
  for (int32_t r = 0; r < ths->pMatchIndex->totalReplicaNum; ++r) {
    sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, r, ths->pMatchIndex->index[r]);
  }

  // pVotesGranted, pVotesRespond
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // logRepMgrs: no need to change logRepMgrs when 1->3
}
×
3213

3214
// Overwrite the first peersNum peer entries and the first
// raftCfg.cfg.totalReplicaNum config entries with a zero-initialized SNodeInfo.
// The counters themselves (peersNum, totalReplicaNum, replicaNum) are untouched.
void syncNodeResetPeerAndCfg(SSyncNode* ths) {
  SNodeInfo blank = {0};

  int32_t peerCount = ths->peersNum;
  for (int32_t p = 0; p < peerCount; ++p) {
    memcpy(&ths->peersNodeInfo[p], &blank, sizeof(SNodeInfo));
  }

  int32_t cfgCount = ths->raftCfg.cfg.totalReplicaNum;
  for (int32_t r = 0; r < cfgCount; ++r) {
    memcpy(&ths->raftCfg.cfg.nodeInfo[r], &blank, sizeof(SNodeInfo));
  }
}
×
3224

3225
// Apply a config-change raft entry to the local node.
//
// The entry payload (after the SMsgHead) is deserialized into a request and
// turned into an SSyncCfg. Entries whose changeVersion is not newer than the
// local one are skipped. Otherwise, depending on the replica counts, the node
//   - removes replicas (new totalReplicaNum is 1 or 2), either other nodes or itself,
//   - changes replica type (local totalReplicaNum is 3), or
//   - adds replicas,
// then recomputes the quorum, records the entry index / changeVersion and
// writes the sync cfg file.
//
// @param ths     local sync node
// @param pEntry  raft entry; must have originalRpcType TDMT_SYNC_CONFIG_CHANGE
// @param str     caller tag used in logs; "Commit" selects the commit-style log line
// @return 0 on success or when the entry is skipped, otherwise an error code
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) {
  int32_t code = 0;
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SMsgHead* pHead = (SMsgHead*)pEntry->data;
  void*     pBody = POINTER_SHIFT(pHead, sizeof(SMsgHead));

  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pBody, pHead->contLen, &req) != 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  SSyncCfg cfg = {0};
  syncBuildConfigFromReq(&req, &cfg);

  if (cfg.changeVersion <= ths->raftCfg.cfg.changeVersion) {
    sInfo(
        "vgId:%d, skip conf change entry since lower version. "
        "this entry, index:%" PRId64 ", term:%" PRId64
        ", totalReplicaNum:%d, changeVersion:%d; "
        "current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d",
        ths->vgId, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
    return 0;
  }

  if (strcmp(str, "Commit") == 0) {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
        ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex,
        ths->pLogBuf->endIndex, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
  } else {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64 ", t:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
        ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum,
        ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
        ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index - 1, ths->commitIndex,
        ths->pLogBuf->commitIndex);
  }

  syncNodeLogConfigInfo(ths, &cfg, "before config change");

  // the rebuild helpers need the replica count from before the change
  int32_t prevTotalReplicaNum = ths->totalReplicaNum;

  if (cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2) {
    // remove replica: find out whether this node survives in the new config
    bool selfInCfg = false;
    for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
      if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
          ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
        selfInCfg = true;
        break;
      }
    }

    if (selfInCfg) {
      // remove other; no need to change myNodeInfo
      syncNodeResetPeerAndCfg(ths);

      code = syncNodeRebuildPeerAndCfg(ths, &cfg);
      if (code != 0) {
        TAOS_RETURN(code);
      }

      code = syncNodeRebuildAndCopyIfExist(ths, prevTotalReplicaNum);
      if (code != 0) {
        TAOS_RETURN(code);
      }
    } else {
      // remove myself
      // no need to do anything actually, to change the following to reduce distruptive server chance
      syncNodeResetPeerAndCfg(ths);

      // change myNodeInfo
      ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER;

      // change peer and cfg: this node becomes the only entry, with no voters
      ths->peersNum = 0;
      memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo));
      ths->raftCfg.cfg.replicaNum = 0;
      ths->raftCfg.cfg.totalReplicaNum = 1;

      // change other
      code = syncNodeRebuildAndCopyIfExist(ths, prevTotalReplicaNum);
      if (code != 0) {
        TAOS_RETURN(code);
      }

      // change state
      ths->state = TAOS_SYNC_STATE_LEARNER;
    }

    ths->restoreFinish = false;
  } else if (ths->totalReplicaNum == 3) {
    // change replica type
    sInfo("vgId:%d, begin change replica type", ths->vgId);

    // change myNodeInfo
    for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
      bool isSelf = strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
                    ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort;
      if (isSelf && cfg.nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
        ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
    }

    // change peer and cfg
    syncNodeChangePeerAndCfgToVoter(ths, &cfg);

    // change other
    syncNodeChangeToVoter(ths);

    // change state: a learner that has just become a voter starts as follower
    if (ths->state == TAOS_SYNC_STATE_LEARNER && ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->state = TAOS_SYNC_STATE_FOLLOWER;
    }

    ths->restoreFinish = false;
  } else {
    // add replica
    sInfo("vgId:%d, begin add replica", ths->vgId);

    // no need to change myNodeInfo

    // change peer and cfg
    code = syncNodeRebuildPeerAndCfg(ths, &cfg);
    if (code != 0) {
      TAOS_RETURN(code);
    }

    // change other
    code = syncNodeRebuildAndCopyIfExist(ths, prevTotalReplicaNum);
    if (code != 0) {
      TAOS_RETURN(code);
    }

    // no need to change state

    if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      ths->restoreFinish = false;
    }
  }

  ths->quorum = syncUtilQuorum(ths->replicaNum);

  ths->raftCfg.lastConfigIndex = pEntry->index;
  ths->raftCfg.cfg.lastIndex = pEntry->index;
  ths->raftCfg.cfg.changeVersion = cfg.changeVersion;

  syncNodeLogConfigInfo(ths, &cfg, "after config change");

  code = syncWriteCfgFile(ths);
  if (code != 0) {
    sError("vgId:%d, failed to create sync cfg file", ths->vgId);
  }

  TAOS_RETURN(code);
}
3398

3399
// Append `pEntry` to the sync log buffer and drive the buffer forward.
//
// An entry that is too short to hold an SMsgHead, or that the log buffer
// rejects, is destroyed here; in the second case the FSM is first executed
// with the current terrno. The log buffer is proceeded in every case, also
// after a failed append.
//
// With more than one replica the function returns after the optional
// assigned-leader commit; with a single replica it additionally advances the
// commit index to the match index and commits the log buffer.
//
// @param ths     local sync node
// @param pEntry  entry to append; destroyed by this function on the failure paths
// @return 0 on success, otherwise an error code
//
// NOTE(review): in the single-replica path `code` is overwritten by the result
// of syncLogBufferCommit(), so a failed append followed by a successful commit
// returns success - confirm with the callers whether that is intended.
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  int32_t code = -1;
  if (pEntry->dataLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId,
           TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    goto _out;
  }

  // append to log buffer
  if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) {
    sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
    int32_t ret = 0;
    if ((ret = syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false)) != 0) {
      sError("vgId:%d, failed to execute fsm, since %s", ths->vgId, tstrerror(ret));
    }
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    goto _out;
  }

  code = 0;
_out:;
  // proceed match index, with replicating on needed
  SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append");

  // pEntry is NULL here when it was destroyed on one of the failure paths
  if (pEntry != NULL) {
    sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
           ", %" PRId64 ")",
           ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
           ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
  }

  if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    TAOS_CHECK_RETURN(syncNodeUpdateAssignedCommitIndex(ths, matchIndex));

    if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
        syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex) < 0) {
      // report the index that was actually passed to the commit above
      sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->assignedCommitIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // multi replica
  if (ths->replicaNum > 1) {
    TAOS_RETURN(code);
  }

  // single replica
  SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, matchIndex);
  sTrace("vgId:%d, update commit return index %" PRId64 "", ths->vgId, returnIndex);

  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
      (code = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex)) < 0) {
    sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
  }

  TAOS_RETURN(code);
}
3459

3460
// Tell whether at least `quorum` non-learner peers have been silent for longer
// than tsHeartbeatTimeout.
//
// Learner peers and peers whose recorded receive time is 0 or -1 are not
// counted. A node with totalReplicaNum == 1 never times out.
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->totalReplicaNum == 1) {
    return false;
  }

  int32_t silentPeers = 0;
  int64_t now = taosGetTimestampMs();

  for (int32_t p = 0; p < pSyncNode->peersNum; ++p) {
    if (pSyncNode->peersNodeInfo[p].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      continue;
    }

    int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[p]));
    bool    hasRecvTime = (lastRecv != 0 && lastRecv != -1);
    if (hasRecvTime && now - lastRecv > tsHeartbeatTimeout) {
      silentPeers++;
    }
  }

  return silentPeers >= pSyncNode->quorum;
}
3485

3486
// Return true when any of the first totalReplicaNum snapshot senders exists and
// has its `start` flag set. A NULL node yields false.
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return false;

  for (int32_t idx = 0; idx < pSyncNode->totalReplicaNum; ++idx) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[idx];
    if (pSender != NULL && pSender->start) {
      return true;
    }
  }

  return false;
}
3497

3498
// Return true when the node has a pNewNodeReceiver whose `start` flag is set.
// A NULL node or a missing receiver yields false.
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL || pSyncNode->pNewNodeReceiver == NULL) {
    return false;
  }

  return pSyncNode->pNewNodeReceiver->start ? true : false;
}
3504

3505
// Build a noop entry for the current term at the log buffer's end index and
// append it through syncNodeAppend().
//
// @return TSDB_CODE_OUT_OF_MEMORY when the entry cannot be built, otherwise the
//         result of syncNodeAppend()
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex endIndex = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  curTerm = raftStoreGetTerm(ths);

  SSyncRaftEntry* pNoop = syncEntryBuildNoop(curTerm, endIndex, ths->vgId);
  if (pNoop == NULL) {
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
  }

  int32_t code = syncNodeAppend(ths, pNoop);
  TAOS_RETURN(code);
}
3519

3520
#ifdef BUILD_NO_CALL
// Legacy noop append, compiled only when BUILD_NO_CALL is defined.
//
// Builds a noop entry at the log store's write index. On a leader the entry is
// appended to the log store and then offered to the entry cache. The entry is
// released through the cache handle when one was obtained, and destroyed
// directly otherwise.
//
// @return 0 on success, -1 when the entry cannot be built or appended
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  int32_t ret = 0;

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) return -1;

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
    if (code != 0) {
      sError("append noop error");
      // the entry was not cached and is still owned here: release it
      // instead of leaking it on this early return
      syncEntryDestroy(pEntry);
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return ret;
}
#endif
3550

3551
// Handle a heartbeat message received from another member of the raft group.
//
// Heartbeats from a sender that is not in ths->replicasId are dropped. For the
// others a heartbeat reply is built and sent back, and in addition:
//   - a learner adopts a higher term carried by the heartbeat;
//   - a non-leader in the same term records the receive time, takes over
//     minMatchIndex, asks for the election timer to be reset and, as follower
//     or learner, enqueues a local commit command;
//   - a leader / assigned leader that sees a term >= its own enqueues a local
//     step-down command.
//
// Every message buffer built here is released on the paths where it is neither
// sent nor enqueued.
//
// @param ths      local sync node
// @param pRpcMsg  rpc message whose pCont is a SyncHeartbeat
// @return 0 on success or when the heartbeat is dropped, otherwise an error code
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
  bool           resetElect = false;

  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  int64_t tsMs = taosGetTimestampMs();
  int64_t timeDiff = tsMs - pMsg->timeStamp;
  syncLogRecvHeartbeat(ths, pMsg, timeDiff, tbuf);

  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
    sWarn(
        "vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
        "cluster:%d",
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
    return 0;
  }

  SRpcMsg rpcMsg = {0};
  TAOS_CHECK_RETURN(syncBuildHeartbeatReply(&rpcMsg, ths->vgId));
  SyncTerm currentTerm = raftStoreGetTerm(ths);

  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;

  sGTrace("vgId:%d, process sync-heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64,
          ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm);

  if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
    raftStoreSetTerm(ths, pMsg->term);
    currentTerm = pMsg->term;
  }

  if (pMsg->term == currentTerm &&
      (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
    resetElect = true;

    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      int32_t code = syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
      if (code != 0) {
        // the reply built above will never be sent: release it before bailing out
        rpcFreeCont(rpcMsg.pCont);
        TAOS_RETURN(code);
      }

      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd =
          (ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT;
      pSyncMsg->commitIndex = pMsg->commitIndex;
      pSyncMsg->currentTerm = pMsg->term;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, failed to enqueue commit msg from heartbeat since %s, code:%d", ths->vgId, terrstr(), code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sTrace("vgId:%d, enqueue commit msg from heartbeat, commit-index:%" PRId64 ", term:%" PRId64, ths->vgId,
                 pMsg->commitIndex, pMsg->term);
        }
      } else {
        // nobody takes the command over: release it here
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  if (pMsg->term >= currentTerm &&
      (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    SRpcMsg rpcMsgLocalCmd = {0};
    int32_t code = syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
    if (code != 0) {
      // the reply built above will never be sent: release it before bailing out
      rpcFreeCont(rpcMsg.pCont);
      TAOS_RETURN(code);
    }

    SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->currentTerm = pMsg->term;
    pSyncMsg->commitIndex = pMsg->commitIndex;

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, pMsg->term);
      }
    } else {
      // nobody takes the command over: release it here
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    }
  }

  // reply
  TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg));

  if (resetElect) syncNodeResetElectTimer(ths);
  return 0;
}
3648

3649
// Handle a heartbeat reply: record its receive time in pMatchIndex for the
// sending peer and hand the reply to that peer's log replication manager.
//
// @param ths      local sync node
// @param pRpcMsg  rpc message whose pCont is a SyncHeartbeatReply
// @return result of syncLogReplProcessHeartbeatReply(); when no replication
//         manager exists for the sender, terrno if set, otherwise
//         TSDB_CODE_SYN_RETURN_VALUE_NULL
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  int32_t         code = 0;
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // zero-padded 16-digit hex address ("%016" PRIx64, not the literal text "016")
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64 "", ths->vgId, pMsg->srcId.addr);
    TAOS_RETURN(code);
  }

  int64_t tsMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, tbuf);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);

  return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
}
3671

3672
#ifdef BUILD_NO_CALL
// Legacy heartbeat-reply handler, compiled only when BUILD_NO_CALL is defined.
// Logs the reply and records its receive time in pMatchIndex; always returns 0.
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;

  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  int64_t now = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pReply, now - pReply->timeStamp, tbuf);

  // update last reply time, make decision whether the other node is alive or not
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, now);
  return 0;
}
#endif
3689

3690
// Execute a local command that was enqueued for this node.
//
//   - SYNC_LOCAL_CMD_STEP_DOWN: step down to the term carried by the command.
//   - SYNC_LOCAL_CMD_FOLLOWER_CMT / SYNC_LOCAL_CMD_LEARNER_CMT: when the term of
//     the command equals the last match term of the log buffer, advance the
//     commit index to the one carried by the command; then commit the log
//     buffer up to ths->commitIndex unless the FSM state is incomplete.
//   - anything else is only logged.
//
// @return TSDB_CODE_SYN_INTERNAL_ERROR when the last match term is negative,
//         0 in every other case (a failed commit is only logged)
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, "");

  switch (pCmd->cmd) {
    case SYNC_LOCAL_CMD_STEP_DOWN: {
      SRaftId id = EMPTY_RAFT_ID;
      syncNodeStepDown(ths, pCmd->currentTerm, id);
      break;
    }

    case SYNC_LOCAL_CMD_FOLLOWER_CMT:
    case SYNC_LOCAL_CMD_LEARNER_CMT: {
      if (syncLogBufferIsEmpty(ths->pLogBuf)) {
        sError("vgId:%d, sync log buffer is empty.", ths->vgId);
        return 0;
      }

      SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
      if (lastMatchTerm < 0) {
        return TSDB_CODE_SYN_INTERNAL_ERROR;
      }

      if (pCmd->currentTerm == lastMatchTerm) {
        SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, pCmd->commitIndex);
        sTrace("vgId:%d, update commit return index %" PRId64 "", ths->vgId, returnIndex);
      }

      if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
        sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(),
               ths->commitIndex);
      }
      break;
    }

    default:
      sError("error local cmd");
      break;
  }

  return 0;
}
3721

3722
// TLA+ Spec
3723
// ClientRequest(i, v) ==
3724
//     /\ state[i] = Leader
3725
//     /\ LET entry == [term  |-> currentTerm[i],
3726
//                      value |-> v]
3727
//            newLog == Append(log[i], entry)
3728
//        IN  log' = [log EXCEPT ![i] = newLog]
3729
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
3730
//                    leaderVars, commitIndex>>
3731
//
3732

3733
// Turn a client request into a raft entry and append it to the log.
//
// The entry is built at the log buffer's end index with the current term. For
// a TDMT_SYNC_CONFIG_CHANGE message a response stub is registered first and its
// sequence number stored in the entry. Only a leader / assigned leader appends;
// on any other state the entry is destroyed and an error is returned.
//
// For a config-change entry on a leader, syncNodeCheckChangeConfig() decides:
//   < 0  the entry is destroyed and the code returned;
//   > 0  the stub is fetched back, a response is sent when a handle exists,
//        the entry is destroyed and the code returned;
//   == 0 the entry is appended like any other.
//
// @param ths        local sync node
// @param pMsg       client message
// @param pRetIndex  optional out: index assigned to the entry (leader only)
// @return result of syncNodeAppend(), or one of the codes described above
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  int32_t code = 0;

  SyncIndex       index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST)
                               ? syncEntryBuildFromClientRequest(pMsg->pCont, term, index)
                               : syncEntryBuildFromRpcMsg(pMsg, term, index);

  if (pEntry == NULL) {
    sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr());
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // 1->2, config change is add in write thread, and will continue in sync thread
  // need save message for it
  if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) {
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
    uint64_t  seqNum = syncRespMgrAdd(ths->pSyncRespMgr, &stub);
    pEntry->seqNum = seqNum;
  }

  // only a leader may append
  if (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (pRetIndex) {
    (*pRetIndex) = index;
  }

  if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
    int32_t checkCode = syncNodeCheckChangeConfig(ths, pEntry);
    if (checkCode < 0) {
      sError("vgId:%d, failed to check change config since %s.", ths->vgId, terrstr());
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      TAOS_RETURN(checkCode);
    }

    if (checkCode > 0) {
      SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
      int32_t num = syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
      sDebug("vgId:%d, get response stub for config change, seqNum:%" PRIu64 ", num:%d", ths->vgId, pEntry->seqNum,
             num);
      if (rsp.info.handle != NULL) {
        tmsgSendRsp(&rsp);
      }
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      TAOS_RETURN(checkCode);
    }
  }

  code = syncNodeAppend(ths, pEntry);
  return code;
}
3796

3797
// Translate a sync state into a constant, human-readable label.
// Any state without a dedicated case below is reported as "unknown".
const char* syncStr(ESyncState state) {
  const char* label = "unknown";

  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      label = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      label = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      label = "leader";
      break;
    case TAOS_SYNC_STATE_ERROR:
      label = "error";
      break;
    case TAOS_SYNC_STATE_OFFLINE:
      label = "offline";
      break;
    case TAOS_SYNC_STATE_LEARNER:
      label = "learner";
      break;
    case TAOS_SYNC_STATE_ASSIGNED_LEADER:
      label = "assigned leader";
      break;
    default:
      // keep the "unknown" fallback
      break;
  }

  return label;
}
3817

3818
// Find this node's position inside pNewCfg and store it in pNewCfg->myIndex.
// Each nodeInfo entry is turned into a raft id (address via SYNC_ADDR, vgId taken
// from ths) and compared against ths->myRaftId; the scan stops at the first match.
// Returns 0 when a matching entry was found, TSDB_CODE_SYN_INTERNAL_ERROR otherwise
// (in which case pNewCfg->myIndex is left untouched).
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t matchPos = -1;

  for (int32_t pos = 0; matchPos < 0 && pos < pNewCfg->totalReplicaNum; ++pos) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pNewCfg->nodeInfo[pos]),
        .vgId = ths->vgId,
    };

    if (syncUtilSameId(&(ths->myRaftId), &candidate)) {
      matchPos = pos;
    }
  }

  if (matchPos < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  pNewCfg->myIndex = matchPos;
  return 0;
}
3833

3834
// Report whether pMsg qualifies for the single-replica optimization on this node.
// All three conditions must hold, checked in this order:
//   1. ths->replicaNum is exactly 1,
//   2. syncUtilUserCommit() accepts the message type,
//   3. ths->vgId is not 1.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) {
    return false;
  }

  if (!syncUtilUserCommit(pMsg->msgType)) {
    return false;
  }

  return ths->vgId != 1;
}
3837

3838
// Check whether pRaftId matches one of the first ths->totalReplicaNum entries of
// ths->replicasId. The scan stops at the first match.
// Returns true on a match, false if no entry matches.
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool    isMember = false;
  int32_t pos = 0;

  while (!isMember && pos < ths->totalReplicaNum) {
    if (syncUtilSameId(&ths->replicasId[pos], pRaftId)) {
      isMember = true;
    }
    ++pos;
  }

  return isMember;
}
3846

3847
// Look up the snapshot sender stored at the same slot as the replica id equal to pDestId.
// Every one of the ths->totalReplicaNum slots is examined; if several slots match,
// the sender of the last matching slot is returned.
// Returns NULL when no replica id matches.
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  SSyncSnapshotSender* pFound = NULL;
  int32_t              slot = 0;

  while (slot < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      pFound = ths->senders[slot];
    }
    ++slot;
  }

  return pFound;
}
3856

3857
// Look up the heartbeat timer stored at the same slot as the replica id equal to pDestId.
// The returned pointer refers to an element of ths->peerHeartbeatTimerArr.
// Every one of the ths->totalReplicaNum slots is examined; if several slots match,
// the timer of the last matching slot is returned.
// Returns NULL when no replica id matches.
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  SSyncTimer* pFound = NULL;
  int32_t     slot = 0;

  while (slot < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      pFound = &ths->peerHeartbeatTimerArr[slot];
    }
    ++slot;
  }

  return pFound;
}
3866

3867
// Look up the peer state stored at the same slot as the replica id equal to pDestId.
// The returned pointer refers to an element of ths->peerStates.
// Every one of the ths->totalReplicaNum slots is examined; if several slots match,
// the state of the last matching slot is returned.
// Returns NULL when no replica id matches.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  SPeerState* pFound = NULL;
  int32_t     slot = 0;

  while (slot < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      pFound = &ths->peerStates[slot];
    }
    ++slot;
  }

  return pFound;
}
3876

3877
#ifdef BUILD_NO_CALL
3878
// Decide whether an append-entries message should be sent to pDestId.
// Returns false when no peer state exists for pDestId (an error is logged), or when
// the index about to be sent (pMsg->prevLogIndex + 1) equals the peer's lastSendIndex
// and less than SYNC_APPEND_ENTRIES_TIMEOUT_MS has elapsed since its lastSendTime.
// Returns true in every other case.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextIndex = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();

  // The elapsed-time check is only evaluated when the index is a repeat.
  bool resentTooSoon =
      (pPeer->lastSendIndex == nextIndex) && (nowMs - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);

  return !resentTooSoon;
}
3894

3895
// Report whether the node is currently allowed to change.
// Returns false (after logging an error) as soon as one of these checks fails, in order:
//   1. pSyncNode->changing is set;
//   2. commitIndex is at least SYNC_INDEX_BEGIN but differs from the last index
//      reported by syncNodeGetLastIndex();
//   3. some peer has a snapshot sender whose `start` field is set.
// Returns true when every check passes.
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  if (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN) {
    SyncIndex tailIndex = syncNodeGetLastIndex(pSyncNode);
    if (tailIndex != pSyncNode->commitIndex) {
      sError("sync cannot change2");
      return false;
    }
  }

  int32_t peer = 0;
  while (peer < pSyncNode->peersNum) {
    SSyncSnapshotSender* pSnapSender = syncNodeGetSnapshotSender(pSyncNode, &pSyncNode->peersId[peer]);
    if (pSnapSender != NULL && pSnapSender->start) {
      sError("sync cannot change3");
      return false;
    }
    ++peer;
  }

  return true;
}
3919
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc