• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3768

28 Mar 2025 10:15AM UTC coverage: 33.726% (-0.3%) from 33.993%
#3768

push

travis-ci

happyguoxy
test:alter lcov result

144891 of 592084 branches covered (24.47%)

Branch coverage included in aggregate %.

218795 of 486283 relevant lines covered (44.99%)

765715.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.79
/source/libs/sync/src/syncMain.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "syncAppendEntries.h"
19
#include "syncAppendEntriesReply.h"
20
#include "syncCommit.h"
21
#include "syncElection.h"
22
#include "syncEnv.h"
23
#include "syncIndexMgr.h"
24
#include "syncInt.h"
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
27
#include "syncRaftCfg.h"
28
#include "syncRaftLog.h"
29
#include "syncRaftStore.h"
30
#include "syncReplication.h"
31
#include "syncRequestVote.h"
32
#include "syncRequestVoteReply.h"
33
#include "syncRespMgr.h"
34
#include "syncSnapshot.h"
35
#include "syncTimeout.h"
36
#include "syncUtil.h"
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
39
#include "tmisce.h"
40
#include "tref.h"
41

42
// Forward declarations of file-local (static) helpers used by the functions below.
// Timer-related helpers.
static void    syncNodeEqPingTimer(void* param, void* tmrId);
43
static void    syncNodeEqElectTimer(void* param, void* tmrId);
44
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
45
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
47
// Configuration-change and heartbeat-timer helpers.
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
48
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
49
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
50
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
51
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
52
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
53
static int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
54
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
55

56
// Leader-transfer helpers.
static bool    syncNodeCanChange(SSyncNode* pSyncNode);
57
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
58
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
59
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
60

61
static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
62

63
// Opens a sync node described by pSyncInfo, registers it via syncNodeAdd, then
// copies the ping/election/heartbeat intervals and the message callbacks from
// pSyncInfo onto the node.
// Returns the rid assigned by syncNodeAdd, or -1 if opening or registering fails.
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  sInfo("vgId:%d, start to open sync", pSyncInfo->vgId);

  SSyncNode* pNode = syncNodeOpen(pSyncInfo, vnodeVersion);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    // registration failed: close the node before reporting the error
    syncNodeClose(pNode);
    return -1;
  }

  // the ping and heartbeat timers start at their configured baselines
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;
  pNode->electBaseLine = pSyncInfo->electMs;
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;
  pNode->msgcb = pSyncInfo->msgcb;

  sInfo("vgId:%d, sync opened", pSyncInfo->vgId);
  return pNode->rid;
}
86

87
int32_t syncStart(int64_t rid) {
337✔
88
  int32_t    code = 0;
337✔
89
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
337✔
90
  if (pSyncNode == NULL) {
337!
91
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
92
    if (terrno != 0) code = terrno;
×
93
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
94
    TAOS_RETURN(code);
×
95
  }
96
  sInfo("vgId:%d, begin to start sync", pSyncNode->vgId);
337✔
97

98
  if ((code = syncNodeRestore(pSyncNode)) < 0) {
337!
99
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
100
    goto _err;
×
101
  }
102

103
  if ((code = syncNodeStart(pSyncNode)) < 0) {
337!
104
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, tstrerror(code));
×
105
    goto _err;
×
106
  }
107

108
  syncNodeRelease(pSyncNode);
337✔
109

110
  sInfo("vgId:%d, sync started", pSyncNode->vgId);
337✔
111

112
  TAOS_RETURN(code);
337✔
113

114
_err:
×
115
  syncNodeRelease(pSyncNode);
×
116
  TAOS_RETURN(code);
×
117
}
118

119
// Copies the current raft configuration (raftCfg.cfg) of node rid into *cfg.
// Returns 0 on success, or terrno / TSDB_CODE_SYN_RETURN_VALUE_NULL when the
// node cannot be acquired (cfg is left untouched in that case).
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    *cfg = pNode->raftCfg.cfg;
    syncNodeRelease(pNode);
    return 0;
  }

  int32_t err = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
  sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
  TAOS_RETURN(err);
}
136

137
void syncStop(int64_t rid) {
337✔
138
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
337✔
139
  if (pSyncNode != NULL) {
337!
140
    pSyncNode->isStart = false;
337✔
141
    syncNodeRelease(pSyncNode);
337✔
142
    syncNodeRemove(rid);
337✔
143
  }
144
}
337✔
145

146
void syncPreStop(int64_t rid) {
337✔
147
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
337✔
148
  if (pSyncNode != NULL) {
337!
149
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
337!
150
      sInfo("vgId:%d, stop snapshot receiver", pSyncNode->vgId);
×
151
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
152
    }
153
    syncNodePreClose(pSyncNode);
337✔
154
    syncNodeRelease(pSyncNode);
337✔
155
  }
156
}
337✔
157

158
void syncPostStop(int64_t rid) {
277✔
159
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
277✔
160
  if (pSyncNode != NULL) {
277!
161
    syncNodePostClose(pSyncNode);
277✔
162
    syncNodeRelease(pSyncNode);
277✔
163
  }
164
}
277✔
165

166
// A new config is accepted only if this node belongs to it and its replicaNum
// differs from the current replicaNum by at most one.
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (syncNodeInConfig(pSyncNode, pCfg)) {
    int32_t delta = pCfg->replicaNum - pSyncNode->replicaNum;
    return delta >= -1 && delta <= 1;
  }
  return false;
}
170

171
// Applies pNewCfg to node rid when its lastIndex is newer than the currently
// applied config index. When this node is leader or assigned leader, the
// heartbeat timer is stopped, every peer heartbeat timer is re-initialized for
// the (possibly changed) replicasId array, and the heartbeat timer is restarted.
// Returns 0 on success or when no reconfiguration is needed; otherwise an error
// code. The acquired node reference is released on every path.
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
    sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
          pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
    goto _exit;
  }

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    goto _exit;
  }

  if ((code = syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg)) != 0) {
    goto _exit;
  }

  if (syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex) != 0) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since do change error", pSyncNode->vgId);
    goto _exit;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
      goto _exit;
    }

    // rebuild the per-peer heartbeat timers for the replica ids after the change
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      code = syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
      if (code != 0) {
        goto _exit;
      }
    }

    if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
      goto _exit;
    }
  }

_exit:
  // single release point: every path above still holds the reference here
  syncNodeRelease(pSyncNode);
  TAOS_RETURN(code);
}
217

218
// Dispatches one sync-layer RPC message to the handler for its msgType on the
// node identified by rid. Unknown types yield TSDB_CODE_MSG_NOT_PROCESSED.
// Returns the handler's result, or an error code if sync is not initialized
// or the node cannot be acquired.
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
    case TDMT_SYNC_TIMEOUT_ELECTION:
      // both timeout kinds share one handler
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_FORCE_FOLLOWER:
      code = syncForceBecomeFollower(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SET_ASSIGNED_LEADER:
      code = syncBecomeAssignedLeader(pSyncNode, pMsg);
      break;
    default:
      code = TSDB_CODE_MSG_NOT_PROCESSED;
      break;
  }

  // log before releasing: pSyncNode must not be dereferenced after syncNodeRelease
  if (code != 0) {
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since %s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
           tstrerror(code));
  }
  syncNodeRelease(pSyncNode);
  TAOS_RETURN(code);
}
287

288
int32_t syncLeaderTransfer(int64_t rid) {
337✔
289
  int32_t    code = 0;
337✔
290
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
337✔
291
  if (pSyncNode == NULL) {
337!
292
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
293
    if (terrno != 0) code = terrno;
×
294
    TAOS_RETURN(code);
×
295
  }
296

297
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
337✔
298
  syncNodeRelease(pSyncNode);
337✔
299
  return ret;
337✔
300
}
301

302
// Handles TDMT_SYNC_FORCE_FOLLOWER: makes ths a follower with an all-zero leader
// id, then replies with the response buffer already attached to pRpcMsg->info.
// Always returns 0.
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SRaftId noLeader = {0};
  syncNodeBecomeFollower(ths, noLeader, "force election");

  SRpcMsg reply = {0};
  reply.code = 0;
  reply.pCont = pRpcMsg->info.rsp;
  reply.contLen = pRpcMsg->info.rspLen;
  reply.info = pRpcMsg->info;
  tmsgSendRsp(&reply);

  return 0;
}
316

317
// Handles TDMT_SYNC_SET_ASSIGNED_LEADER.
// Decodes the SVArbSetAssignedLeaderReq stored after the SMsgHead prefix of
// pRpcMsg->pCont. The request is ignored (code TSDB_CODE_MND_ARB_TOKEN_MISMATCH)
// when its arbTerm is lower than the local one, or when req.force is not set and
// req.memberToken differs from ths->arbToken. Otherwise, if ths is not already
// the assigned leader, it advances the raft term, becomes assigned leader and
// appends a noop entry (a noop failure is only logged).
// A response is always sent via tmsgSendRsp; on success it carries a serialized
// SVArbSetAssignedLeaderRsp. Returns the code placed in that response.
int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
  int32_t code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  void*   pHead = NULL;
  int32_t contLen = 0;

  SVArbSetAssignedLeaderReq req = {0};
  // The body starts sizeof(SMsgHead) bytes into pCont, so bound the decoder by the
  // bytes remaining after that offset rather than by the whole contLen.
  int32_t reqLen = pRpcMsg->contLen - (int32_t)sizeof(SMsgHead);
  if (reqLen < 0 ||
      tDeserializeSVArbSetAssignedLeaderReq((char*)pRpcMsg->pCont + sizeof(SMsgHead), reqLen, &req) != 0) {
    sError("vgId:%d, failed to deserialize SVArbSetAssignedLeaderReq", ths->vgId);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  if (ths->arbTerm > req.arbTerm) {
    sInfo("vgId:%d, skip to set assigned leader, msg with lower term, local:%" PRId64 ", msg:%" PRId64, ths->vgId,
          ths->arbTerm, req.arbTerm);
    goto _OVER;
  }

  ths->arbTerm = TMAX(req.arbTerm, ths->arbTerm);

  if (!req.force) {
    if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) {
      sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken,
            req.memberToken);
      goto _OVER;
    }
  }

  if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SUCCESS;
    // The outcome of raftStoreNextTerm is judged through terrno below; clear it first
    // so an error left over from an earlier call is not mistaken for a failure here.
    terrno = TSDB_CODE_SUCCESS;
    raftStoreNextTerm(ths);
    if (terrno != TSDB_CODE_SUCCESS) {
      code = terrno;
      sError("vgId:%d, failed to set next term since:%s", ths->vgId, tstrerror(code));
      goto _OVER;
    }
    syncNodeBecomeAssignedLeader(ths);

    if ((code = syncNodeAppendNoop(ths)) < 0) {
      sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, tstrerror(code));
    }
  }

  SVArbSetAssignedLeaderRsp rsp = {0};
  rsp.arbToken = req.arbToken;
  rsp.memberToken = req.memberToken;
  rsp.vgId = ths->vgId;

  contLen = tSerializeSVArbSetAssignedLeaderRsp(NULL, 0, &rsp);
  if (contLen <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    contLen = 0;  // never advertise a body length without a body
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    goto _OVER;
  }
  pHead = rpcMallocCont(contLen);
  if (!pHead) {
    code = terrno;
    contLen = 0;
    sError("vgId:%d, failed to malloc memory for SVArbSetAssignedLeaderRsp", ths->vgId);
    goto _OVER;
  }
  if (tSerializeSVArbSetAssignedLeaderRsp(pHead, contLen, &rsp) <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    rpcFreeCont(pHead);
    // the buffer is gone: do not hand the freed pointer to tmsgSendRsp below
    pHead = NULL;
    contLen = 0;
    goto _OVER;
  }

  code = TSDB_CODE_SUCCESS;

_OVER:;
  SRpcMsg rspMsg = {
      .code = code,
      .pCont = pHead,
      .contLen = contLen,
      .info = pRpcMsg->info,
  };

  tmsgSendRsp(&rspMsg);

  tFreeSVArbSetAssignedLeaderReq(&req);
  TAOS_RETURN(code);
}
399

400
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
×
401
  int32_t    code = 0;
×
402
  SSyncNode* pNode = syncNodeAcquire(rid);
×
403
  if (pNode == NULL) {
×
404
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
405
    if (terrno != 0) code = terrno;
×
406
    TAOS_RETURN(code);
×
407
  }
408

409
  SRpcMsg rpcMsg = {0, .info.notFreeAhandle = 1};
×
410
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
×
411
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;
×
412

413
  syncNodeRelease(pNode);
×
414
  if (ret == 1) {
×
415
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
×
416
    code = rpcSendResponse(&rpcMsg);
×
417
    return code;
×
418
  } else {
419
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
×
420
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
421
  }
422
}
423

424
// Scans the match index of every peer. The first value that makes the running
// minimum differ from SYNC_INDEX_INVALID is taken unconditionally; after that a
// peer only replaces it with a smaller, strictly positive index.
// Returns SYNC_INDEX_INVALID when there are no peers.
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;
  int32_t   peer = 0;

  while (peer < pSyncNode->peersNum) {
    SyncIndex candidate = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[peer]);
    bool      unset = (lowest == SYNC_INDEX_INVALID);
    if (unset || (candidate > 0 && candidate < lowest)) {
      lowest = candidate;
    }
    ++peer;
  }

  return lowest;
}
437

438
// Delegates to the log store's syncLogIndexRetention callback with the given
// byte budget and returns the index it reports.
static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) {
  return (*pSyncNode->pLogStore->syncLogIndexRetention)(pSyncNode->pLogStore, bytes);
}
441

442
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
750✔
443
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
750✔
444
  int32_t    code = 0;
750✔
445
  if (pSyncNode == NULL) {
750✔
446
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
1✔
447
    if (terrno != 0) code = terrno;
1!
448
    sError("sync begin snapshot error");
1!
449
    TAOS_RETURN(code);
1✔
450
  }
451

452
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
749✔
453
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
749✔
454
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
749✔
455

456
  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
749!
457
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
×
458
    syncNodeRelease(pSyncNode);
×
459
    return 0;
×
460
  }
461

462
  int64_t logRetention = 0;
749✔
463

464
  if (syncNodeIsMnode(pSyncNode)) {
749✔
465
    // mnode
466
    logRetention = tsMndLogRetention;
102✔
467
  } else {
468
    // vnode
469
    if (pSyncNode->replicaNum > 1) {
647✔
470
      logRetention = SYNC_VNODE_LOG_RETENTION;
529✔
471
    }
472
  }
473

474
  if (pSyncNode->totalReplicaNum > 1) {
749✔
475
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER &&
540✔
476
        pSyncNode->state != TAOS_SYNC_STATE_LEARNER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
5!
477
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
×
478
              lastApplyIndex);
479
      syncNodeRelease(pSyncNode);
×
480
      return 0;
×
481
    }
482
    SyncIndex retentionIndex =
540✔
483
        TMAX(pSyncNode->minMatchIndex, syncLogRetentionIndex(pSyncNode, SYNC_WAL_LOG_RETENTION_SIZE));
540✔
484
    logRetention += TMAX(0, lastApplyIndex - retentionIndex);
540✔
485
  }
486

487
_DEL_WAL:
209✔
488

489
  do {
490
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
749✔
491
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
749✔
492
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
749✔
493
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
749✔
494
    if (lastApplyIndex <= walCommitVer) {
749!
495
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);
749✔
496

497
      if (snapshottingIndex == SYNC_INDEX_INVALID) {
749!
498
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
749✔
499
        pSyncNode->snapshottingTime = taosGetTimestampMs();
749✔
500

501
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
749✔
502
        if (code == 0) {
749!
503
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
749✔
504
                  pSyncNode->snapshottingIndex, lastApplyIndex);
505
        } else {
506
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
×
507
                  terrstr(), pSyncNode->snapshottingIndex, lastApplyIndex);
508
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
×
509
        }
510

511
      } else {
512
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
×
513
                snapshottingIndex, lastApplyIndex);
514
      }
515
    }
516
  } while (0);
517

518
  syncNodeRelease(pSyncNode);
749✔
519
  TAOS_RETURN(code);
749✔
520
}
521

522
int32_t syncEndSnapshot(int64_t rid) {
749✔
523
  int32_t    code = 0;
749✔
524
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
749✔
525
  if (pSyncNode == NULL) {
749!
526
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
527
    if (terrno != 0) code = terrno;
×
528
    sError("sync end snapshot error");
×
529
    TAOS_RETURN(code);
×
530
  }
531

532
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
749!
533
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
749✔
534
    code = walEndSnapshot(pData->pWal);
749✔
535
    if (code != 0) {
749!
536
      sNError(pSyncNode, "wal snapshot end error since:%s", tstrerror(code));
×
537
      syncNodeRelease(pSyncNode);
×
538
      TAOS_RETURN(code);
×
539
    } else {
540
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
749✔
541
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
749✔
542
    }
543
  }
544

545
  syncNodeRelease(pSyncNode);
749✔
546
  TAOS_RETURN(code);
749✔
547
}
548

549
// Reports whether pSyncNode may serve reads: it must be (assigned) leader and
// have finished restoring. On false, terrno says why:
// TSDB_CODE_SYN_INTERNAL_ERROR (NULL node), TSDB_CODE_SYN_NOT_LEADER, or
// TSDB_CODE_SYN_RESTORING.
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  bool leading =
      (pSyncNode->state == TAOS_SYNC_STATE_LEADER) || (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
  if (!leading) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pSyncNode->restoreFinish) {
    return true;
  }

  terrno = TSDB_CODE_SYN_RESTORING;
  return false;
}
568

569
bool syncIsReadyForRead(int64_t rid) {
17,434✔
570
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
17,434✔
571
  if (pSyncNode == NULL) {
17,435!
572
    sError("sync ready for read error");
×
573
    return false;
×
574
  }
575

576
  bool ready = syncNodeIsReadyForRead(pSyncNode);
17,435✔
577

578
  syncNodeRelease(pSyncNode);
17,433✔
579
  return ready;
17,433✔
580
}
581

582
#ifdef BUILD_NO_CALL
// Returns whether node rid is sending a snapshot; false if it cannot be acquired.
bool syncSnapshotSending(int64_t rid) {
  bool       sending = false;
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    sending = syncNodeSnapshotSending(pNode);
    syncNodeRelease(pNode);
  }
  return sending;
}

// Returns whether node rid is receiving a snapshot; false if it cannot be acquired.
bool syncSnapshotRecving(int64_t rid) {
  bool       receiving = false;
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    receiving = syncNodeSnapshotRecving(pNode);
    syncNodeRelease(pNode);
  }
  return receiving;
}
#endif
605

606
// Hands leadership to a peer when this node is leader with replicaNum > 1.
// The target is peer 0, or peer 1 when there are exactly two peers and peer 1
// has the strictly higher match index. Returns 0 when no transfer is attempted,
// otherwise syncNodeLeaderTransferTo's result.
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return 0;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER || pSyncNode->replicaNum <= 1) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex first = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[0]);
    SyncIndex second = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[1]);
    target = (second > first) ? 1 : 0;
  }

  return syncNodeLeaderTransferTo(pSyncNode, pSyncNode->peersNodeInfo[target]);
}
627

628
// Builds a leader-transfer message naming newLeader and proposes it through
// syncNodePropose. The message buffer is freed here after proposing.
// Returns TSDB_CODE_SYN_INTERNAL_ERROR for a single-replica node, the build
// error if the message cannot be built, otherwise syncNodePropose's result.
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg transferMsg = {0};
  TAOS_CHECK_RETURN(syncBuildLeaderTransfer(&transferMsg, pSyncNode->vgId));

  SyncLeaderTransfer* pBody = transferMsg.pCont;
  pBody->newLeaderId.addr = SYNC_ADDR(&newLeader);
  pBody->newLeaderId.vgId = pSyncNode->vgId;
  pBody->newNodeInfo = newLeader;

  int32_t proposed = syncNodePropose(pSyncNode, &transferMsg, false, NULL);
  rpcFreeCont(transferMsg.pCont);
  return proposed;
}
648

649
SSyncState syncGetState(int64_t rid) {
42,016✔
650
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
42,016✔
651

652
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
42,016✔
653
  if (pSyncNode != NULL) {
42,023!
654
    state.state = pSyncNode->state;
42,023✔
655
    state.roleTimeMs = pSyncNode->roleTimeMs;
42,023✔
656
    state.startTimeMs = pSyncNode->startTime;
42,023✔
657
    state.restored = pSyncNode->restoreFinish;
42,023✔
658
    if (pSyncNode->vgId != 1) {
42,023✔
659
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
11,153✔
660
    } else {
661
      state.canRead = state.restored;
30,870✔
662
    }
663
    /*
664
    double progress = 0;
665
    if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){
666
      progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex;
667
      state.progress = (int32_t)(progress * 100);
668
    }
669
    else{
670
      state.progress = -1;
671
    }
672
    sDebug("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 ", "
673
            "progress:%lf, progress:%d",
674
          pSyncNode->vgId,
675
         pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
676
    */
677
    state.term = raftStoreGetTerm(pSyncNode);
42,023✔
678
    syncNodeRelease(pSyncNode);
42,024✔
679
  }
680

681
  return state;
42,014✔
682
}
683

684
void syncGetCommitIndex(int64_t rid, int64_t* syncCommitIndex) {
9,372✔
685
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
9,372✔
686
  if (pSyncNode != NULL) {
9,372!
687
    *syncCommitIndex = pSyncNode->commitIndex;
9,372✔
688
    syncNodeRelease(pSyncNode);
9,372✔
689
  }
690
}
9,372✔
691

692
int32_t syncGetArbToken(int64_t rid, char* outToken) {
1,437✔
693
  int32_t    code = 0;
1,437✔
694
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
1,437✔
695
  if (pSyncNode == NULL) {
1,437!
696
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
697
    if (terrno != 0) code = terrno;
×
698
    TAOS_RETURN(code);
×
699
  }
700

701
  memset(outToken, 0, TSDB_ARB_TOKEN_SIZE);
1,437✔
702
  (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
1,437✔
703
  tstrncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE);
1,437✔
704
  (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
1,437✔
705

706
  syncNodeRelease(pSyncNode);
1,437✔
707
  TAOS_RETURN(code);
1,437✔
708
}
709

710
int32_t syncCheckSynced(int64_t rid) {
×
711
  int32_t    code = 0;
×
712
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
713
  if (pSyncNode == NULL) {
×
714
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
715
    if (terrno != 0) code = terrno;
×
716
    TAOS_RETURN(code);
×
717
  }
718

719
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
×
720
    code = TSDB_CODE_VND_ARB_NOT_SYNCED;
×
721
    syncNodeRelease(pSyncNode);
×
722
    TAOS_RETURN(code);
×
723
  }
724

725
  bool isSync = pSyncNode->commitIndex >= pSyncNode->assignedCommitIndex;
×
726
  code = (isSync ? TSDB_CODE_SUCCESS : TSDB_CODE_VND_ARB_NOT_SYNCED);
×
727

728
  syncNodeRelease(pSyncNode);
×
729
  TAOS_RETURN(code);
×
730
}
731

732
// Raises node rid's arbTerm to arbTerm if that is larger (it never decreases).
// Returns 0, or an error code if the node cannot be acquired.
int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  if (arbTerm > pNode->arbTerm) {
    pNode->arbTerm = arbTerm;
  }

  syncNodeRelease(pNode);
  TAOS_RETURN(0);
}
745

746
// Returns the largest entry of raftCfg.configIndexArr that is greater than the
// first entry and not beyond snapshotLastApplyIndex, or the first entry if none
// qualifies. Returns -2 (terrno = TSDB_CODE_SYN_INTERNAL_ERROR) when the array is empty.
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  if (pSyncNode->raftCfg.configIndexCount < 1) {
    sError("vgId:%d, failed get snapshot config index, configIndexCount:%d", pSyncNode->vgId,
           pSyncNode->raftCfg.configIndexCount);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -2;
  }

  const SyncIndex* indexes = pSyncNode->raftCfg.configIndexArr;
  SyncIndex        best = indexes[0];

  // entry 0 can never beat itself, so the scan starts at 1
  for (int32_t k = 1; k < pSyncNode->raftCfg.configIndexCount; ++k) {
    if (indexes[k] > best && indexes[k] <= snapshotLastApplyIndex) {
      best = indexes[k];
    }
  }

  sTrace("vgId:%d, index:%" PRId64 ", get last snapshot config index:%" PRId64, pSyncNode->vgId, snapshotLastApplyIndex,
         best);
  return best;
}
766

767
// Finds the peer whose fqdn and port both match pEp and returns its raft id;
// returns EMPTY_RAFT_ID when no peer matches (this node itself is not searched).
static SRaftId syncGetRaftIdByEp(SSyncNode* pSyncNode, const SEp* pEp) {
  int32_t idx = 0;
  while (idx < pSyncNode->peersNum) {
    bool sameFqdn = strcmp(pEp->fqdn, pSyncNode->peersNodeInfo[idx].nodeFqdn) == 0;
    if (sameFqdn && pEp->port == pSyncNode->peersNodeInfo[idx].nodePort) {
      return pSyncNode->peersId[idx];
    }
    ++idx;
  }
  return EMPTY_RAFT_ID;
}
776

777
/*
 * Build the endpoint set a client should retry against for sync node `rid`.
 *
 * All non-learner replicas from raftCfg.cfg.nodeInfo are added in config order.
 * inUse is chosen as:
 *   - the endpoint matching the cached leader (leaderCache), if any;
 *   - otherwise myIndex, when this node itself is the cached leader;
 *   - otherwise (myIndex + 1) % numOfEps.
 * epsetSort() is then applied to the set.
 *
 * If the node cannot be acquired, pEpSet is left with numOfEps = 0.
 *
 * NOTE(review): myIndex indexes nodeInfo (which includes learners) while eps
 * skips learners; the two only line up when learners do not precede this node
 * in nodeInfo — confirm the config ordering guarantees this.
 */
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return;

  const SSyncCfg* pCfg = &pNode->raftCfg.cfg;
  int             leaderSlot = -1;

  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    const SNodeInfo* pInfo = &pCfg->nodeInfo[i];
    if (pInfo->nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      continue;
    }

    // the next free slot is exactly the current endpoint count
    int  slot = pEpSet->numOfEps;
    SEp* pEp = &pEpSet->eps[slot];
    tstrncpy(pEp->fqdn, pInfo->nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = pInfo->nodePort;
    pEpSet->numOfEps++;
    sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pNode->vgId, i, pEp->fqdn, pEp->port);

    SRaftId id = syncGetRaftIdByEp(pNode, pEp);
    bool    isValidId = (id.addr != 0 && id.vgId != 0);
    bool    isLeader = (id.addr == pNode->leaderCache.addr && id.vgId == pNode->leaderCache.vgId);
    if (isLeader && isValidId) {
      leaderSlot = slot;
    }
  }

  if (pEpSet->numOfEps > 0) {
    bool selfIsLeader = (pNode->myRaftId.addr == pNode->leaderCache.addr &&
                         pNode->myRaftId.vgId == pNode->leaderCache.vgId);
    if (leaderSlot != -1) {
      pEpSet->inUse = leaderSlot;
    } else if (selfIsLeader) {
      pEpSet->inUse = pCfg->myIndex;
    } else {
      pEpSet->inUse = (pCfg->myIndex + 1) % pEpSet->numOfEps;
    }
  }
  epsetSort(pEpSet);

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pNode);
}
817

818
/*
 * Public entry for proposing `pMsg` on the sync node identified by `rid`.
 * Acquires the node, delegates to syncNodePropose() and returns its result.
 * If the node cannot be acquired, returns terrno when set, otherwise
 * TSDB_CODE_SYN_RETURN_VALUE_NULL.
 */
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    sError("sync propose error");
    TAOS_RETURN(err);
  }

  int32_t result = syncNodePropose(pNode, pMsg, isWeak, seq);
  syncNodeRelease(pNode);
  return result;
}
832

833
int32_t syncCheckMember(int64_t rid) {
×
834
  int32_t    code = 0;
×
835
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
836
  if (pSyncNode == NULL) {
×
837
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
838
    if (terrno != 0) code = terrno;
×
839
    sError("sync propose error");
×
840
    TAOS_RETURN(code);
×
841
  }
842

843
  if (pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
×
844
    syncNodeRelease(pSyncNode);
×
845
    return TSDB_CODE_SYN_WRONG_ROLE;
×
846
  }
847

848
  syncNodeRelease(pSyncNode);
×
849
  return 0;
×
850
}
851

852
int32_t syncIsCatchUp(int64_t rid) {
126✔
853
  int32_t    code = 0;
126✔
854
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
126✔
855
  if (pSyncNode == NULL) {
126!
856
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
857
    if (terrno != 0) code = terrno;
×
858
    sError("sync Node Acquire error since %d", ERRNO);
×
859
    TAOS_RETURN(code);
×
860
  }
861

862
  int32_t isCatchUp = 0;
126✔
863
  if (pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
126!
864
      pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
26!
865
      pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP) {
26✔
866
    sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
118!
867
          pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
868
          pSyncNode->pLogBuf->matchIndex);
869
    isCatchUp = 0;
118✔
870
  } else {
871
    sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, pSyncNode->vgId,
8!
872
          pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->matchIndex);
873
    isCatchUp = 1;
8✔
874
  }
875

876
  syncNodeRelease(pSyncNode);
126✔
877
  return isCatchUp;
126✔
878
}
879

880
ESyncRole syncGetRole(int64_t rid) {
126✔
881
  int32_t    code = 0;
126✔
882
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
126✔
883
  if (pSyncNode == NULL) {
126!
884
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
885
    if (terrno != 0) code = terrno;
×
886
    sError("sync Node Acquire error since %d", ERRNO);
×
887
    TAOS_RETURN(code);
×
888
  }
889

890
  ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole;
126✔
891

892
  syncNodeRelease(pSyncNode);
126✔
893
  return role;
126✔
894
}
895

896
int64_t syncGetTerm(int64_t rid) {
572✔
897
  int32_t    code = 0;
572✔
898
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
572✔
899
  if (pSyncNode == NULL) {
572!
900
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
901
    if (terrno != 0) code = terrno;
×
902
    sError("sync Node Acquire error since %d", ERRNO);
×
903
    TAOS_RETURN(code);
×
904
  }
905

906
  int64_t term = raftStoreGetTerm(pSyncNode);
572✔
907

908
  syncNodeRelease(pSyncNode);
572✔
909
  return term;
572✔
910
}
911

912
/*
 * Propose pMsg on pSyncNode, which must be leader or assigned leader.
 *
 * Rejections:
 *   - TSDB_CODE_SYN_NOT_LEADER when the node is not (assigned) leader;
 *   - TSDB_CODE_SYN_PROPOSE_NOT_READY when restore has not finished, or when a
 *     (non-assigned) leader's heartbeat replies have timed out.
 *
 * Optimized one-replica path (syncNodeIsOptimizedOneReplica): the request is
 * handled synchronously by syncNodeOnClientRequest(), and applyIndex/applyTerm
 * are written back into pMsg->info.conn. Returns 1 while replicaNum is still 1.
 * Returns 0 when replicaNum changed in the meantime; per the log text, the
 * caller then continues on the normal path. Returns
 * TSDB_CODE_SYN_INTERNAL_ERROR if the request failed.
 *
 * Normal path: registers a response stub, wraps pMsg into a client request
 * and enqueues it through syncEqMsg. If serialization fails, the stub is
 * removed and the serialization error is returned. After the enqueue attempt,
 * *seq (if non-NULL) receives the stub's sequence number and the enqueue
 * result is returned.
 */
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  int32_t code = 0;
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SYN_NOT_LEADER;
    sNWarn(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    TAOS_RETURN(code);
  }

  if (!pSyncNode->restoreFinish) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNWarn(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
           TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // heartbeat timeout
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER && syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since heartbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // initialized so the error log below never reads an indeterminate value
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code < 0) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      TAOS_RETURN(code);
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);

    // after raft member change, need to handle 1->2 switching point
    // at this point, need to switch entry handling thread
    if (pSyncNode->replicaNum == 1) {
      sGDebug(&pMsg->info.traceId, "vgId:%d, index:%" PRId64 ", propose optimized msg type:%s", pSyncNode->vgId,
              retIndex, TMSG_INFO(pMsg->msgType));
      return 1;
    }

    sGDebug(&pMsg->info.traceId,
            "vgId:%d, index:%" PRId64 ", propose optimized msg, return to normal, type:%s, handle:%p",
            pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle);
    return 0;
  }

  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {.info.traceId = pMsg->info.traceId};
  code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, tstrerror(code));
    // Drop the stub, but keep reporting the serialization error: the removal's own
    // result must not overwrite it (a successful removal would otherwise turn this
    // failure into a "success" return).
    int32_t delCode = syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    if (delCode != 0) {
      sWarn("vgId:%d, failed to remove resp stub, seq:%" PRIu64 " since %s", pSyncNode->vgId, seqNum,
            tstrerror(delCode));
    }
    TAOS_RETURN(code);
  }

  sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, propose msg, type:%s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    TAOS_CHECK_RETURN(syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum));
  }

  if (seq != NULL) *seq = seqNum;
  TAOS_RETURN(code);
}
983

984
/*
 * Reset a per-peer heartbeat timer slot so that it targets destId.
 * No timer is armed here: pTimer is cleared and the period is taken from
 * pSyncNode->hbBaseLine; the callback is syncNodeEqPeerHeartbeatTimer.
 * Always returns 0.
 */
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;

  pSyncTimer->counter = 0;
  pSyncTimer->pTimer = NULL;
  pSyncTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pSyncTimer->logicClock, 0);

  return 0;
}
994

995
/*
 * Arm (or re-arm) the per-peer heartbeat timer pSyncTimer.
 *
 * Reuses the SSyncHbTimerData registered under pSyncTimer->hbDataRid, or
 * allocates and registers a new one. The data is then filled with the node rid,
 * the timer, its destination, logic clock and due time (now + timerMS). The
 * timer is reset with the data's rid as the callback parameter.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the timer data cannot be
 * allocated, and TSDB_CODE_SYN_INTERNAL_ERROR if the sync env is not
 * initialized or the timer reset reports failure.
 */
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int64_t tsNow = taosGetTimestampMs();
  if (!syncIsInit()) {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
  if (pData == NULL) {
    pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
    if (pData == NULL) {
      // previously dereferenced unconditionally
      sError("vgId:%d, failed to alloc hb timer data", pSyncNode->vgId);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pData->rid = syncHbTimerDataAdd(pData);
  }
  pSyncTimer->hbDataRid = pData->rid;
  pSyncTimer->timeStamp = tsNow;

  pData->syncNodeRid = pSyncNode->rid;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;
  pData->execTime = tsNow + pSyncTimer->timerMS;

  sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:0x%" PRIx64 " at %d", pSyncNode->vgId, pData->rid,
         pData->destId.addr, pSyncTimer->timerMS);

  bool stopped = taosTmrResetPriority(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid),
                                      syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
  if (stopped) {
    sError("vgId:%d, failed to reset hb timer", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  return 0;
}
1028

1029
/*
 * Stop the per-peer heartbeat timer pSyncTimer and drop its registered timer
 * data. The logic clock is bumped first; presumably this lets a callback that
 * already fired recognize itself as stale (TODO confirm in the timer callback).
 * Always returns 0.
 */
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  (void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  bool wasStopped = taosTmrStop(pSyncTimer->pTimer);
  sDebug("vgId:%d, stop hb timer stop:%d", pSyncNode->vgId, wasStopped);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}
1039

1040
/*
 * Realign the log store with the FSM snapshot when needed.
 *
 * The snapshot's lastApplyIndex (snapVer) is compared with the log store range
 * [begin, last]. If last < snapVer, or begin > snapVer + 1, or the FSM state is
 * SYNC_FSM_STATE_INCOMPLETE, syncLogRestoreFromSnapshot(snapVer) is called.
 *
 * Returns 0 when nothing needs doing or the restore succeeds,
 * TSDB_CODE_SYN_INTERNAL_ERROR when the log store, FSM or FpGetSnapshotInfo is
 * missing, or the restore's error code.
 */
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  if (pNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm == NULL) {
    sError("vgId:%d, pFsm not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
    sError("vgId:%d, FpGetSnapshotInfo not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SSnapshot snap = {0};
  // NOTE(review): the return value of FpGetSnapshotInfo is ignored (carried-over TODO)
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snap);

  const SyncIndex snapVer = snap.lastApplyIndex;
  const SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  const SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  const bool disjoint = (lastVer < snapVer) || (firstVer > snapVer + 1);
  if (!disjoint && pNode->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    TAOS_RETURN(0);
  }

  int32_t code = pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, snapVer);
  if (code != 0) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
           pNode->vgId, terrstr(), lastVer, snapVer);
  }
  TAOS_RETURN(code);
}
1070

1071
// open/close --------------
1072
/*
 * Create and initialize a sync (raft) node described by pSyncInfo.
 *
 * Steps, in order:
 *   - create pSyncInfo->path if it does not exist;
 *   - create raft_config.json from pSyncInfo, or load it. When loading, the
 *     caller's syncCfg replaces the file's config only if vnodeVersion is newer
 *     than the file's changeVersion and the config actually changed;
 *   - refresh dnode info of every replica (tmsgUpdateDnodeInfo) and persist it
 *     if anything changed;
 *   - set up the log buffer, replica/peer ids and arbitrator token;
 *   - set up raft store, vote managers, index managers and log store;
 *   - set up commit index from the FSM snapshot, timers, response manager,
 *     snapshot senders/receiver, replication and peer state.
 *
 * Ownership: pSyncInfo->pFsm is moved into the node (and cleared in
 * pSyncInfo) during initialization. On any failure, pSyncInfo->pFsm is freed
 * if it was not moved yet, the partially built node is released with
 * syncNodeClose(), and NULL is returned. terrno is set on failure paths where
 * the failing step reports an error code.
 */
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
      sError("vgId:%d, failed to create dir:%s since %s", pSyncInfo->vgId, pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId);
    pSyncNode->vgId = pSyncInfo->vgId;
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
    pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
    pSyncNode->raftCfg.configIndexCount = 1;
    pSyncNode->raftCfg.configIndexArr[0] = -1;

    if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
      terrno = code;
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
      goto _error;
    }
  } else {
    // update syncCfg by raft_config.json
    if ((code = syncReadCfgFile(pSyncNode)) != 0) {
      terrno = code;
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
      goto _error;
    }

    if (vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion) {
      if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
        sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
        pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
        if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
          terrno = code;
          sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
          goto _error;
        }
      } else {
        sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
        pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
      }
    } else {
      // log the version that was actually compared against: the one loaded from the file
      sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d", pSyncNode->vgId, vnodeVersion,
            pSyncNode->raftCfg.cfg.changeVersion);
    }
  }

  // init by SSyncInfo
  pSyncNode->vgId = pSyncInfo->vgId;
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  bool      updated = false;
  sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
        pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
      updated = true;
    }
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  if (vnodeVersion > pSyncInfo->syncCfg.changeVersion) {
    if (updated) {
      sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
      if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
        terrno = code;
        sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
        goto _error;
      }
    }
  }

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  code = syncLogBufferCreate(&pSyncNode->pLogBuf);
  if (pSyncNode->pLogBuf == NULL) {
    terrno = (code != 0) ? code : TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("failed to init sync log buffer since %s. vgId:%d", tstrerror(code), pSyncNode->vgId);
    goto _error;
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
  pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
        false) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  pSyncNode->myRaftId = pSyncNode->replicasId[pSyncNode->raftCfg.cfg.myIndex];

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
      pSyncNode->peersId[j] = pSyncNode->replicasId[i];
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
      j++;
    }
  }

  pSyncNode->arbTerm = -1;
  (void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
  syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
  sInfo("vgId:%d, generate arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);

  // init raft algorithm
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  if ((code = raftStoreOpen(pSyncNode)) != 0) {
    terrno = code;
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    // TODO check return value
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
    pSyncNode->fsmState = snapshot.state;
    if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
      sError("vgId:%d, fsm state is incomplete.", pSyncNode->vgId);
      if (pSyncNode->replicaNum == 1) {
        terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
        goto _error;
      }
    }
  }
  pSyncNode->commitIndex = commitIndex;
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  // restore log store on need
  if ((code = syncNodeLogStoreRestoreOnNeed(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
#ifdef BUILD_NO_CALL
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
#endif
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i])) != 0) {
      terrno = code;
      goto _error;
    }
  }

  // tools
  if ((code = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr)) != 0) {
    terrno = code;
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(pSyncNode, i, &pSender);
    if (pSender == NULL) {
      // Go through _error instead of returning directly, so the half-built node
      // (including the FSM moved out of pSyncInfo) is released rather than leaked.
      if (code != 0) terrno = code;
      sError("vgId:%d, failed to create snapshot sender, index:%d", pSyncNode->vgId, i);
      goto _error;
    }

    pSyncNode->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  // snapshot receivers
  code = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID, &pSyncNode->pNewNodeReceiver);
  if (pSyncNode->pNewNodeReceiver == NULL) {
    if (code != 0) terrno = code;
    sError("vgId:%d, failed to create snapshot receiver", pSyncNode->vgId);
    goto _error;
  }
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
          pSyncNode->pNewNodeReceiver);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  if ((code = syncNodeLogReplInit(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // peer state
  if ((code = syncNodePeerStateInit(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // raft itself is started later, in syncNodeStart
  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if ((code = syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->becomeAssignedLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync node opened, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
  return pSyncNode;

_error:
  // the FSM is still owned by pSyncInfo only if we failed before moving it into the node
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  return NULL;
}
1435

1436
#ifdef BUILD_NO_CALL
/*
 * Raise pSyncNode->commitIndex to the FSM snapshot's lastApplyIndex when the
 * snapshot is ahead of it. Does nothing if no FSM / FpGetSnapshotInfo is set.
 */
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snap = {0};
  (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);
  if (snap.lastApplyIndex > pSyncNode->commitIndex) {
    pSyncNode->commitIndex = snap.lastApplyIndex;
  }
}
#endif
1447

1448
/*
 * Bring the commit position up to date after open and replay committed entries.
 *
 * Under the log buffer mutex, reads the log store's last and commit index and
 * the log buffer's endIndex. Then it raises pSyncNode->commitIndex to at least
 * the log store's commit index. Unless the FSM state is
 * SYNC_FSM_STATE_INCOMPLETE, it commits the log buffer up to that index via
 * syncLogBufferCommit().
 *
 * A mismatch between the WAL's last index and the log buffer's end is only
 * reported as a warning and never fails the restore.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR if the log store or log buffer is
 * missing. Otherwise returns 0, or the result of syncLogBufferCommit()
 * (an error when negative).
 */
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pSyncNode->pLogBuf == NULL) {
    sError("vgId:%d, ring log buffer not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  (void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex);
  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;
  (void)taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex);

  if (lastVer != -1 && endIndex != lastVer + 1) {
    // Deliberately non-fatal: only warn. Print the actual reason (not whatever
    // terrno happens to hold), and do not let this code leak into the return value.
    sWarn("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64,
          pSyncNode->vgId, tstrerror(TSDB_CODE_WAL_LOG_INCOMPLETE), endIndex - 1, lastVer);
  }

  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
  sInfo("vgId:%d, restore began, and keep syncing until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex, NULL, "restore");
    if (code < 0) {
      TAOS_RETURN(code);
    }
  }

  TAOS_RETURN(code);
}
1483

1484
/*
 * Start the raft state machine of an opened sync node.
 *
 * The node's initial role depends on its configuration:
 *   - learner:              becomes learner;
 *   - single replica voter: advances the term, becomes leader and appends a
 *                           no-op entry (Raft 3.6.2, committing entries from
 *                           previous terms);
 *   - otherwise:            becomes follower with no known leader.
 *
 * The ping timer is started afterwards. Returns the no-op append error if that
 * fails. Otherwise returns the ping timer result, which is also logged on
 * failure.
 */
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  sInfo("vgId:%d, begin to start sync node", pSyncNode->vgId);

  const SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  bool            isLearner = (pCfg->nodeInfo[pCfg->myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER);

  if (isLearner) {
    syncNodeBecomeLearner(pSyncNode, "first start");
  } else if (pSyncNode->replicaNum == 1) {
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    TAOS_CHECK_RETURN(syncNodeAppendNoop(pSyncNode));
  } else {
    SRaftId noLeader = {0};
    syncNodeBecomeFollower(pSyncNode, noLeader, "first start");
  }

  int32_t pingRet = syncNodeStartPingTimer(pSyncNode);
  if (pingRet != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(pingRet));
  }
  sInfo("vgId:%d, sync node started", pSyncNode->vgId);
  return pingRet;
}
1510

1511
#ifdef BUILD_NO_CALL
// Put the node into follower state with the election timer pushed out to TIMER_MAX_MS
// (the original intent: "long enough"), then arm the ping timer.
//
// Returns 0 on success or the first failing helper's error code. Errors are now logged with the
// code actually returned rather than whatever terrno happened to hold, and that code is propagated
// instead of a bare -1.
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  int32_t code = 0;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));

  // reset elect timer, long enough
  code = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS);
  if (code != 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(code));
    TAOS_RETURN(code);
  }

  code = syncNodeStartPingTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(code));
    TAOS_RETURN(code);
  }
  return code;
}
#endif
1536

1537
// First stage of shutting a sync node down.
//
// Validates the node, its FSM and the FSM's FpApplyQueueItems callback (logging and returning
// early if any is missing), then stops the elect, heartbeat and ping timers in that order.
// Finally it discards pending responses held in pSyncRespMgr. A timer that fails to stop is logged
// and aborts the remaining steps.
void syncNodePreClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    sError("failed to pre close sync node since sync node is null");
    return;
  }
  if (pSyncNode->pFsm == NULL) {
    sError("failed to pre close sync node since fsm is null");
    return;
  }
  if (pSyncNode->pFsm->FpApplyQueueItems == NULL) {
    sError("failed to pre close sync node since FpApplyQueueItems is null");
    return;
  }

  int32_t code = syncNodeStopElectTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  code = syncNodeStopHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  code = syncNodeStopPingTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // drop responses still waiting in the response manager
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
1573

1574
// Tear down the new-node snapshot receiver, if one exists: stop it when it is running,
// destroy it and clear the pointer. Does nothing when no receiver is attached.
void syncNodePostClose(SSyncNode* pSyncNode) {
  if (pSyncNode->pNewNodeReceiver == NULL) {
    return;
  }

  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
         pSyncNode->pNewNodeReceiver);
  snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
  pSyncNode->pNewNodeReceiver = NULL;
}
277✔
1586

1587
// Release a heartbeat-timer data object. Only the struct itself is freed via taosMemoryFree;
// nothing it points to is released here.
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
62!
1588

1589
// Final teardown of a sync node; pSyncNode is freed and must not be used afterwards.
//
// Order:
// 1. Clean pending responses.
// 2. Stop the ping, elect and heartbeat timers. If any of them fails to stop, the failure is
//    logged and the function returns WITHOUT releasing anything, leaving the node allocated.
// 3. Destroy replication state, the response manager, vote managers, index managers, the log
//    store and the log buffer.
// 4. Destroy the arb-token mutex, every snapshot sender and the new-node snapshot receiver.
// 5. Free the FSM, close the raft store and free the node itself.
// A NULL node is ignored.
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);

  int32_t code = syncNodeStopPingTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  code = syncNodeStopElectTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  code = syncNodeStopHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  syncNodeLogReplDestroy(pSyncNode);

  // release the per-node managers, nulling each pointer right after its destroy call
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;
  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;
  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;
  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;
  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  (void)taosThreadMutexDestroy(&pSyncNode->arbTokenMutex);

  // snapshot senders: stop any that are running, then destroy
  for (int32_t slot = 0; slot < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; slot++) {
    if (pSyncNode->senders[slot] == NULL) {
      continue;
    }
    sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[slot]);
    if (snapshotSenderIsStart(pSyncNode->senders[slot])) {
      snapshotSenderStop(pSyncNode->senders[slot], false);
    }
    snapshotSenderDestroy(pSyncNode->senders[slot]);
    pSyncNode->senders[slot] = NULL;
  }

  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }
    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  raftStoreClose(pSyncNode);

  taosMemoryFree(pSyncNode);
}
1658

1659
// Return the snapshot strategy recorded in the node's raft configuration.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  ESyncStrategy strategy = pSyncNode->raftCfg.snapshotStrategy;
  return strategy;
}
×
1660

1661
// timer control --------------
1662
// (Re)arm the ping timer with pingTimerMS at priority 2, keyed by the node's rid.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when taosTmrResetPriority reports failure. When the sync
// environment is not initialised it only logs an error and still returns 0.
// NOTE(review): returning success when the env is stopped looks intentional-but-surprising; confirm.
// On success pingTimerLogicClock is synchronised to pingTimerLogicClockUser.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  bool stopped = taosTmrResetPriority(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
                                      syncEnv()->pTimerManager, &pSyncNode->pPingTimer, 2);
  if (stopped) {
    sError("vgId:%d, failed to reset ping timer, ms:%d", pSyncNode->vgId, pSyncNode->pingTimerMS);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}
1677

1678
// Stop the ping timer and forget its handle. Always returns 0.
//
// pingTimerLogicClockUser is bumped first, so it no longer equals the pingTimerLogicClock value
// stored when the timer was started. Presumably the callback compares the two to detect a
// stale firing — TODO confirm against the ping timer callback.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  bool wasStopped = taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;
  sDebug("vgId:%d, stop ping timer, stop:%d", pSyncNode->vgId, wasStopped);

  return 0;
}
1686

1687
// Arm the election timer to fire after `ms` milliseconds.
//
// Records ms in electTimerMS and fills electTimerParam:
// - the absolute execute time (now + ms);
// - the current electTimerLogicClock;
// - the node pointer;
// - pData = NULL.
// It then resets the timer keyed by the node's rid. A failed reset is only logged. When the sync
// environment is not initialised it logs an error and does nothing else. Always returns 0.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  int64_t deadline = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), deadline);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  if (taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid),
                   syncEnv()->pTimerManager, &pSyncNode->pElectTimer)) {
    sError("vgId:%d, failed to reset elect timer, ms:%d", pSyncNode->vgId, ms);
  }
  return 0;
}
1706

1707
// Stop the election timer and forget its handle. Always returns 0.
//
// electTimerLogicClock is bumped first, so it differs from the logicClock captured in
// electTimerParam when the timer was armed. Presumably the callback uses this to ignore stale
// firings — TODO confirm against the elect timer callback.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  bool wasStopped = taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;
  sTrace("vgId:%d, stop elect timer, stop:%d", pSyncNode->vgId, wasStopped);

  return 0;
}
1716

1717
// Stop the election timer, then start it again with a timeout of `ms` milliseconds.
// Returns 0, or the first non-zero code (terrno is set to it via TAOS_RETURN).
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  int32_t code = syncNodeStopElectTimer(pSyncNode);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = syncNodeStartElectTimer(pSyncNode, ms);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  return 0;
}
1723

1724
// Re-arm the election timer with a fresh timeout.
//
// A standby node uses TIMER_MAX_MS. Otherwise the timeout is drawn by syncUtilElectRandomMS from
// [electBaseLine, 2 * electBaseLine]. A failed restart is logged as a warning and the trace line
// is skipped.
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t lowerMS = pSyncNode->electBaseLine;
  int32_t upperMS = 2 * pSyncNode->electBaseLine;
  int32_t electMS = pSyncNode->raftCfg.isStandBy ? TIMER_MAX_MS : syncUtilElectRandomMS(lowerMS, upperMS);

  int32_t code = syncNodeRestartElectTimer(pSyncNode, electMS);
  if (code != 0) {
    sWarn("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", lowerMS, upperMS, electMS);
}
1742

1743
#ifdef BUILD_NO_CALL
// Arm the node-wide heartbeat timer with heartbeatTimerMS, keyed by the node's rid.
//
// taosTmrReset returns a bool, not an error code. Previously it was wrapped in TAOS_CHECK_RETURN,
// which would have returned `true` (1) as an "error code". A failed reset is now reported like in
// syncNodeStartPingTimer: it is logged and TSDB_CODE_SYN_INTERNAL_ERROR is returned. When the sync
// environment is not initialised it only logs an error. On success heartbeatTimerLogicClock is
// synchronised to heartbeatTimerLogicClockUser.
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (syncIsInit()) {
    bool stopped = taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
                                syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer);
    if (stopped) {
      sError("vgId:%d, failed to reset heartbeat timer, ms:%d", pSyncNode->vgId, pSyncNode->heartbeatTimerMS);
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  } else {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return code;
}
#endif
1758

1759
// Start the heartbeat timer of every peer that has one (looked up via syncNodeGetHbTimer).
// The former single node-wide heartbeat timer is no longer used. Returns 0, or the first error
// from syncHbTimerStart; later peers are then not started.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; peer++) {
    SSyncTimer* pTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pTimer == NULL) {
      continue;
    }
    TAOS_CHECK_RETURN(syncHbTimerStart(pSyncNode, pTimer));
  }
  return 0;
}
1776

1777
// Stop the heartbeat timer of every peer that has one (looked up via syncNodeGetHbTimer).
// Returns 0, or the first error from syncHbTimerStop; later peers are then left untouched.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; peer++) {
    SSyncTimer* pTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pTimer == NULL) {
      continue;
    }
    TAOS_CHECK_RETURN(syncHbTimerStop(pSyncNode, pTimer));
  }
  return 0;
}
1796

1797
#ifdef BUILD_NO_CALL
// Stop and then restart all per-peer heartbeat timers. Returns 0 or the first error encountered.
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
  return 0;
}
#endif
1806

1807
// Send pMsg to the peer whose raft address equals destRaftId->addr.
//
// The peer's epset is looked up in peersId/peersEpset. The body is converted with
// syncUtilMsgHtoN, the message is marked noResp and handed to pNode->syncSendMSg.
// If the peer is unknown, no send callback is installed, or the send fails:
// - the failure is logged;
// - the message content is released with rpcFreeCont;
// - pMsg->pCont is cleared so the caller is not left holding a dangling pointer.
// Returns the send result (-1 when the message could not be routed); terrno is set to it.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  SEpSet* epSet = NULL;
  for (int32_t i = 0; i < pNode->peersNum; ++i) {
    if (destRaftId->addr == pNode->peersId[i].addr) {
      epSet = &pNode->peersEpset[i];
      break;
    }
  }

  int32_t code = -1;  // stays -1 when the peer is unknown or no send callback is installed
  if (pNode->syncSendMSg != NULL && epSet != NULL) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    code = pNode->syncSendMSg(epSet, pMsg);
  }

  if (code < 0) {
    sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:0x%" PRIx64, pNode->vgId, tstrerror(code),
           epSet, DID(destRaftId), destRaftId->addr);
    // NOTE(review): assumes syncSendMSg does not already release pCont on failure — confirm against the transport.
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }

  TAOS_RETURN(code);
}
1831

1832
// Report whether pNode is a member of pCfg.
//
// Membership is checked two ways:
// - by endpoint (nodeFqdn + nodePort against myNodeInfo);
// - by raft id (SYNC_ADDR of each entry + vgId against myRaftId).
// The two answers must agree. On disagreement terrno is set to TSDB_CODE_SYN_INTERNAL_ERROR and
// false is returned.
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  bool foundByEndpoint = false;
  for (int32_t i = 0; i < pCfg->totalReplicaNum && !foundByEndpoint; ++i) {
    foundByEndpoint = (strcmp(pCfg->nodeInfo[i].nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0) &&
                      (pCfg->nodeInfo[i].nodePort == pNode->myNodeInfo.nodePort);
  }

  bool foundById = false;
  for (int32_t i = 0; i < pCfg->totalReplicaNum && !foundById; ++i) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[i]),
        .vgId = pNode->vgId,
    };
    foundById = syncUtilSameId(&candidate, &pNode->myRaftId);
  }

  if (foundByEndpoint == foundById) {
    return foundByEndpoint;
  }

  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  return false;
}
1862

1863
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
95✔
1864
  if (pOldCfg->totalReplicaNum != pNewCfg->totalReplicaNum) return true;
95✔
1865
  if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
65!
1866
  for (int32_t i = 0; i < pOldCfg->totalReplicaNum; ++i) {
142✔
1867
    const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
95✔
1868
    const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
95✔
1869
    if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
95!
1870
    if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
95!
1871
    if (pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
95✔
1872
  }
1873

1874
  return false;
47✔
1875
}
1876

1877
// Apply a new replica configuration (pNewConfig) to a running sync node.
//
// If the config is unchanged (see syncIsConfigChanged) nothing happens and 0 is returned.
// Otherwise:
// - pNewConfig and lastConfigChangeIndex are installed in pSyncNode->raftCfg, and the index is
//   appended to the config-index history;
// - isStandBy is cleared when this node is in the new config, or set when it was dropped.
// When the node is still a member:
// - myNodeInfo/myRaftId, peers, replicas, quorum, index managers and vote managers are rebuilt;
// - snapshot senders whose replica id survives are carried over with an updated replicaIndex;
// - missing senders are created and the displaced old senders are destroyed.
// The config file is persisted in both cases.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_RANGE when the config-index history is full, or the
// error of the failing helper.
int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  int32_t  code = 0;
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);

  // this node is being removed from the group
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change; arguments must line up with the format:
  // int32 replica counts, int64 last indexes, int32 replica num
  sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
         oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, oldConfig.lastIndex, pNewConfig->lastIndex,
         pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  SRaftCfg* pCfg = &pSyncNode->raftCfg;
  if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
    sNError(pSyncNode, "failed to add cfg index:%d since out of range", pCfg->configIndexCount);
    TAOS_RETURN(TSDB_CODE_OUT_OF_RANGE);
  }

  pCfg->configIndexArr[pCfg->configIndexCount] = lastConfigChangeIndex;
  pCfg->configIndexCount++;

  if (IamInNew) {
    // save snapshot senders
    SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    if (syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId) == false) return terrno;

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
    int32_t peerIdx = 0;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerIdx] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[peerIdx], &pSyncNode->peersEpset[peerIdx]);
        peerIdx++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]) == false)
        return terrno;
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
          false)
        return terrno;
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders: clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // carry over the senders of replicas that survive the change
    for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
      bool reset = false;
      for (int32_t j = 0; j < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++j) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j]) && oldSenders[j] != NULL) {
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[j]);

          pSyncNode->senders[i] = oldSenders[j];
          oldSenders[j] = NULL;
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
          pSyncNode->senders[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], reset);

          break;
        }
      }
    }

    // create new senders for the empty slots. Stop at the first failure but fall through, so the
    // displaced old senders below are still released instead of leaking.
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (pSyncNode->senders[i] != NULL) {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
        continue;
      }
      code = snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]);
      if (code != 0) {
        sNError(pSyncNode, "failed to create snapshot sender while reconfig, replica-index:%d", i);
        break;
      }
      if (pSyncNode->senders[i] == NULL) {
        // will be created later while send snapshot
        sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
      }
    }

    // free old senders no longer referenced by pSyncNode->senders
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    if (code != 0) {
      TAOS_RETURN(code);
    }

    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
  } else {
    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  return 0;
}
2049

2050
// raft state change --------------
2051
// Raise the stored raft term to `term` if it is newer, without changing the node's role.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= raftStoreGetTerm(pSyncNode)) {
    return;  // not newer: keep the current term
  }
  raftStoreSetTerm(pSyncNode, term);
}
×
2056

2057
// Step down to follower in response to a message carrying newTerm (leader hint: id).
//
// - A term older than the current one is ignored.
// - An assigned leader first regenerates its arb token under arbTokenMutex.
// - A newer term is stored, the node becomes follower and its vote is cleared.
// - An equal term only forces the follower role when the node is not already a follower.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm, SRaftId id) {
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
  if (newTerm < currentTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);

  if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
    syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
    sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken);
    (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
  }

  if (newTerm > currentTerm) {
    raftStoreSetTerm(pSyncNode, newTerm);
    char reason[64];
    snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
    syncNodeBecomeFollower(pSyncNode, id, reason);
    raftStoreClearVote(pSyncNode);
  } else if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    syncNodeBecomeFollower(pSyncNode, id, "step down");
  }
}
2087

2088
// On a leadership change, clean out the responses held by the node's response manager.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
121✔
2089

2090
// Transition the node to follower with leaderId as its known leader.
//
// Steps, in order:
// 1. Reset hbSlowNum and record the new state and role time.
// 2. Stop the heartbeat timers.
// 3. Flush pending responses (syncNodeLeaderChangeRsp).
// 4. Invoke FpBecomeFollowerCb when present, then reset minMatchIndex.
// 5. Reset the log buffer and re-arm the election timer.
// A failure to stop the heartbeat timers or to reset the log buffer is logged and aborts the
// remaining steps. debugStr is only used for logging.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, SRaftId leaderId, const char* debugStr) {
  int32_t code = 0;

  // a stepping-down leader drops itself from the leader cache (overwritten with leaderId below)
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  pSyncNode->hbSlowNum = 0;

  pSyncNode->leaderCache = leaderId;
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  code = syncNodeStopHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  sNTrace(pSyncNode, "become follower %s", debugStr);

  syncNodeLeaderChangeRsp(pSyncNode);

  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  syncNodeResetElectTimer(pSyncNode);

  sInfo("vgId:%d, become follower. %s", pSyncNode->vgId, debugStr);
}
2131

2132
// Transition the node to the learner role.
//
// Resets hbSlowNum, records the new state and role time, invokes FpBecomeLearnerCb when present,
// resets minMatchIndex and finally resets the log buffer (a failure there is only logged).
// debugStr is only used for logging.
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->hbSlowNum = 0;

  pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  sNTrace(pSyncNode, "become learner %s", debugStr);

  bool hasCallback = pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLearnerCb != NULL;
  if (hasCallback) {
    pSyncNode->pFsm->FpBecomeLearnerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  int32_t code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
  }
}
2157

2158
// TLA+ Spec
2159
// \* Candidate i transitions to leader.
2160
// BecomeLeader(i) ==
2161
//     /\ state[i] = Candidate
2162
//     /\ votesGranted[i] \in Quorum
2163
//     /\ state'      = [state EXCEPT ![i] = Leader]
2164
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
2165
//                          [j \in Server |-> Len(log[i]) + 1]]
2166
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
2167
//                          [j \in Server |-> 0]]
2168
//     /\ elections'  = elections \cup
2169
//                          {[eterm     |-> currentTerm[i],
2170
//                            eleader   |-> i,
2171
//                            elog      |-> log[i],
2172
//                            evotes    |-> votesGranted[i],
2173
//                            evoterLog |-> voterLog[i]]}
2174
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
2175
//
2176
// Transition the node to the leader role.
//
// Steps, in order:
// 1. Bump becomeLeaderNum and clear hbrSlowNum and restoreFinish.
// 2. Record the new state and role time, and point leaderCache at ourselves.
// 3. Set every nextIndex to (our last log index + 1) and every matchIndex to SYNC_INDEX_INVALID.
// 4. Re-initialise peer state and stop a running new-node snapshot receiver.
// 5. Replace the election timer with the per-peer heartbeat timers and send an immediate
//    heartbeat.
// 6. Invoke FpBecomeLeaderCb when present, reset minMatchIndex and reset the log buffer.
// Any helper failure is logged and aborts the remaining steps; the state has already been set to
// TAOS_SYNC_STATE_LEADER at that point. debugStr is only used for logging.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  int32_t code = 0;
  pSyncNode->becomeLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // nextIndex of every replica = our last index + 1. The lookup does not depend on the replica,
  // so it is done once, and it reuses the function-level `code` instead of shadowing it.
  if (pSyncNode->pNextIndex->replicaNum > 0) {
    SyncIndex lastIndex = SYNC_INDEX_INVALID;
    SyncTerm  lastTerm = 0;
    if ((code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm)) != 0) {
      sError("vgId:%d, failed to become leader since %s", pSyncNode->vgId, tstrerror(code));
      return;
    }
    for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
      pSyncNode->pNextIndex->index[i] = lastIndex + 1;
    }
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // start heartbeat timer
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // send heartbeat right now
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNInfo(pSyncNode, "become leader %s", debugStr);
}
2267

2268
/*
 * Switch this node into TAOS_SYNC_STATE_ASSIGNED_LEADER.
 *
 * Steps, in order:
 *   - bump becomeAssignedLeaderNum and clear hbrSlowNum;
 *   - set state and roleTimeMs, and point leaderCache at ourselves;
 *   - reset the next/match index tables and re-init peer send state;
 *   - stop the new-node snapshot receiver if it is running;
 *   - stop the elect timer, start the heartbeat timer and send an immediate heartbeat;
 *   - call FpBecomeAssignedLeaderCb if the FSM provides one;
 *   - reset minMatchIndex and the log buffer.
 *
 * A failing step is logged and the remaining steps are skipped. Changes made
 * by earlier steps (e.g. the state change) are not rolled back.
 */
void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
  int32_t code = 0;
  pSyncNode->becomeAssignedLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  // pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_ASSIGNED_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // Every peer's next index is our own last index + 1. That value does not
  // depend on the peer, so look it up once rather than once per replica.
  SyncIndex lastIndex = SYNC_INDEX_INVALID;
  SyncTerm  lastTerm = SYNC_TERM_INVALID;
  if ((code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm)) != 0) {
    sError("vgId:%d, failed to become assigned leader since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    // just do it!
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // start heartbeat timer
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // send heartbeat right now
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeAssignedLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNInfo(pSyncNode, "become assigned leader");
}
2346

2347
/*
 * Promote a candidate that has won the election to leader.
 *
 * Refuses, with an error log, unless the node is currently
 * TAOS_SYNC_STATE_CANDIDATE and voteGrantedMajority() reports a majority.
 * On success it:
 *   - calls syncNodeBecomeLeader;
 *   - appends a no-op entry;
 *   - logs the term, commit index and last log index.
 * A failed no-op append is logged but does not undo the promotion.
 */
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
    sError("vgId:%d, failed leader from candidate since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }
  bool granted = voteGrantedMajority(pSyncNode->pVotesGranted);
  if (!granted) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }
  syncNodeBecomeLeader(pSyncNode, "candidate to leader");

  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  int32_t ret = syncNodeAppendNoop(pSyncNode);
  if (ret < 0) {
    // Report the code syncNodeAppendNoop actually returned (as
    // syncNodeAssignedLeader2Leader does), not whatever terrno holds.
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, pSyncNode->vgId,
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
}
2371

2372
/*
 * Return true when this sync node belongs to vgroup 1, which this module
 * treats as the mnode group.
 */
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  const int32_t mnodeVgId = 1;
  bool          isMnode = (mnodeVgId == pSyncNode->vgId);
  return isMnode;
}
2373

2374
/*
 * Clear the per-peer send bookkeeping in every peerStates slot.
 *
 * All TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA slots are cleared: each
 * lastSendIndex becomes SYNC_INDEX_INVALID and each lastSendTime becomes 0.
 * Always returns 0.
 */
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  const int32_t totalSlots = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;
  int32_t       slot = 0;

  while (slot < totalSlots) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
    ++slot;
  }

  return 0;
}
2382

2383
/*
 * Move a follower into the candidate role and stamp roleTimeMs.
 *
 * If the node is not currently TAOS_SYNC_STATE_FOLLOWER, an error is logged
 * and nothing is changed. On success the new role is logged together with
 * the current term, commit index and last log index.
 */
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  bool isFollower = (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  if (!isFollower) {
    sError("vgId:%d, failed candidate from follower since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }

  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  SSyncLogStore* pLogStore = pSyncNode->pLogStore;
  SyncIndex      logLastIndex = pLogStore->syncLogLastIndex(pLogStore);
  sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLastIndex);

  sNTrace(pSyncNode, "follower to candidate");
}
2396

2397
/*
 * Promote an assigned leader to a regular leader.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR if the node is not currently
 * TAOS_SYNC_STATE_ASSIGNED_LEADER. Otherwise it:
 *   - calls syncNodeBecomeLeader;
 *   - appends a no-op entry (a failure is logged but not returned);
 *   - logs the term, commit index, assigned commit index and last log index;
 *   - returns 0.
 */
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");

  sNTrace(pSyncNode, "assigned leader to leader");

  int32_t ret = syncNodeAppendNoop(pSyncNode);
  if (ret < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  // the ", " before "assigned commit index" was missing, gluing two fields together
  sInfo("vgId:%d, become leader from assigned leader. term:%" PRId64 ", commit index:%" PRId64
        ", assigned commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, pSyncNode->assignedCommitIndex,
        lastIndex);
  return 0;
}
2415

2416
// just called by syncNodeVoteForSelf
2417
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
30✔
2418
  SyncTerm storeTerm = raftStoreGetTerm(pSyncNode);
30✔
2419
  if (term != storeTerm) {
30!
2420
    sError("vgId:%d, failed to vote for term, term:%" PRId64 ", storeTerm:%" PRId64, pSyncNode->vgId, term, storeTerm);
×
2421
    return;
×
2422
  }
2423
  sTrace("vgId:%d, begin hasVoted", pSyncNode->vgId);
30!
2424
  bool voted = raftStoreHasVoted(pSyncNode);
30✔
2425
  if (voted) {
30!
2426
    sError("vgId:%d, failed to vote for term since not voted", pSyncNode->vgId);
×
2427
    return;
×
2428
  }
2429

2430
  raftStoreVote(pSyncNode, pRaftId);
30✔
2431
}
2432

2433
// simulate get vote from outside
2434
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
30✔
2435
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);
30✔
2436

2437
  SRpcMsg rpcMsg = {0};
30✔
2438
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
30✔
2439
  if (ret != 0) return;
30!
2440

2441
  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
30✔
2442
  pMsg->srcId = pSyncNode->myRaftId;
30✔
2443
  pMsg->destId = pSyncNode->myRaftId;
30✔
2444
  pMsg->term = currentTerm;
30✔
2445
  pMsg->voteGranted = true;
30✔
2446

2447
  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
30✔
2448
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
30✔
2449
  rpcFreeCont(rpcMsg.pCont);
30✔
2450
}
2451

2452
// return if has a snapshot
2453
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
429✔
2454
  bool      ret = false;
429✔
2455
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
429✔
2456
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
429!
2457
    // TODO check return value
2458
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
429✔
2459
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
429✔
2460
      ret = true;
76✔
2461
    }
2462
  }
2463
  return ret;
429✔
2464
}
2465

2466
// return max(logLastIndex, snapshotLastIndex)
2467
// if no snapshot and log, return -1
2468
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
429✔
2469
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
429✔
2470
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
429!
2471
    // TODO check return value
2472
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
429✔
2473
  }
2474
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
429✔
2475

2476
  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
429✔
2477
  return lastIndex;
429✔
2478
}
2479

2480
// return the last term of snapshot and log
2481
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
2482
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
429✔
2483
  SyncTerm lastTerm = 0;
429✔
2484
  if (syncNodeHasSnapshot(pSyncNode)) {
429✔
2485
    // has snapshot
2486
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
76✔
2487
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
76!
2488
      // TODO check return value
2489
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
76✔
2490
    }
2491

2492
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
76✔
2493
    if (logLastIndex > snapshot.lastApplyIndex) {
76✔
2494
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
56✔
2495
    } else {
2496
      lastTerm = snapshot.lastApplyTerm;
20✔
2497
    }
2498

2499
  } else {
2500
    // no snapshot
2501
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
353✔
2502
  }
2503

2504
  return lastTerm;
429✔
2505
}
2506

2507
// get last index and term along with snapshot
2508
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
379✔
2509
  *pLastIndex = syncNodeGetLastIndex(pSyncNode);
379✔
2510
  *pLastTerm = syncNodeGetLastTerm(pSyncNode);
379✔
2511
  return 0;
379✔
2512
}
2513

2514
#ifdef BUILD_NO_CALL
// return append-entries first try index
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
  return syncStartIndex;
}

// if index > 0, return index - 1
// else, return -1
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex preIndex = index - 1;
  if (preIndex < SYNC_INDEX_INVALID) {
    preIndex = SYNC_INDEX_INVALID;
  }

  return preIndex;
}

// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
/*
 * preTerm is the term of entry index-1. The lookup tries, in order:
 *   - the log store's LRU cache;
 *   - the log store itself;
 *   - the FSM snapshot, when its lastApplyIndex is exactly index-1.
 */
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }

  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

  SyncTerm  preTerm = 0;
  SyncIndex preIndex = index - 1;

  SSyncRaftEntry* pPreEntry = NULL;
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
  int32_t         code = 0;
  if (h) {
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
    code = 0;

    pSyncNode->pLogStore->cacheHit++;
    // the NULL check below runs later, so guard the dereference in the trace too
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex,
            (pPreEntry != NULL) ? pPreEntry->bytes : 0u, pPreEntry);

  } else {
    pSyncNode->pLogStore->cacheMiss++;
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);

    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
  }

  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};

  if (code == 0) {
    if (pPreEntry == NULL) {
      // the cache handle must still be released, otherwise it leaks
      if (h) {
        taosLRUCacheRelease(pCache, h, false);
      }
      return SYNC_TERM_INVALID;
    }
    preTerm = pPreEntry->term;

    if (h) {
      taosLRUCacheRelease(pCache, h, false);
    } else {
      syncEntryDestroy(pPreEntry);
    }

    return preTerm;
  } else {
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      // TODO check return value
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
      if (snapshot.lastApplyIndex == preIndex) {
        return snapshot.lastApplyTerm;
      }
    }
  }

  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
  return SYNC_TERM_INVALID;
}

// get pre index and term of "index"
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  *pPreIndex = syncNodeGetPreIndex(pSyncNode, index);
  *pPreTerm = syncNodeGetPreTerm(pSyncNode, index);
  return 0;
}
#endif
2604

2605
static void syncNodeEqPingTimer(void* param, void* tmrId) {
2,367✔
2606
  if (!syncIsInit()) return;
2,367!
2607

2608
  int64_t    rid = (int64_t)param;
2,367✔
2609
  SSyncNode* pNode = syncNodeAcquire(rid);
2,367✔
2610

2611
  if (pNode == NULL) return;
2,367!
2612

2613
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
2,367!
2614
    SRpcMsg rpcMsg = {0};
2,367✔
2615
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
2,367✔
2616
                                    pNode->pingTimerMS, pNode);
2617
    if (code != 0) {
2,367!
2618
      sError("failed to build ping msg");
×
2619
      rpcFreeCont(rpcMsg.pCont);
×
2620
      goto _out;
×
2621
    }
2622

2623
    // sTrace("enqueue ping msg");
2624
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
2,367✔
2625
    if (code != 0) {
2,367✔
2626
      sError("failed to sync enqueue ping msg since %s", terrstr());
1!
2627
      rpcFreeCont(rpcMsg.pCont);
1✔
2628
      goto _out;
1✔
2629
    }
2630

2631
  _out:
2,366✔
2632
    if (taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
2,367!
2633
                     &pNode->pPingTimer))
2634
      sError("failed to reset ping timer");
×
2635
  }
2636
  syncNodeRelease(pNode);
2,367✔
2637
}
2638

2639
static void syncNodeEqElectTimer(void* param, void* tmrId) {
31✔
2640
  if (!syncIsInit()) return;
32!
2641

2642
  int64_t    rid = (int64_t)param;
31✔
2643
  SSyncNode* pNode = syncNodeAcquire(rid);
31✔
2644

2645
  if (pNode == NULL) return;
31!
2646

2647
  if (pNode->syncEqMsg == NULL) {
31!
2648
    syncNodeRelease(pNode);
×
2649
    return;
×
2650
  }
2651

2652
  int64_t tsNow = taosGetTimestampMs();
31✔
2653
  if (tsNow < pNode->electTimerParam.executeTime) {
31✔
2654
    syncNodeRelease(pNode);
1✔
2655
    return;
1✔
2656
  }
2657

2658
  SRpcMsg rpcMsg = {0};
30✔
2659
  int32_t code =
2660
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
30✔
2661

2662
  if (code != 0) {
30!
2663
    sError("failed to build elect msg");
×
2664
    syncNodeRelease(pNode);
×
2665
    return;
×
2666
  }
2667

2668
  SyncTimeout* pTimeout = rpcMsg.pCont;
30✔
2669
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
30!
2670

2671
  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
30✔
2672
  if (code != 0) {
30!
2673
    sError("failed to sync enqueue elect msg since %s", terrstr());
×
2674
    rpcFreeCont(rpcMsg.pCont);
×
2675
    syncNodeRelease(pNode);
×
2676
    return;
×
2677
  }
2678

2679
  syncNodeRelease(pNode);
30✔
2680
}
2681

2682
#ifdef BUILD_NO_CALL
/*
 * Legacy node-wide heartbeat timer callback (compiled only with BUILD_NO_CALL).
 *
 * It only does work when there is more than one replica and
 * heartbeatTimerLogicClockUser is not ahead of heartbeatTimerLogicClock. In
 * that case it enqueues a SYNC_TIMEOUT_HEARTBEAT message and re-arms itself.
 * The node acquired from `param` is now released on every path; the
 * previous version never released it.
 */
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
  if (!syncIsInit()) return;

  int64_t    rid = (int64_t)param;
  SSyncNode* pNode = syncNodeAcquire(rid);

  if (pNode == NULL) return;

  if (pNode->totalReplicaNum > 1) {
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
        sError("failed to build heartbeat msg");
        // mirror syncNodeEqPingTimer: drop any partially built payload
        rpcFreeCont(rpcMsg.pCont);
        goto _out;
      }

      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
      if (code != 0) {
        sError("failed to enqueue heartbeat msg since %s", tstrerror(code));
        rpcFreeCont(rpcMsg.pCont);
        goto _out;
      }

    _out:
      // the previous code only used this result to return early, which skipped nothing
      (void)taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid,
                         syncEnv()->pTimerManager, &pNode->pHeartbeatTimer);

    } else {
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
    }
  }

  // balance syncNodeAcquire above
  syncNodeRelease(pNode);
}
#endif
2722

2723
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
4,149✔
2724
  int32_t code = 0;
4,149✔
2725
  int64_t hbDataRid = (int64_t)param;
4,149✔
2726
  int64_t tsNow = taosGetTimestampMs();
4,149✔
2727

2728
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
4,149✔
2729
  if (pData == NULL) {
4,149!
2730
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
×
2731
    return;
×
2732
  }
2733

2734
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
4,149✔
2735
  if (pSyncNode == NULL) {
4,149!
2736
    syncHbTimerDataRelease(pData);
×
2737
    sError("hb timer get pSyncNode NULL");
×
2738
    return;
×
2739
  }
2740

2741
  SSyncTimer* pSyncTimer = pData->pTimer;
4,149✔
2742

2743
  if (!pSyncNode->isStart) {
4,149!
2744
    syncNodeRelease(pSyncNode);
×
2745
    syncHbTimerDataRelease(pData);
×
2746
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
×
2747
    return;
×
2748
  }
2749

2750
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
4,149!
2751
    syncNodeRelease(pSyncNode);
×
2752
    syncHbTimerDataRelease(pData);
×
2753
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
×
2754
    return;
×
2755
  }
2756

2757
  sTrace("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:0x%" PRIx64, pSyncNode->vgId, hbDataRid,
4,149!
2758
         pData->destId.addr);
2759

2760
  if (pSyncNode->totalReplicaNum > 1) {
4,149!
2761
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
4,149✔
2762
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);
4,149✔
2763

2764
    if (timerLogicClock == msgLogicClock) {
4,149!
2765
      if (tsNow > pData->execTime) {
4,149✔
2766
        pData->execTime += pSyncTimer->timerMS;
4,140✔
2767

2768
        SRpcMsg rpcMsg = {0};
4,140✔
2769
        if ((code = syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId)) != 0) {
4,140!
2770
          sError("vgId:%d, failed to build heartbeat msg since %s", pSyncNode->vgId, tstrerror(code));
×
2771
          syncNodeRelease(pSyncNode);
×
2772
          syncHbTimerDataRelease(pData);
×
2773
          return;
×
2774
        }
2775

2776
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
4,140✔
2777

2778
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
4,140✔
2779
        pSyncMsg->srcId = pSyncNode->myRaftId;
4,140✔
2780
        pSyncMsg->destId = pData->destId;
4,140✔
2781
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
4,140✔
2782
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
4,140✔
2783
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
4,140✔
2784
        pSyncMsg->privateTerm = 0;
4,140✔
2785
        pSyncMsg->timeStamp = tsNow;
4,140✔
2786

2787
        // update reset time
2788
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
4,140✔
2789
        pSyncTimer->timeStamp = tsNow;
4,140✔
2790

2791
        // send msg
2792
        TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
4,140✔
2793
        TRACE_SET_ROOTID(&(rpcMsg.info.traceId), tGenIdPI64());
4,140✔
2794
        sGTrace(&rpcMsg.info.traceId, "vgId:%d, send sync-heartbeat to dnode:%d", pSyncNode->vgId,
4,140!
2795
                DID(&(pSyncMsg->destId)));
2796
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
4,140✔
2797
        int ret = syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
4,140✔
2798
        if (ret != 0) {
4,140!
2799
          sError("vgId:%d, failed to send heartbeat since %s", pSyncNode->vgId, tstrerror(ret));
×
2800
        }
2801
      }
2802

2803
      if (syncIsInit()) {
4,149!
2804
        sTrace("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
4,149!
2805
        bool stopped = taosTmrResetPriority(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid,
4,149✔
2806
                                            syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
4,149✔
2807
        if (stopped) sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
4,149!
2808

2809
      } else {
2810
        sError("sync env is stop, reset peer hb timer error");
×
2811
      }
2812

2813
    } else {
2814
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64, pSyncNode->vgId,
×
2815
             timerLogicClock, msgLogicClock);
2816
    }
2817
  }
2818

2819
  syncHbTimerDataRelease(pData);
4,149✔
2820
  syncNodeRelease(pSyncNode);
4,149✔
2821
}
2822

2823
#ifdef BUILD_NO_CALL
// Deleter registered with taosLRUCacheInsert below: frees the cached value.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) {
  (void)ud;
  taosMemoryFree(value);
}

// Insert pEntry into the log store's LRU cache, keyed by pEntry->index.
// The entry is charged sizeof(*pEntry) + dataLen and inserted at low priority.
// Returns 0 when the cache reports TAOS_LRU_STATUS_OK, -1 otherwise.
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  sNTrace(pStoreData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus insertStatus = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry,
                                              charge, deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW, NULL);

  return (insertStatus == TAOS_LRU_STATUS_OK) ? 0 : -1;
}
#endif
2844

2845
/*
 * Fill `cfg` from an alter-replica request.
 *
 * Layout of cfg->nodeInfo:
 *   - voters occupy [0, replica);
 *   - learners follow at [replica, replica + learnerReplica).
 * Each endpoint is passed through tmsgUpdateDnodeInfo and then logged.
 *
 * Counters: replicaNum becomes the voter count; totalReplicaNum becomes
 * voters + learners.
 *
 * myIndex is set from selfIndex and/or replica + learnerSelfIndex when those
 * are not -1. If both are -1 it is left untouched.
 */
void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) {  // TODO SAlterVnodeReplicaReq name is proper?
  cfg->replicaNum = 0;
  cfg->totalReplicaNum = 0;

  // voters
  for (int32_t i = 0; i < pReq->replica; ++i) {
    SNodeInfo* pNode = &cfg->nodeInfo[i];
    pNode->nodeId = pReq->replicas[i].id;
    pNode->nodePort = pReq->replicas[i].port;
    tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
    cfg->replicaNum++;
  }
  if (pReq->selfIndex != -1) {
    cfg->myIndex = pReq->selfIndex;
  }

  // learners: `learner` indexes pReq->learnerReplicas, `i` is the slot in cfg->nodeInfo
  for (int32_t learner = 0; learner < pReq->learnerReplica; ++learner) {
    int32_t    i = cfg->replicaNum + learner;
    SNodeInfo* pNode = &cfg->nodeInfo[i];
    pNode->nodeId = pReq->learnerReplicas[learner].id;
    pNode->nodePort = pReq->learnerReplicas[learner].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[learner].fqdn, sizeof(pNode->nodeFqdn));
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
    cfg->totalReplicaNum++;
  }
  cfg->totalReplicaNum += pReq->replica;
  if (pReq->learnerSelfIndex != -1) {
    cfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
  }
  cfg->changeVersion = pReq->changeVersion;
}
2881

2882
/*
 * Inspect a config-change log entry and step down if this leader is no
 * longer a member of the new configuration.
 *
 * The entry payload is an SMsgHead followed by the serialized replica
 * request.
 *
 * Returns:
 *   - TSDB_CODE_SYN_INTERNAL_ERROR when pEntry is not a
 *     TDMT_SYNC_CONFIG_CHANGE entry;
 *   - TSDB_CODE_INVALID_MSG (via TAOS_RETURN) when the payload cannot be
 *     deserialized;
 *   - 1 after calling syncNodeStepDown, when all of these hold: this node is
 *     leader or assigned leader, the new config is non-empty, and our
 *     fqdn:port is not in it;
 *   - 0 otherwise.
 */
int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SMsgHead* pHead = (SMsgHead*)pEntry->data;
  void*     pBody = POINTER_SHIFT(pHead, sizeof(SMsgHead));

  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pBody, pHead->contLen, &req) != 0) {
    int32_t code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg cfg = {0};
  syncBuildConfigFromReq(&req, &cfg);

  bool leading = (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
  if (cfg.totalReplicaNum < 1 || !leading) {
    return 0;
  }

  for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
    const SNodeInfo* pMember = &cfg.nodeInfo[j];
    if (strcmp(ths->myNodeInfo.nodeFqdn, pMember->nodeFqdn) == 0 && ths->myNodeInfo.nodePort == pMember->nodePort) {
      return 0;  // still a member of the new configuration
    }
  }

  // this node is not in the new configuration: step down
  SyncTerm currentTerm = raftStoreGetTerm(ths);
  SRaftId  id = EMPTY_RAFT_ID;
  syncNodeStepDown(ths, currentTerm, id);
  return 1;
}
2920

2921
/*
 * Dump this node's membership view at info level. Every line is tagged
 * with `str`. Lines are emitted in this order:
 *   - node-wide counters (replicaNum, peersNum, lastConfigIndex,
 *     changeVersion, restoreFinish);
 *   - myNodeInfo;
 *   - each peer's node info, then each peer's epset, then each peer's raft id;
 *   - each raftCfg node's info, then each raftCfg replica raft id.
 * `cfg` is not read.
 */
void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg* cfg, char* str) {
  sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64
        ", changeVersion:%d, "
        "restoreFinish:%d",
        ths->vgId, str, ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion,
        ths->restoreFinish);

  sInfo("vgId:%d, %s, myNodeInfo, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
        ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn, ths->myNodeInfo.nodePort,
        ths->myNodeInfo.nodeRole);

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
          i, ths->peersNodeInfo[i].clusterId, ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn,
          ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    // Use peer i's epset. The old code dereferenced `ths->peersEpset`, i.e.
    // element 0, for every peer, and shadowed `i` in the inner loop.
    const SEpSet* pEpSet = &ths->peersEpset[i];
    char          buf[256];
    int32_t       len = (int32_t)sizeof(buf);
    int32_t       n = 0;
    n += tsnprintf(buf + n, len - n, "%s", "{");
    for (int32_t k = 0; k < pEpSet->numOfEps; k++) {
      n += tsnprintf(buf + n, len - n, "%s:%d%s", pEpSet->eps[k].fqdn, pEpSet->eps[k].port,
                     (k + 1 < pEpSet->numOfEps ? ", " : ""));
    }
    n += tsnprintf(buf + n, len - n, "%s", "}");

    sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d", ths->vgId, str, i, buf, pEpSet->inUse);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->peersId[i].addr);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, nodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str, i,
          ths->raftCfg.cfg.nodeInfo[i].clusterId, ths->raftCfg.cfg.nodeInfo[i].nodeId,
          ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, ths->raftCfg.cfg.nodeInfo[i].nodePort,
          ths->raftCfg.cfg.nodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, replicasId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->replicasId[i].addr);
  }
}
2967

2968
/*
 * Rebuild this node's peer tables and raftCfg membership from `cfg`.
 *
 * Step 1, peers: every cfg node whose fqdn:port differs from myNodeInfo is
 * copied, in order, into peersNodeInfo. Its peersEpset and peersId are
 * derived from it. If a raft id cannot be determined, the error is logged
 * and terrno is returned; peersNum is then not updated.
 *
 * Step 2, raftCfg: all cfg nodes are copied into raftCfg.cfg.nodeInfo.
 *   - replicaNum becomes the voter count;
 *   - totalReplicaNum becomes the node count;
 *   - myIndex is set to the slot matching our fqdn:port, if any.
 *
 * Returns 0 on success.
 */
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg* cfg) {
  // 1. peers: everyone in cfg except this node
  int32_t peerCount = 0;
  for (int32_t src = 0; src < cfg->totalReplicaNum; ++src) {
    const SNodeInfo* pSrc = &cfg->nodeInfo[src];
    bool             isSelf =
        strcmp(ths->myNodeInfo.nodeFqdn, pSrc->nodeFqdn) == 0 && ths->myNodeInfo.nodePort == pSrc->nodePort;
    if (isSelf) {
      continue;
    }

    SNodeInfo* pPeer = &ths->peersNodeInfo[peerCount];
    pPeer->nodeRole = pSrc->nodeRole;
    pPeer->clusterId = pSrc->clusterId;
    tstrncpy(pPeer->nodeFqdn, pSrc->nodeFqdn, TSDB_FQDN_LEN);
    pPeer->nodeId = pSrc->nodeId;
    pPeer->nodePort = pSrc->nodePort;

    syncUtilNodeInfo2EpSet(pPeer, &ths->peersEpset[peerCount]);

    if (!syncUtilNodeInfo2RaftId(pPeer, ths->vgId, &ths->peersId[peerCount])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", ths->vgId, peerCount);
      return terrno;
    }

    peerCount++;
  }
  ths->peersNum = peerCount;

  // 2. raftCfg: the full membership, voters and learners alike
  ths->raftCfg.cfg.replicaNum = 0;
  int32_t slot = 0;
  for (; slot < cfg->totalReplicaNum; ++slot) {
    const SNodeInfo* pSrc = &cfg->nodeInfo[slot];
    SNodeInfo*       pDst = &ths->raftCfg.cfg.nodeInfo[slot];

    if (pSrc->nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->raftCfg.cfg.replicaNum++;
    }
    pDst->nodeRole = pSrc->nodeRole;
    pDst->clusterId = pSrc->clusterId;
    tstrncpy(pDst->nodeFqdn, pSrc->nodeFqdn, TSDB_FQDN_LEN);
    pDst->nodeId = pSrc->nodeId;
    pDst->nodePort = pSrc->nodePort;

    if (strcmp(ths->myNodeInfo.nodeFqdn, pSrc->nodeFqdn) == 0 && ths->myNodeInfo.nodePort == pSrc->nodePort) {
      ths->raftCfg.cfg.myIndex = slot;
    }
  }
  ths->raftCfg.cfg.totalReplicaNum = slot;

  return 0;
}
3016

3017
/*
 * Promote peers and config entries to voter according to a new config.
 *
 * Every entry of ths->peersNodeInfo (first peersNum) and of ths->raftCfg.cfg.nodeInfo
 * (first totalReplicaNum) whose fqdn and port match a node marked TAOS_SYNC_ROLE_VOTER in
 * `cfg` gets its role set to voter. raftCfg.cfg.replicaNum is recomputed as the number of such
 * matches. No role is ever demoted here.
 */
void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg* cfg) {
  // peersNodeInfo: promote peers that the new config lists as voters
  for (int32_t p = 0; p < ths->peersNum; ++p) {
    SNodeInfo* pPeer = &ths->peersNodeInfo[p];
    for (int32_t k = 0; k < cfg->totalReplicaNum; ++k) {
      const SNodeInfo* pNew = &cfg->nodeInfo[k];
      bool             sameNode = (strcmp(pPeer->nodeFqdn, pNew->nodeFqdn) == 0) && (pPeer->nodePort == pNew->nodePort);
      if (sameNode && pNew->nodeRole == TAOS_SYNC_ROLE_VOTER) {
        pPeer->nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
    }
  }

  // raftCfg nodeInfo: promote the same way and recount the voters
  ths->raftCfg.cfg.replicaNum = 0;
  for (int32_t n = 0; n < ths->raftCfg.cfg.totalReplicaNum; ++n) {
    SNodeInfo* pCur = &ths->raftCfg.cfg.nodeInfo[n];
    for (int32_t k = 0; k < cfg->totalReplicaNum; ++k) {
      const SNodeInfo* pNew = &cfg->nodeInfo[k];
      bool             sameNode = (strcmp(pCur->nodeFqdn, pNew->nodeFqdn) == 0) && (pCur->nodePort == pNew->nodePort);
      if (sameNode && pNew->nodeRole == TAOS_SYNC_ROLE_VOTER) {
        pCur->nodeRole = TAOS_SYNC_ROLE_VOTER;
        ths->raftCfg.cfg.replicaNum++;
      }
    }
  }
}
×
3044

3045
/*
 * Rebuild the per-replica runtime state of a sync node after ths->raftCfg.cfg has been
 * replaced by a new membership configuration. State of replicas present in both the old and
 * the new configuration is carried over.
 *
 * @param ths                 sync node whose raftCfg.cfg already holds the new config
 * @param oldtotalReplicaNum  ths->totalReplicaNum before the change; bounds the meaningful
 *                            entries of the old replicasId / logReplMgrs / peerStates arrays
 * @return 0 on success, otherwise an error code
 *
 * Steps:
 *   1. recompute replicasId from raftCfg.cfg.nodeInfo
 *   2. recreate pMatchIndex, copying values of surviving replicas
 *   3. recreate pNextIndex, copying values of surviving replicas
 *   4. refresh the vote managers (old vote state is not kept)
 *   5. recreate logReplMgrs, copying the managers of surviving replicas
 *   6. recreate all snapshot senders
 *   7. restart the heartbeat timers
 *   8. remap peerStates of surviving replicas
 */
int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum) {
  int32_t code = 0;

  // 1.rebuild replicasId, remove deleted one
  SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
  memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId));

  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]) == false) return terrno;
  }

  // 2.rebuild MatchIndex, remove deleted one
  SSyncIndexMgr* oldIndex = ths->pMatchIndex;
  ths->pMatchIndex = syncIndexMgrCreate(ths);
  if (ths->pMatchIndex == NULL) {
    // keep the previous manager: never leave a NULL pMatchIndex on the node, and don't leak the old one
    ths->pMatchIndex = oldIndex;
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  syncIndexMgrCopyIfExist(ths->pMatchIndex, oldIndex, oldReplicasId);
  syncIndexMgrDestroy(oldIndex);

  // 3.rebuild NextIndex, remove deleted one
  SSyncIndexMgr* oldNextIndex = ths->pNextIndex;
  ths->pNextIndex = syncIndexMgrCreate(ths);
  if (ths->pNextIndex == NULL) {
    // same reasoning as for pMatchIndex above
    ths->pNextIndex = oldNextIndex;
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  syncIndexMgrCopyIfExist(ths->pNextIndex, oldNextIndex, oldReplicasId);
  syncIndexMgrDestroy(oldNextIndex);

  // 4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // 5.rebuild logReplMgr
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // by-value copy of the old managers; released on every exit path below
  int64_t          length = sizeof(SSyncLogReplMgr) * (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA);
  SSyncLogReplMgr* oldLogReplMgrs = taosMemoryMalloc(length);
  if (NULL == oldLogReplMgrs) return terrno;
  memset(oldLogReplMgrs, 0, length);

  for (int i = 0; i < oldtotalReplicaNum; i++) {
    oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
  }

  syncNodeLogReplDestroy(ths);
  if ((code = syncNodeLogReplInit(ths)) != 0) {
    goto _err;
  }

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        *(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
        ths->logReplMgrs[i]->peerId = i;  // the replica may have moved to a new slot
      }
    }
  }

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // 6.rebuild sender
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if (ths->senders[i] != NULL) {
      sDebug("vgId:%d, snapshot sender destroy while close, data:%p", ths->vgId, ths->senders[i]);

      if (snapshotSenderIsStart(ths->senders[i])) {
        snapshotSenderStop(ths->senders[i], false);
      }

      snapshotSenderDestroy(ths->senders[i]);
      ths->senders[i] = NULL;
    }
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(ths, i, &pSender);
    if (pSender == NULL) {
      // never report success without a sender
      if (code == 0) code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
      goto _err;
    }

    ths->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  // 7.rebuild synctimer
  if ((code = syncNodeStopHeartbeatTimer(ths)) != 0) {
    goto _err;
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(ths, &ths->peerHeartbeatTimerArr[i], ths->replicasId[i])) != 0) {
      goto _err;
    }
  }

  if ((code = syncNodeStartHeartbeatTimer(ths)) != 0) {
    goto _err;
  }

  // 8.rebuild peerStates (own scope so the gotos above never jump into it)
  {
    SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
    for (int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) {
      oldState[i] = ths->peerStates[i];
    }

    for (int i = 0; i < ths->totalReplicaNum; i++) {
      for (int j = 0; j < oldtotalReplicaNum; j++) {
        if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
          ths->peerStates[i] = oldState[j];
        }
      }
    }
  }

  taosMemoryFree(oldLogReplMgrs);
  return 0;

_err:
  taosMemoryFree(oldLogReplMgrs);
  TAOS_RETURN(code);
}
3196

3197
/*
 * Apply a learner->voter promotion in place (the 1->3 case) without rebuilding per-replica
 * structures. replicasId, the index managers' slots and logReplMgrs are kept. Only the voter
 * count (ths->raftCfg.cfg.replicaNum) is propagated to the node and to pMatchIndex/pNextIndex,
 * and the vote managers are refreshed.
 */
void syncNodeChangeToVoter(SSyncNode* ths) {
  // replicasId stays as-is; only the voter count changes
  ths->replicaNum = ths->raftCfg.cfg.replicaNum;

  sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum);
  for (int32_t r = 0; r < ths->totalReplicaNum; r++) {
    sDebug("vgId:%d, i:%d, replicaId.addr:0x%" PRIx64, ths->vgId, r, ths->replicasId[r].addr);
  }

  // pMatchIndex / pNextIndex: same slots, new voter count
  SSyncIndexMgr* pMatch = ths->pMatchIndex;
  SSyncIndexMgr* pNext = ths->pNextIndex;
  pMatch->replicaNum = ths->raftCfg.cfg.replicaNum;
  pNext->replicaNum = ths->raftCfg.cfg.replicaNum;

  sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, pMatch->totalReplicaNum);
  for (int32_t r = 0; r < pMatch->totalReplicaNum; r++) {
    sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, r, pMatch->index[r]);
  }

  // vote bookkeeping follows the new voter set; logReplMgrs need no change for 1->3
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);
}
×
3221

3222
/*
 * Zero the in-use entries of ths->peersNodeInfo (the first peersNum) and of
 * ths->raftCfg.cfg.nodeInfo (the first totalReplicaNum). The counters themselves are left
 * untouched; callers update them afterwards.
 */
void syncNodeResetPeerAndCfg(SSyncNode* ths) {
  int32_t nPeers = ths->peersNum;
  for (int32_t k = 0; k < nPeers; k++) {
    memset(&ths->peersNodeInfo[k], 0, sizeof(SNodeInfo));
  }

  int32_t nNodes = ths->raftCfg.cfg.totalReplicaNum;
  for (int32_t k = 0; k < nNodes; k++) {
    memset(&ths->raftCfg.cfg.nodeInfo[k], 0, sizeof(SNodeInfo));
  }
}
×
3232

3233
/*
 * Apply a TDMT_SYNC_CONFIG_CHANGE raft entry to the node's membership configuration.
 *
 * @param ths     sync node to reconfigure
 * @param pEntry  raft entry whose payload is an SMsgHead followed by a serialized
 *                SAlterVnodeTypeReq
 * @param str     caller tag used in logs; "Commit" selects the commit-side log format
 * @return 0 on success (including when the entry is skipped as outdated), otherwise an error code
 *
 * Entries whose changeVersion is not newer than the current one are ignored. Three cases are
 * handled:
 *   - new config has 1 or 2 replicas: remove replicas (possibly this node, which then becomes
 *     a learner);
 *   - node currently has 3 replicas: change replica types (learner -> voter);
 *   - otherwise: add replicas.
 * Afterwards the quorum and config indexes are refreshed and the config file is rewritten.
 */
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) {
  int32_t code = 0;
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SMsgHead*          pHead = (SMsgHead*)pEntry->data;
  void*              pBody = POINTER_SHIFT(pHead, sizeof(SMsgHead));
  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pBody, pHead->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg cfg = {0};
  syncBuildConfigFromReq(&req, &cfg);

  // outdated entry: nothing to apply
  if (cfg.changeVersion <= ths->raftCfg.cfg.changeVersion) {
    sInfo(
        "vgId:%d, skip conf change entry since lower version. "
        "this entry, index:%" PRId64 ", term:%" PRId64
        ", totalReplicaNum:%d, changeVersion:%d; "
        "current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d",
        ths->vgId, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
    return 0;
  }

  bool fromCommit = (strcmp(str, "Commit") == 0);
  if (!fromCommit) {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64 ", t:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
        ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum,
        ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
        ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index - 1, ths->commitIndex,
        ths->pLogBuf->commitIndex);
  } else {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
        ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex,
        ths->pLogBuf->endIndex, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
  }

  syncNodeLogConfigInfo(ths, &cfg, "before config change");

  int32_t oldTotalReplicaNum = ths->totalReplicaNum;

  if (cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2) {
    // remove replica: first find out whether this node survives the change
    bool incfg = false;
    for (int32_t j = 0; j < cfg.totalReplicaNum && !incfg; ++j) {
      incfg = (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0) &&
              (ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort);
    }

    // both sub-cases start from cleared peer/cfg node tables
    syncNodeResetPeerAndCfg(ths);

    if (incfg) {
      // remove other replicas; myNodeInfo stays unchanged
      code = syncNodeRebuildPeerAndCfg(ths, &cfg);
      if (code != 0) {
        TAOS_RETURN(code);
      }
      code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum);
      if (code != 0) {
        TAOS_RETURN(code);
      }
    } else {
      // this node is removed: keep running alone as a learner to reduce disruption
      ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER;

      ths->peersNum = 0;
      memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo));
      ths->raftCfg.cfg.replicaNum = 0;
      ths->raftCfg.cfg.totalReplicaNum = 1;

      code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum);
      if (code != 0) {
        TAOS_RETURN(code);
      }

      ths->state = TAOS_SYNC_STATE_LEARNER;
    }

    ths->restoreFinish = false;
  } else if (ths->totalReplicaNum == 3) {
    // change replica type
    sInfo("vgId:%d, begin change replica type", ths->vgId);

    // promote myNodeInfo if the new config lists this node as a voter
    for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
      const SNodeInfo* pNode = &cfg.nodeInfo[j];
      bool             isMe = (strcmp(ths->myNodeInfo.nodeFqdn, pNode->nodeFqdn) == 0) &&
                  (ths->myNodeInfo.nodePort == pNode->nodePort);
      if (isMe && pNode->nodeRole == TAOS_SYNC_ROLE_VOTER) {
        ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
    }

    syncNodeChangePeerAndCfgToVoter(ths, &cfg);
    syncNodeChangeToVoter(ths);

    // a promoted learner continues as follower
    if (ths->state == TAOS_SYNC_STATE_LEARNER && ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->state = TAOS_SYNC_STATE_FOLLOWER;
    }

    ths->restoreFinish = false;
  } else {
    // add replica; myNodeInfo and state stay unchanged
    sInfo("vgId:%d, begin add replica", ths->vgId);

    code = syncNodeRebuildPeerAndCfg(ths, &cfg);
    if (code != 0) {
      TAOS_RETURN(code);
    }

    code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum);
    if (code != 0) {
      TAOS_RETURN(code);
    }

    if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      ths->restoreFinish = false;
    }
  }

  ths->quorum = syncUtilQuorum(ths->replicaNum);

  ths->raftCfg.lastConfigIndex = pEntry->index;
  ths->raftCfg.cfg.lastIndex = pEntry->index;
  ths->raftCfg.cfg.changeVersion = cfg.changeVersion;

  syncNodeLogConfigInfo(ths, &cfg, "after config change");

  code = syncWriteCfgFile(ths);
  if (code != 0) {
    sError("vgId:%d, failed to create sync cfg file", ths->vgId);
  }

  TAOS_RETURN(code);
}
3406

3407
/*
 * Append a raft entry to the node's log buffer, then advance the match index and, where
 * applicable, the commit index.
 *
 * pEntry is destroyed here when it is rejected (payload shorter than SMsgHead) or when
 * syncLogBufferAppend fails. In the latter case the FSM is notified via syncFsmExecute. The
 * match index is proceeded in every case. For an assigned leader, or for a single-replica node,
 * the log buffer is also committed. Returns 0 on success, otherwise an error code.
 */
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry, SRpcMsg* pMsg) {
  int32_t code = 0;

  if (pEntry->dataLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, msg:%p, cannot append an invalid client request with no msg head, type:%s dataLen:%d", ths->vgId,
           pMsg, TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  } else {
    // append to log buffer
    int32_t appendRet = syncLogBufferAppend(ths->pLogBuf, ths, pEntry);
    if (appendRet < 0) {
      code = appendRet;
      sError("vgId:%d, index:%" PRId64 ", failed to enqueue sync log buffer", ths->vgId, pEntry->index);
      int32_t fsmRet = syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false);
      if (fsmRet != 0) {
        sError("vgId:%d, index:%" PRId64 ", failed to execute fsm since %s", ths->vgId, pEntry->index,
               tstrerror(fsmRet));
      }
      syncEntryDestroy(pEntry);
      pEntry = NULL;
    }
  }

  // proceed match index, with replicating on needed
  SyncIndex       matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append", pMsg);
  const STraceId* trace = (pEntry != NULL) ? &pEntry->originRpcTraceId : NULL;

  if (pEntry != NULL) {
    sGDebug(trace,
            "vgId:%d, index:%" PRId64 ", raft entry appended, msg:%p term:%" PRId64 " buf:[%" PRId64 " %" PRId64
            " %" PRId64 ", %" PRId64 ")",
            ths->vgId, pEntry->index, pMsg, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
            ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
  }

  if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    int64_t assignedIndex = syncNodeUpdateAssignedCommitIndex(ths, matchIndex);
    sGTrace(trace, "vgId:%d, index:%" PRId64 ", update assigned commit, msg:%p", ths->vgId, assignedIndex, pMsg);

    if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
        syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex, trace, "append-entry") < 0) {
      sGError(trace, "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64, ths->vgId,
              assignedIndex, pMsg, ths->commitIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // multi replica: commit is driven by replication replies
  if (ths->replicaNum > 1) {
    TAOS_RETURN(code);
  }

  // single replica: commit right away
  SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, matchIndex);
  sGTrace(trace, "vgId:%d, index:%" PRId64 ", raft entry update commit, msg:%p return index:%" PRId64, ths->vgId,
          matchIndex, pMsg, returnIndex);

  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    code = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, trace, "append-entry");
    if (code < 0) {
      sGError(trace,
              "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64 " return index:%" PRId64,
              ths->vgId, matchIndex, pMsg, ths->commitIndex, returnIndex);
    }
  }

  TAOS_RETURN(code);
}
3475

3476
/*
 * Report whether at least `quorum` voter peers have not replied to heartbeats within
 * tsHeartbeatTimeout milliseconds.
 *
 * Learner peers are ignored, and so are peers whose recorded receive time in pMatchIndex is 0
 * or -1. A node with a single replica never times out.
 */
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->totalReplicaNum == 1) {
    return false;
  }

  int32_t expired = 0;
  int64_t nowMs = taosGetTimestampMs();
  for (int32_t p = 0; p < pSyncNode->peersNum; ++p) {
    bool isLearner = (pSyncNode->peersNodeInfo[p].nodeRole == TAOS_SYNC_ROLE_LEARNER);
    if (isLearner) continue;

    int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[p]));
    bool    hasRecv = (lastRecv != 0) && (lastRecv != -1);
    if (hasRecv && nowMs - lastRecv > tsHeartbeatTimeout) {
      expired++;
    }
  }

  return expired >= pSyncNode->quorum;
}
3501

3502
/*
 * Return true if any of the node's first totalReplicaNum snapshot senders exists and has its
 * `start` flag set. A NULL node yields false.
 */
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return false;
  }

  for (int32_t r = 0; r < pSyncNode->totalReplicaNum; ++r) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[r];
    if (pSender != NULL && pSender->start) {
      return true;
    }
  }
  return false;
}
3513

3514
/*
 * Return true if the node has a snapshot receiver (pNewNodeReceiver) whose `start` flag is set.
 * A NULL node or a missing receiver yields false.
 */
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  return (pSyncNode != NULL) && (pSyncNode->pNewNodeReceiver != NULL) && pSyncNode->pNewNodeReceiver->start;
}
3520

3521
/*
 * Build a noop raft entry at the log buffer's end index with the current term and append it
 * through syncNodeAppend. Returns TSDB_CODE_OUT_OF_MEMORY if the entry cannot be built,
 * otherwise the result of syncNodeAppend.
 */
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex endIndex = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  curTerm = raftStoreGetTerm(ths);

  SSyncRaftEntry* pNoop = syncEntryBuildNoop(curTerm, endIndex, ths->vgId);
  int32_t         code = (pNoop == NULL) ? TSDB_CODE_OUT_OF_MEMORY : syncNodeAppend(ths, pNoop, NULL);
  TAOS_RETURN(code);
}
3535

3536
#ifdef BUILD_NO_CALL
/*
 * Legacy noop-append path that writes through pLogStore directly; compiled only with
 * BUILD_NO_CALL. Only a leader appends and caches the entry. In all other cases the entry is
 * destroyed. Returns -1 if the entry cannot be built or appended, 0 otherwise.
 */
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  SyncIndex       writeIndex = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        curTerm = raftStoreGetTerm(ths);
  SSyncRaftEntry* pNoop = syncEntryBuildNoop(curTerm, writeIndex, ths->vgId);
  if (pNoop == NULL) return -1;

  LRUHandle* pHandle = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    if (ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pNoop, false) != 0) {
      sError("append noop error");
      return -1;
    }
    syncCacheEntry(ths->pLogStore, pNoop, &pHandle);
  }

  if (pHandle != NULL) {
    taosLRUCacheRelease(ths->pLogStore->pCache, pHandle, false);
  } else {
    syncEntryDestroy(pNoop);
  }

  return 0;
}
#endif
3566

3567
/*
 * Handle a heartbeat received from the leader.
 *
 * Heartbeats from nodes outside the raft group are dropped. Otherwise a heartbeat reply
 * carrying this node's term is sent back. Then:
 *   - a learner adopts a larger term from the heartbeat;
 *   - a non-leader with the same term records the receive time in pNextIndex, stores the
 *     leader's minMatchIndex and, as follower/learner, enqueues a local commit command;
 *   - a leader (or assigned leader) seeing a term >= its own enqueues a step-down command.
 * The election timer is reset when the heartbeat came from the current-term leader.
 *
 * Local command messages that cannot be handed to syncEqMsg (either because no queue is
 * configured or because enqueueing failed) are freed with rpcFreeCont.
 */
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
  bool           resetElect = false;

  int64_t tsMs = taosGetTimestampMs();
  int64_t timeDiff = tsMs - pMsg->timeStamp;
  syncLogRecvHeartbeat(ths, pMsg, timeDiff, &pRpcMsg->info.traceId);

  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
    sWarn(
        "vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
        "cluster:%d",
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
    return 0;
  }

  SRpcMsg rpcMsg = {0};
  TAOS_CHECK_RETURN(syncBuildHeartbeatReply(&rpcMsg, ths->vgId));
  SyncTerm currentTerm = raftStoreTryGetTerm(ths);

  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;
  rpcMsg.info.traceId = pRpcMsg->info.traceId;

  // reply
  TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
  sGDebug(&rpcMsg.info.traceId, "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64,
          ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp);

  TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg));

  // raftStoreTryGetTerm may report 0; fall back to the blocking read
  if (currentTerm == 0) currentTerm = raftStoreGetTerm(ths);
  sGDebug(&rpcMsg.info.traceId,
          "vgId:%d, process sync-heartbeat msg from dnode:%d, commit-index:%" PRId64 ", cluster:%d msgTerm:%" PRId64
          " currentTerm:%" PRId64,
          ths->vgId, DID(&(pMsg->srcId)), pMsg->commitIndex, CID(&(pMsg->srcId)), pMsg->term, currentTerm);

  if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
    raftStoreSetTerm(ths, pMsg->term);
    currentTerm = pMsg->term;
  }

  if (pMsg->term == currentTerm &&
      (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
    resetElect = true;

    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
      rpcMsgLocalCmd.info.traceId = rpcMsg.info.traceId;

      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd =
          (ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT;
      pSyncMsg->commitIndex = pMsg->commitIndex;
      pSyncMsg->currentTerm = pMsg->term;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, failed to enqueue commit msg from heartbeat since %s, code:%d", ths->vgId, terrstr(), code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sGTrace(&rpcMsg.info.traceId,
                  "vgId:%d, enqueue commit msg from heartbeat, commit-index:%" PRId64 ", term:%" PRId64, ths->vgId,
                  pMsg->commitIndex, pMsg->term);
        }
      } else {
        // no queue to hand the command to: release the buffer built above instead of leaking it
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  if (pMsg->term >= currentTerm &&
      (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    SRpcMsg rpcMsgLocalCmd = {0};
    TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
    rpcMsgLocalCmd.info.traceId = rpcMsg.info.traceId;

    SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->currentTerm = pMsg->term;
    pSyncMsg->commitIndex = pMsg->commitIndex;

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, pMsg->term);
      }
    } else {
      // no queue to hand the command to: release the buffer built above instead of leaking it
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    }
  }

  if (resetElect) syncNodeResetElectTimer(ths);
  return 0;
}
3671

3672
/*
 * Handle a heartbeat reply from a peer. The reply time is recorded in pMatchIndex and the reply
 * is then passed to the peer's log replication manager.
 *
 * Returns an error code if no log replication manager exists for the sender (terrno if set,
 * otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL). Otherwise returns the result of
 * syncLogReplProcessHeartbeatReply.
 */
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  int32_t code = 0;

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // "0x%016" prints the address zero-padded to 16 hex digits after a 0x prefix
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr);
    TAOS_RETURN(code);
  }

  int64_t tsMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, &pRpcMsg->info.traceId);

  // liveness bookkeeping: remember when this peer last answered
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);

  return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
}
3691

3692
#ifdef BUILD_NO_CALL
/*
 * Legacy heartbeat-reply handler; compiled only with BUILD_NO_CALL. It logs the reply and
 * records the reply time for the sender in pMatchIndex. Always returns 0.
 */
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;
  int64_t             nowMs = taosGetTimestampMs();

  syncLogRecvHeartbeatReply(ths, pReply, nowMs - pReply->timeStamp, &pRpcMsg->info.traceId);

  // update last reply time, used to judge whether the peer is alive
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, nowMs);
  return 0;
}
#endif
3705

3706
/*
 * Execute a local sync command enqueued by this node.
 *
 * SYNC_LOCAL_CMD_STEP_DOWN steps the node down to the command's term.
 * SYNC_LOCAL_CMD_FOLLOWER_CMT / SYNC_LOCAL_CMD_LEARNER_CMT advance the commit index to the
 * command's commitIndex when its term equals the log buffer's last match term, then commit the
 * log buffer unless the FSM is incomplete. An empty log buffer or an unknown command is logged
 * and ignored.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR if the last match term is negative, otherwise 0.
 */
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, &pRpcMsg->info.traceId);

  if (pCmd->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    SRaftId id = EMPTY_RAFT_ID;
    syncNodeStepDown(ths, pCmd->currentTerm, id);
    return 0;
  }

  bool isCommitCmd = (pCmd->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) || (pCmd->cmd == SYNC_LOCAL_CMD_LEARNER_CMT);
  if (!isCommitCmd) {
    sError("error local cmd");
    return 0;
  }

  if (syncLogBufferIsEmpty(ths->pLogBuf)) {
    sError("vgId:%d, sync log buffer is empty", ths->vgId);
    return 0;
  }

  SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
  if (lastMatchTerm < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // only trust the leader's commit index when our log matches its term
  if (pCmd->currentTerm == lastMatchTerm) {
    SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, pCmd->commitIndex);
    sTrace("vgId:%d, raft entry update commit, return index:%" PRId64, ths->vgId, returnIndex);
  }

  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
      syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, &pRpcMsg->info.traceId, "heartbeat") < 0) {
    sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64, ths->vgId, terrstr(),
           ths->commitIndex);
  }

  return 0;
}
3738

3739
// TLA+ Spec
3740
// ClientRequest(i, v) ==
3741
//     /\ state[i] = Leader
3742
//     /\ LET entry == [term  |-> currentTerm[i],
3743
//                      value |-> v]
3744
//            newLog == Append(log[i], entry)
3745
//        IN  log' = [log EXCEPT ![i] = newLog]
3746
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
3747
//                    leaderVars, commitIndex>>
3748
//
3749

3750
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
63,392✔
3751
  sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, process client request", ths->vgId, pMsg);
63,392!
3752
  int32_t code = 0;
63,405✔
3753

3754
  SyncIndex       index = syncLogBufferGetEndIndex(ths->pLogBuf);
63,405✔
3755
  SyncTerm        term = raftStoreGetTerm(ths);
63,409✔
3756
  SSyncRaftEntry* pEntry = NULL;
63,409✔
3757
  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
63,409✔
3758
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index, &pMsg->info.traceId);
30,030✔
3759
  } else {
3760
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
33,379✔
3761
  }
3762

3763
  if (pEntry == NULL) {
63,409!
3764
    sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process client request since %s", ths->vgId, pMsg, terrstr());
×
3765
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
3766
  }
3767

3768
  // 1->2, config change is add in write thread, and will continue in sync thread
3769
  // need save message for it
3770
  if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) {
63,409!
3771
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
×
3772
    uint64_t  seqNum = syncRespMgrAdd(ths->pSyncRespMgr, &stub);
×
3773
    pEntry->seqNum = seqNum;
×
3774
  }
3775

3776
  if (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
63,409!
3777
    if (pRetIndex) {
63,409✔
3778
      (*pRetIndex) = index;
33,378✔
3779
    }
3780

3781
    if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
63,409!
3782
      int32_t code = syncNodeCheckChangeConfig(ths, pEntry);
×
3783
      if (code < 0) {
×
3784
        sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to check change config since %s", ths->vgId, pMsg, terrstr());
×
3785
        syncEntryDestroy(pEntry);
×
3786
        pEntry = NULL;
×
3787
        TAOS_RETURN(code);
×
3788
      }
3789

3790
      if (code > 0) {
×
3791
        SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
×
3792
        int32_t num = syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
×
3793
        sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get response stub for config change, seqNum:%" PRIu64 " num:%d",
×
3794
                ths->vgId, pMsg, pEntry->seqNum, num);
3795
        if (rsp.info.handle != NULL) {
×
3796
          tmsgSendRsp(&rsp);
×
3797
        }
3798
        syncEntryDestroy(pEntry);
×
3799
        pEntry = NULL;
×
3800
        TAOS_RETURN(code);
×
3801
      }
3802
    }
3803

3804
    code = syncNodeAppend(ths, pEntry, pMsg);
63,409✔
3805
    return code;
63,401✔
3806
  } else {
3807
    syncEntryDestroy(pEntry);
×
3808
    pEntry = NULL;
×
3809
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
3810
  }
3811
}
3812

3813
const char* syncStr(ESyncState state) {
8,624✔
3814
  switch (state) {
8,624!
3815
    case TAOS_SYNC_STATE_FOLLOWER:
3,243✔
3816
      return "follower";
3,243✔
3817
    case TAOS_SYNC_STATE_CANDIDATE:
45✔
3818
      return "candidate";
45✔
3819
    case TAOS_SYNC_STATE_LEADER:
4,878✔
3820
      return "leader";
4,878✔
3821
    case TAOS_SYNC_STATE_ERROR:
×
3822
      return "error";
×
3823
    case TAOS_SYNC_STATE_OFFLINE:
123✔
3824
      return "offline";
123✔
3825
    case TAOS_SYNC_STATE_LEARNER:
335✔
3826
      return "learner";
335✔
3827
    case TAOS_SYNC_STATE_ASSIGNED_LEADER:
×
3828
      return "assigned leader";
×
3829
    default:
×
3830
      return "unknown";
×
3831
  }
3832
}
3833

3834
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
59✔
3835
  for (int32_t i = 0; i < pNewCfg->totalReplicaNum; ++i) {
69!
3836
    SRaftId raftId = {
69✔
3837
        .addr = SYNC_ADDR(&pNewCfg->nodeInfo[i]),
69✔
3838
        .vgId = ths->vgId,
69✔
3839
    };
3840

3841
    if (syncUtilSameId(&(ths->myRaftId), &raftId)) {
69✔
3842
      pNewCfg->myIndex = i;
59✔
3843
      return 0;
59✔
3844
    }
3845
  }
3846

3847
  return TSDB_CODE_SYN_INTERNAL_ERROR;
×
3848
}
3849

3850
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
63,425✔
3851
  return (ths->replicaNum == 1 && syncUtilUserCommit(pMsg->msgType) && ths->vgId != 1);
63,425!
3852
}
3853

3854
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
116,254✔
3855
  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
230,219✔
3856
    if (syncUtilSameId(&((ths->replicasId)[i]), pRaftId)) {
230,218✔
3857
      return true;
116,256✔
3858
    }
3859
  }
3860
  return false;
1✔
3861
}
3862

3863
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
2,325✔
3864
  SSyncSnapshotSender* pSender = NULL;
2,325✔
3865
  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
9,270✔
3866
    if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
6,943✔
3867
      pSender = (ths->senders)[i];
2,327✔
3868
    }
3869
  }
3870
  return pSender;
2,327✔
3871
}
3872

3873
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
589✔
3874
  SSyncTimer* pTimer = NULL;
589✔
3875
  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
2,257✔
3876
    if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
1,668✔
3877
      pTimer = &((ths->peerHeartbeatTimerArr)[i]);
589✔
3878
    }
3879
  }
3880
  return pTimer;
589✔
3881
}
3882

3883
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
704✔
3884
  SPeerState* pState = NULL;
704✔
3885
  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
1,408✔
3886
    if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
704!
3887
      pState = &((ths->peerStates)[i]);
704✔
3888
    }
3889
  }
3890
  return pState;
704✔
3891
}
3892

3893
#ifdef BUILD_NO_CALL
3894
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
3895
  SPeerState* pState = syncNodeGetPeerState(ths, pDestId);
3896
  if (pState == NULL) {
3897
    sError("vgId:%d, replica maybe dropped", ths->vgId);
3898
    return false;
3899
  }
3900

3901
  SyncIndex sendIndex = pMsg->prevLogIndex + 1;
3902
  int64_t   tsNow = taosGetTimestampMs();
3903

3904
  if (pState->lastSendIndex == sendIndex && tsNow - pState->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS) {
3905
    return false;
3906
  }
3907

3908
  return true;
3909
}
3910

3911
bool syncNodeCanChange(SSyncNode* pSyncNode) {
3912
  if (pSyncNode->changing) {
3913
    sError("sync cannot change");
3914
    return false;
3915
  }
3916

3917
  if ((pSyncNode->commitIndex >= SYNC_INDEX_BEGIN)) {
3918
    SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
3919
    if (pSyncNode->commitIndex != lastIndex) {
3920
      sError("sync cannot change2");
3921
      return false;
3922
    }
3923
  }
3924

3925
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
3926
    SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[i]);
3927
    if (pSender != NULL && pSender->start) {
3928
      sError("sync cannot change3");
3929
      return false;
3930
    }
3931
  }
3932

3933
  return true;
3934
}
3935
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc