• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3819

01 Apr 2025 09:27AM UTC coverage: 34.076% (+0.01%) from 34.065%
#3819

push

travis-ci

happyguoxy
test:alter gcda dir

148544 of 599532 branches covered (24.78%)

Branch coverage included in aggregate %.

222541 of 489451 relevant lines covered (45.47%)

763329.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.27
/source/libs/sync/src/syncMain.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "syncAppendEntries.h"
19
#include "syncAppendEntriesReply.h"
20
#include "syncCommit.h"
21
#include "syncElection.h"
22
#include "syncEnv.h"
23
#include "syncIndexMgr.h"
24
#include "syncInt.h"
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
27
#include "syncRaftCfg.h"
28
#include "syncRaftLog.h"
29
#include "syncRaftStore.h"
30
#include "syncReplication.h"
31
#include "syncRequestVote.h"
32
#include "syncRequestVoteReply.h"
33
#include "syncRespMgr.h"
34
#include "syncSnapshot.h"
35
#include "syncTimeout.h"
36
#include "syncUtil.h"
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
39
#include "tmisce.h"
40
#include "tref.h"
41

42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
43
static void    syncNodeEqElectTimer(void* param, void* tmrId);
44
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
45
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
48
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
49
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
50
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
51
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
52
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
53
static int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
54
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
55

56
static bool    syncNodeCanChange(SSyncNode* pSyncNode);
57
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
58
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
59
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
60

61
static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
62

63
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
340✔
64
  sInfo("vgId:%d, start to open sync", pSyncInfo->vgId);
340✔
65
  SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo, vnodeVersion);
340✔
66
  if (pSyncNode == NULL) {
340!
67
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
×
68
    return -1;
×
69
  }
70

71
  pSyncNode->rid = syncNodeAdd(pSyncNode);
340✔
72
  if (pSyncNode->rid < 0) {
340!
73
    syncNodeClose(pSyncNode);
×
74
    return -1;
×
75
  }
76

77
  pSyncNode->pingBaseLine = pSyncInfo->pingMs;
340✔
78
  pSyncNode->pingTimerMS = pSyncInfo->pingMs;
340✔
79
  pSyncNode->electBaseLine = pSyncInfo->electMs;
340✔
80
  pSyncNode->hbBaseLine = pSyncInfo->heartbeatMs;
340✔
81
  pSyncNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;
340✔
82
  pSyncNode->msgcb = pSyncInfo->msgcb;
340✔
83
  sInfo("vgId:%d, sync opened", pSyncInfo->vgId);
340✔
84
  return pSyncNode->rid;
340✔
85
}
86

87
int32_t syncStart(int64_t rid) {
337✔
88
  int32_t    code = 0;
337✔
89
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
337✔
90
  if (pSyncNode == NULL) {
337!
91
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
92
    if (terrno != 0) code = terrno;
×
93
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
94
    TAOS_RETURN(code);
×
95
  }
96
  sInfo("vgId:%d, begin to start sync", pSyncNode->vgId);
337✔
97

98
  if ((code = syncNodeRestore(pSyncNode)) < 0) {
337!
99
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
100
    goto _err;
×
101
  }
102
  sInfo("vgId:%d, sync node restore is executed", pSyncNode->vgId);
337✔
103

104
  if ((code = syncNodeStart(pSyncNode)) < 0) {
337!
105
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, tstrerror(code));
×
106
    goto _err;
×
107
  }
108
  sInfo("vgId:%d, sync node start is executed", pSyncNode->vgId);
337✔
109

110
  syncNodeRelease(pSyncNode);
337✔
111

112
  sInfo("vgId:%d, sync started", pSyncNode->vgId);
337✔
113

114
  TAOS_RETURN(code);
337✔
115

116
_err:
×
117
  syncNodeRelease(pSyncNode);
×
118
  TAOS_RETURN(code);
×
119
}
120

121
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg) {
719✔
122
  int32_t    code = 0;
719✔
123
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
719✔
124

125
  if (pSyncNode == NULL) {
719!
126
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
127
    if (terrno != 0) code = terrno;
×
128
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
129
    TAOS_RETURN(code);
×
130
  }
131

132
  *cfg = pSyncNode->raftCfg.cfg;
719✔
133

134
  syncNodeRelease(pSyncNode);
719✔
135

136
  return 0;
719✔
137
}
138

139
void syncStop(int64_t rid) {
337✔
140
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
337✔
141
  if (pSyncNode != NULL) {
337!
142
    pSyncNode->isStart = false;
337✔
143
    syncNodeRelease(pSyncNode);
337✔
144
    syncNodeRemove(rid);
337✔
145
  }
146
}
337✔
147

148
void syncPreStop(int64_t rid) {
337✔
149
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
337✔
150
  if (pSyncNode != NULL) {
337!
151
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
337!
152
      sInfo("vgId:%d, stop snapshot receiver", pSyncNode->vgId);
×
153
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
154
    }
155
    syncNodePreClose(pSyncNode);
337✔
156
    syncNodeRelease(pSyncNode);
337✔
157
  }
158
}
337✔
159

160
void syncPostStop(int64_t rid) {
277✔
161
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
277✔
162
  if (pSyncNode != NULL) {
277!
163
    syncNodePostClose(pSyncNode);
277✔
164
    syncNodeRelease(pSyncNode);
277✔
165
  }
166
}
277✔
167

168
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
59✔
169
  if (!syncNodeInConfig(pSyncNode, pCfg)) return false;
59!
170
  return abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1;
59!
171
}
172

173
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
63✔
174
  int32_t    code = 0;
63✔
175
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
63✔
176
  if (pSyncNode == NULL) {
63!
177
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
178
    if (terrno != 0) code = terrno;
×
179
    TAOS_RETURN(code);
×
180
  }
181

182
  if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
63✔
183
    syncNodeRelease(pSyncNode);
4✔
184
    sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
4!
185
          pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
186
    return 0;
4✔
187
  }
188

189
  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
59!
190
    syncNodeRelease(pSyncNode);
×
191
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
×
192
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
×
193
    TAOS_RETURN(code);
×
194
  }
195

196
  TAOS_CHECK_RETURN(syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg));
59!
197

198
  if (syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex) != 0) {
59!
199
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
×
200
    sError("vgId:%d, failed to reconfig since do change error", pSyncNode->vgId);
×
201
    TAOS_RETURN(code);
×
202
  }
203

204
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
59!
205
    // TODO check return value
206
    TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
51!
207

208
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
816✔
209
      TAOS_CHECK_RETURN(syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]));
765!
210
    }
211

212
    TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
51!
213
    // syncNodeReplicate(pSyncNode);
214
  }
215

216
  syncNodeRelease(pSyncNode);
59✔
217
  TAOS_RETURN(code);
59✔
218
}
219

220
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
177,755✔
221
  int32_t code = -1;
177,755✔
222
  if (!syncIsInit()) {
177,755!
223
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
224
    if (terrno != 0) code = terrno;
×
225
    TAOS_RETURN(code);
×
226
  }
227

228
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
177,751✔
229
  if (pSyncNode == NULL) {
177,760✔
230
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
4✔
231
    if (terrno != 0) code = terrno;
4!
232
    TAOS_RETURN(code);
×
233
  }
234

235
  switch (pMsg->msgType) {
177,756!
236
    case TDMT_SYNC_HEARTBEAT:
4,237✔
237
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
4,237✔
238
      break;
4,236✔
239
    case TDMT_SYNC_HEARTBEAT_REPLY:
4,236✔
240
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
4,236✔
241
      break;
4,236✔
242
    case TDMT_SYNC_TIMEOUT:
2,330✔
243
      code = syncNodeOnTimeout(pSyncNode, pMsg);
2,330✔
244
      break;
2,330✔
245
    case TDMT_SYNC_TIMEOUT_ELECTION:
36✔
246
      code = syncNodeOnTimeout(pSyncNode, pMsg);
36✔
247
      break;
36✔
248
    case TDMT_SYNC_CLIENT_REQUEST:
34,189✔
249
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
34,189✔
250
      break;
34,190✔
251
    case TDMT_SYNC_REQUEST_VOTE:
53✔
252
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
53✔
253
      break;
53✔
254
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
53✔
255
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
53✔
256
      break;
53✔
257
    case TDMT_SYNC_APPEND_ENTRIES:
64,194✔
258
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
64,194✔
259
      break;
64,194✔
260
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
64,192✔
261
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
64,192✔
262
      break;
64,192✔
263
    case TDMT_SYNC_SNAPSHOT_SEND:
×
264
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
×
265
      break;
×
266
    case TDMT_SYNC_SNAPSHOT_RSP:
×
267
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
×
268
      break;
×
269
    case TDMT_SYNC_LOCAL_CMD:
4,236✔
270
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
4,236✔
271
      break;
4,237✔
272
    case TDMT_SYNC_FORCE_FOLLOWER:
×
273
      code = syncForceBecomeFollower(pSyncNode, pMsg);
×
274
      break;
×
275
    case TDMT_SYNC_SET_ASSIGNED_LEADER:
×
276
      code = syncBecomeAssignedLeader(pSyncNode, pMsg);
×
277
      break;
×
278
    default:
×
279
      code = TSDB_CODE_MSG_NOT_PROCESSED;
×
280
  }
281

282
  syncNodeRelease(pSyncNode);
177,757✔
283
  if (code != 0) {
177,755!
284
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since %s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
×
285
           tstrerror(code));
286
  }
287
  TAOS_RETURN(code);
177,755✔
288
}
289

290
int32_t syncLeaderTransfer(int64_t rid) {
337✔
291
  int32_t    code = 0;
337✔
292
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
337✔
293
  if (pSyncNode == NULL) {
337!
294
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
295
    if (terrno != 0) code = terrno;
×
296
    TAOS_RETURN(code);
×
297
  }
298

299
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
337✔
300
  syncNodeRelease(pSyncNode);
337✔
301
  return ret;
337✔
302
}
303

304
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
×
305
  SRaftId id = {0};
×
306
  syncNodeBecomeFollower(ths, id, "force election");
×
307

308
  SRpcMsg rsp = {
×
309
      .code = 0,
310
      .pCont = pRpcMsg->info.rsp,
×
311
      .contLen = pRpcMsg->info.rspLen,
×
312
      .info = pRpcMsg->info,
313
  };
314
  tmsgSendRsp(&rsp);
×
315

316
  return 0;
×
317
}
318

319
int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
×
320
  int32_t code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
×
321
  void*   pHead = NULL;
×
322
  int32_t contLen = 0;
×
323

324
  SVArbSetAssignedLeaderReq req = {0};
×
325
  if (tDeserializeSVArbSetAssignedLeaderReq((char*)pRpcMsg->pCont + sizeof(SMsgHead), pRpcMsg->contLen, &req) != 0) {
×
326
    sError("vgId:%d, failed to deserialize SVArbSetAssignedLeaderReq", ths->vgId);
×
327
    code = TSDB_CODE_INVALID_MSG;
×
328
    goto _OVER;
×
329
  }
330

331
  if (ths->arbTerm > req.arbTerm) {
×
332
    sInfo("vgId:%d, skip to set assigned leader, msg with lower term, local:%" PRId64 "msg:%" PRId64, ths->vgId,
×
333
          ths->arbTerm, req.arbTerm);
334
    goto _OVER;
×
335
  }
336

337
  ths->arbTerm = TMAX(req.arbTerm, ths->arbTerm);
×
338

339
  if (!req.force) {
×
340
    if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) {
×
341
      sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken,
×
342
            req.memberToken);
343
      goto _OVER;
×
344
    }
345
  }
346

347
  if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
×
348
    code = TSDB_CODE_SUCCESS;
×
349
    raftStoreNextTerm(ths);
×
350
    if (terrno != TSDB_CODE_SUCCESS) {
×
351
      code = terrno;
×
352
      sError("vgId:%d, failed to set next term since:%s", ths->vgId, tstrerror(code));
×
353
      goto _OVER;
×
354
    }
355
    syncNodeBecomeAssignedLeader(ths);
×
356

357
    if ((code = syncNodeAppendNoop(ths)) < 0) {
×
358
      sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, tstrerror(code));
×
359
    }
360
  }
361

362
  SVArbSetAssignedLeaderRsp rsp = {0};
×
363
  rsp.arbToken = req.arbToken;
×
364
  rsp.memberToken = req.memberToken;
×
365
  rsp.vgId = ths->vgId;
×
366

367
  contLen = tSerializeSVArbSetAssignedLeaderRsp(NULL, 0, &rsp);
×
368
  if (contLen <= 0) {
×
369
    code = TSDB_CODE_OUT_OF_MEMORY;
×
370
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
×
371
    goto _OVER;
×
372
  }
373
  pHead = rpcMallocCont(contLen);
×
374
  if (!pHead) {
×
375
    code = terrno;
×
376
    sError("vgId:%d, failed to malloc memory for SVArbSetAssignedLeaderRsp", ths->vgId);
×
377
    goto _OVER;
×
378
  }
379
  if (tSerializeSVArbSetAssignedLeaderRsp(pHead, contLen, &rsp) <= 0) {
×
380
    code = TSDB_CODE_OUT_OF_MEMORY;
×
381
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
×
382
    rpcFreeCont(pHead);
×
383
    goto _OVER;
×
384
  }
385

386
  code = TSDB_CODE_SUCCESS;
×
387

388
_OVER:;
×
389
  SRpcMsg rspMsg = {
×
390
      .code = code,
391
      .pCont = pHead,
392
      .contLen = contLen,
393
      .info = pRpcMsg->info,
394
  };
395

396
  tmsgSendRsp(&rspMsg);
×
397

398
  tFreeSVArbSetAssignedLeaderReq(&req);
×
399
  TAOS_RETURN(code);
×
400
}
401

402
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
×
403
  int32_t    code = 0;
×
404
  SSyncNode* pNode = syncNodeAcquire(rid);
×
405
  if (pNode == NULL) {
×
406
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
407
    if (terrno != 0) code = terrno;
×
408
    TAOS_RETURN(code);
×
409
  }
410

411
  SRpcMsg rpcMsg = {0, .info.notFreeAhandle = 1};
×
412
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
×
413
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;
×
414

415
  syncNodeRelease(pNode);
×
416
  if (ret == 1) {
×
417
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
×
418
    code = rpcSendResponse(&rpcMsg);
×
419
    return code;
×
420
  } else {
421
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
×
422
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
423
  }
424
}
425

426
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
4,341✔
427
  SyncIndex minMatchIndex = SYNC_INDEX_INVALID;
4,341✔
428

429
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
12,972✔
430
    SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
8,631✔
431
    if (minMatchIndex == SYNC_INDEX_INVALID) {
8,631✔
432
      minMatchIndex = matchIndex;
4,393✔
433
    } else if (matchIndex > 0 && matchIndex < minMatchIndex) {
4,238✔
434
      minMatchIndex = matchIndex;
44✔
435
    }
436
  }
437
  return minMatchIndex;
4,341✔
438
}
439

440
static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) {
612✔
441
  return pSyncNode->pLogStore->syncLogIndexRetention(pSyncNode->pLogStore, bytes);
612✔
442
}
443

444
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
818✔
445
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
818✔
446
  int32_t    code = 0;
818✔
447
  if (pSyncNode == NULL) {
818!
448
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
449
    if (terrno != 0) code = terrno;
×
450
    sError("sync begin snapshot error");
×
451
    TAOS_RETURN(code);
×
452
  }
453

454
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
818✔
455
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
818✔
456
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
818✔
457

458
  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
818!
459
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
×
460
    syncNodeRelease(pSyncNode);
×
461
    return 0;
×
462
  }
463

464
  int64_t logRetention = 0;
818✔
465

466
  if (syncNodeIsMnode(pSyncNode)) {
818✔
467
    // mnode
468
    logRetention = tsMndLogRetention;
99✔
469
  } else {
470
    // vnode
471
    if (pSyncNode->replicaNum > 1) {
719✔
472
      logRetention = SYNC_VNODE_LOG_RETENTION;
601✔
473
    }
474
  }
475

476
  if (pSyncNode->totalReplicaNum > 1) {
818✔
477
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER &&
612✔
478
        pSyncNode->state != TAOS_SYNC_STATE_LEARNER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
5!
479
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
×
480
              lastApplyIndex);
481
      syncNodeRelease(pSyncNode);
×
482
      return 0;
×
483
    }
484
    SyncIndex retentionIndex =
612✔
485
        TMAX(pSyncNode->minMatchIndex, syncLogRetentionIndex(pSyncNode, SYNC_WAL_LOG_RETENTION_SIZE));
612✔
486
    logRetention += TMAX(0, lastApplyIndex - retentionIndex);
612✔
487
  }
488

489
_DEL_WAL:
206✔
490

491
  do {
492
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
818✔
493
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
818✔
494
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
818✔
495
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
818✔
496
    if (lastApplyIndex <= walCommitVer) {
818!
497
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);
818✔
498

499
      if (snapshottingIndex == SYNC_INDEX_INVALID) {
818!
500
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
818✔
501
        pSyncNode->snapshottingTime = taosGetTimestampMs();
818✔
502

503
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
818✔
504
        if (code == 0) {
818!
505
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
818✔
506
                  pSyncNode->snapshottingIndex, lastApplyIndex);
507
        } else {
508
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
×
509
                  terrstr(), pSyncNode->snapshottingIndex, lastApplyIndex);
510
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
×
511
        }
512

513
      } else {
514
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
×
515
                snapshottingIndex, lastApplyIndex);
516
      }
517
    }
518
  } while (0);
519

520
  syncNodeRelease(pSyncNode);
818✔
521
  TAOS_RETURN(code);
818✔
522
}
523

524
int32_t syncEndSnapshot(int64_t rid) {
817✔
525
  int32_t    code = 0;
817✔
526
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
817✔
527
  if (pSyncNode == NULL) {
818!
528
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
529
    if (terrno != 0) code = terrno;
×
530
    sError("sync end snapshot error");
×
531
    TAOS_RETURN(code);
×
532
  }
533

534
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
818!
535
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
817✔
536
    code = walEndSnapshot(pData->pWal);
817✔
537
    if (code != 0) {
818!
538
      sNError(pSyncNode, "wal snapshot end error since:%s", tstrerror(code));
×
539
      syncNodeRelease(pSyncNode);
×
540
      TAOS_RETURN(code);
×
541
    } else {
542
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
818✔
543
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
818✔
544
    }
545
  }
546

547
  syncNodeRelease(pSyncNode);
818✔
548
  TAOS_RETURN(code);
818✔
549
}
550

551
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
26,967✔
552
  if (pSyncNode == NULL) {
26,967!
553
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
554
    sError("sync ready for read error");
×
555
    return false;
×
556
  }
557

558
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
26,967!
559
    terrno = TSDB_CODE_SYN_NOT_LEADER;
4,245✔
560
    return false;
4,245✔
561
  }
562

563
  if (!pSyncNode->restoreFinish) {
22,722✔
564
    terrno = TSDB_CODE_SYN_RESTORING;
473✔
565
    return false;
473✔
566
  }
567

568
  return true;
22,249✔
569
}
570

571
bool syncIsReadyForRead(int64_t rid) {
15,820✔
572
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
15,820✔
573
  if (pSyncNode == NULL) {
15,821!
574
    sError("sync ready for read error");
×
575
    return false;
×
576
  }
577

578
  bool ready = syncNodeIsReadyForRead(pSyncNode);
15,821✔
579

580
  syncNodeRelease(pSyncNode);
15,820✔
581
  return ready;
15,820✔
582
}
583

584
#ifdef BUILD_NO_CALL
585
bool syncSnapshotSending(int64_t rid) {
586
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
587
  if (pSyncNode == NULL) {
588
    return false;
589
  }
590

591
  bool b = syncNodeSnapshotSending(pSyncNode);
592
  syncNodeRelease(pSyncNode);
593
  return b;
594
}
595

596
bool syncSnapshotRecving(int64_t rid) {
597
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
598
  if (pSyncNode == NULL) {
599
    return false;
600
  }
601

602
  bool b = syncNodeSnapshotRecving(pSyncNode);
603
  syncNodeRelease(pSyncNode);
604
  return b;
605
}
606
#endif
607

608
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
337✔
609
  if (pSyncNode->peersNum == 0) {
337✔
610
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
249✔
611
    return 0;
249✔
612
  }
613

614
  int32_t ret = 0;
88✔
615
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && pSyncNode->replicaNum > 1) {
88✔
616
    SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0];
29✔
617
    if (pSyncNode->peersNum == 2) {
29✔
618
      SyncIndex matchIndex0 = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
21✔
619
      SyncIndex matchIndex1 = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[1]));
21✔
620
      if (matchIndex1 > matchIndex0) {
21✔
621
        newLeader = (pSyncNode->peersNodeInfo)[1];
1✔
622
      }
623
    }
624
    ret = syncNodeLeaderTransferTo(pSyncNode, newLeader);
29✔
625
  }
626

627
  return ret;
88✔
628
}
629

630
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
29✔
631
  if (pSyncNode->replicaNum == 1) {
29!
632
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
×
633
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
634
  }
635

636
  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);
29!
637

638
  SRpcMsg rpcMsg = {0};
29✔
639
  TAOS_CHECK_RETURN(syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId));
29!
640

641
  SyncLeaderTransfer* pMsg = rpcMsg.pCont;
29✔
642
  pMsg->newLeaderId.addr = SYNC_ADDR(&newLeader);
29✔
643
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
29✔
644
  pMsg->newNodeInfo = newLeader;
29✔
645

646
  int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false, NULL);
29✔
647
  rpcFreeCont(rpcMsg.pCont);
29✔
648
  return ret;
29✔
649
}
650

651
SSyncState syncGetState(int64_t rid) {
41,928✔
652
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
41,928✔
653

654
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
41,928✔
655
  if (pSyncNode != NULL) {
41,937!
656
    state.state = pSyncNode->state;
41,937✔
657
    state.roleTimeMs = pSyncNode->roleTimeMs;
41,937✔
658
    state.startTimeMs = pSyncNode->startTime;
41,937✔
659
    state.restored = pSyncNode->restoreFinish;
41,937✔
660
    if (pSyncNode->vgId != 1) {
41,937✔
661
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
11,146✔
662
    } else {
663
      state.canRead = state.restored;
30,791✔
664
    }
665
    /*
666
    double progress = 0;
667
    if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){
668
      progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex;
669
      state.progress = (int32_t)(progress * 100);
670
    }
671
    else{
672
      state.progress = -1;
673
    }
674
    sDebug("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 ", "
675
            "progress:%lf, progress:%d",
676
          pSyncNode->vgId,
677
         pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
678
    */
679
    state.term = raftStoreGetTerm(pSyncNode);
41,937✔
680
    syncNodeRelease(pSyncNode);
41,937✔
681
  }
682

683
  return state;
41,933✔
684
}
685

686
void syncGetCommitIndex(int64_t rid, int64_t* syncCommitIndex) {
9,362✔
687
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
9,362✔
688
  if (pSyncNode != NULL) {
9,362!
689
    *syncCommitIndex = pSyncNode->commitIndex;
9,362✔
690
    syncNodeRelease(pSyncNode);
9,362✔
691
  }
692
}
9,362✔
693

694
int32_t syncGetArbToken(int64_t rid, char* outToken) {
1,377✔
695
  int32_t    code = 0;
1,377✔
696
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
1,377✔
697
  if (pSyncNode == NULL) {
1,377!
698
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
699
    if (terrno != 0) code = terrno;
×
700
    TAOS_RETURN(code);
×
701
  }
702

703
  memset(outToken, 0, TSDB_ARB_TOKEN_SIZE);
1,377✔
704
  (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
1,377✔
705
  tstrncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE);
1,377✔
706
  (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
1,377✔
707

708
  syncNodeRelease(pSyncNode);
1,377✔
709
  TAOS_RETURN(code);
1,377✔
710
}
711

712
int32_t syncCheckSynced(int64_t rid) {
×
713
  int32_t    code = 0;
×
714
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
715
  if (pSyncNode == NULL) {
×
716
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
717
    if (terrno != 0) code = terrno;
×
718
    TAOS_RETURN(code);
×
719
  }
720

721
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
×
722
    code = TSDB_CODE_VND_ARB_NOT_SYNCED;
×
723
    syncNodeRelease(pSyncNode);
×
724
    TAOS_RETURN(code);
×
725
  }
726

727
  bool isSync = pSyncNode->commitIndex >= pSyncNode->assignedCommitIndex;
×
728
  code = (isSync ? TSDB_CODE_SUCCESS : TSDB_CODE_VND_ARB_NOT_SYNCED);
×
729

730
  syncNodeRelease(pSyncNode);
×
731
  TAOS_RETURN(code);
×
732
}
733

734
int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm) {
×
735
  int32_t    code = 0;
×
736
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
737
  if (pSyncNode == NULL) {
×
738
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
739
    if (terrno != 0) code = terrno;
×
740
    TAOS_RETURN(code);
×
741
  }
742

743
  pSyncNode->arbTerm = TMAX(arbTerm, pSyncNode->arbTerm);
×
744
  syncNodeRelease(pSyncNode);
×
745
  TAOS_RETURN(code);
×
746
}
747

748
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
101,147✔
749
  if (pSyncNode->raftCfg.configIndexCount < 1) {
101,147!
750
    sError("vgId:%d, failed get snapshot config index, configIndexCount:%d", pSyncNode->vgId,
×
751
           pSyncNode->raftCfg.configIndexCount);
752
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
753
    return -2;
×
754
  }
755
  SyncIndex lastIndex = (pSyncNode->raftCfg.configIndexArr)[0];
101,147✔
756

757
  for (int32_t i = 0; i < pSyncNode->raftCfg.configIndexCount; ++i) {
202,988✔
758
    if ((pSyncNode->raftCfg.configIndexArr)[i] > lastIndex &&
101,841✔
759
        (pSyncNode->raftCfg.configIndexArr)[i] <= snapshotLastApplyIndex) {
694!
760
      lastIndex = (pSyncNode->raftCfg.configIndexArr)[i];
694✔
761
    }
762
  }
763
  sTrace("vgId:%d, index:%" PRId64 ", get last snapshot config index:%" PRId64, pSyncNode->vgId, snapshotLastApplyIndex,
101,147✔
764
         lastIndex);
765

766
  return lastIndex;
101,146✔
767
}
768

769
static SRaftId syncGetRaftIdByEp(SSyncNode* pSyncNode, const SEp* pEp) {
1,937✔
770
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
3,131✔
771
    if (strcmp(pEp->fqdn, (pSyncNode->peersNodeInfo)[i].nodeFqdn) == 0 &&
2,190!
772
        pEp->port == (pSyncNode->peersNodeInfo)[i].nodePort) {
2,190✔
773
      return pSyncNode->peersId[i];
996✔
774
    }
775
  }
776
  return EMPTY_RAFT_ID;
941✔
777
}
778

779
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
945✔
780
  pEpSet->numOfEps = 0;
945✔
781

782
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
945✔
783
  if (pSyncNode == NULL) return;
945!
784

785
  int index = -1;
945✔
786

787
  int j = 0;
945✔
788
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
2,886✔
789
    if (pSyncNode->raftCfg.cfg.nodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER) continue;
1,941✔
790
    SEp* pEp = &pEpSet->eps[j];
1,937✔
791
    tstrncpy(pEp->fqdn, pSyncNode->raftCfg.cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
1,937✔
792
    pEp->port = (pSyncNode->raftCfg.cfg.nodeInfo)[i].nodePort;
1,937✔
793
    pEpSet->numOfEps++;
1,937✔
794
    sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port);
1,937✔
795
    SRaftId id = syncGetRaftIdByEp(pSyncNode, pEp);
1,937✔
796
    if (id.addr == pSyncNode->leaderCache.addr && id.vgId == pSyncNode->leaderCache.vgId && id.addr != 0 &&
1,937!
797
        id.vgId != 0)
151!
798
      index = j;
151✔
799
    j++;
1,937✔
800
  }
801
  if (pEpSet->numOfEps > 0) {
945!
802
    if (index != -1) {
945✔
803
      pEpSet->inUse = index;
151✔
804
    } else {
805
      if (pSyncNode->myRaftId.addr == pSyncNode->leaderCache.addr &&
794✔
806
          pSyncNode->myRaftId.vgId == pSyncNode->leaderCache.vgId) {
157!
807
        pEpSet->inUse = pSyncNode->raftCfg.cfg.myIndex;
157✔
808
      } else {
809
        pEpSet->inUse = (pSyncNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps;
637✔
810
      }
811
    }
812
    // pEpSet->inUse = 0;
813
  }
814
  epsetSort(pEpSet);
945✔
815

816
  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
945!
817
  syncNodeRelease(pSyncNode);
945✔
818
}
819

820
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
70,894✔
821
  int32_t    code = 0;
70,894✔
822
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
70,894✔
823
  if (pSyncNode == NULL) {
70,901✔
824
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
1✔
825
    if (terrno != 0) code = terrno;
1!
826
    sError("sync propose error");
1!
827
    TAOS_RETURN(code);
1✔
828
  }
829

830
  int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak, seq);
70,900✔
831
  syncNodeRelease(pSyncNode);
70,899✔
832
  return ret;
70,900✔
833
}
834

835
int32_t syncCheckMember(int64_t rid) {
×
836
  int32_t    code = 0;
×
837
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
838
  if (pSyncNode == NULL) {
×
839
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
840
    if (terrno != 0) code = terrno;
×
841
    sError("sync propose error");
×
842
    TAOS_RETURN(code);
×
843
  }
844

845
  if (pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
×
846
    syncNodeRelease(pSyncNode);
×
847
    return TSDB_CODE_SYN_WRONG_ROLE;
×
848
  }
849

850
  syncNodeRelease(pSyncNode);
×
851
  return 0;
×
852
}
853

854
int32_t syncIsCatchUp(int64_t rid) {
108✔
855
  int32_t    code = 0;
108✔
856
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
108✔
857
  if (pSyncNode == NULL) {
108!
858
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
859
    if (terrno != 0) code = terrno;
×
860
    sError("sync Node Acquire error since %d", ERRNO);
×
861
    TAOS_RETURN(code);
×
862
  }
863

864
  int32_t isCatchUp = 0;
108✔
865
  if (pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
108!
866
      pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
24!
867
      pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP) {
24✔
868
    sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
100!
869
          pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
870
          pSyncNode->pLogBuf->matchIndex);
871
    isCatchUp = 0;
100✔
872
  } else {
873
    sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, pSyncNode->vgId,
8!
874
          pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->matchIndex);
875
    isCatchUp = 1;
8✔
876
  }
877

878
  syncNodeRelease(pSyncNode);
108✔
879
  return isCatchUp;
108✔
880
}
881

882
ESyncRole syncGetRole(int64_t rid) {
108✔
883
  int32_t    code = 0;
108✔
884
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
108✔
885
  if (pSyncNode == NULL) {
108!
886
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
887
    if (terrno != 0) code = terrno;
×
888
    sError("sync Node Acquire error since %d", ERRNO);
×
889
    TAOS_RETURN(code);
×
890
  }
891

892
  ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole;
108✔
893

894
  syncNodeRelease(pSyncNode);
108✔
895
  return role;
108✔
896
}
897

898
int64_t syncGetTerm(int64_t rid) {
546✔
899
  int32_t    code = 0;
546✔
900
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
546✔
901
  if (pSyncNode == NULL) {
546!
902
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
903
    if (terrno != 0) code = terrno;
×
904
    sError("sync Node Acquire error since %d", ERRNO);
×
905
    TAOS_RETURN(code);
×
906
  }
907

908
  int64_t term = raftStoreGetTerm(pSyncNode);
546✔
909

910
  syncNodeRelease(pSyncNode);
546✔
911
  return term;
546✔
912
}
913

914
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
70,919✔
915
  int32_t code = 0;
70,919✔
916
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
70,919!
917
    code = TSDB_CODE_SYN_NOT_LEADER;
7✔
918
    sNWarn(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
7!
919
    TAOS_RETURN(code);
7✔
920
  }
921

922
  if (!pSyncNode->restoreFinish) {
70,912!
923
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
×
924
    sNWarn(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
×
925
           TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
926
    TAOS_RETURN(code);
×
927
  }
928

929
  // heartbeat timeout
930
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER && syncNodeHeartbeatReplyTimeout(pSyncNode)) {
70,912!
931
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
×
932
    sNError(pSyncNode, "failed to sync propose since heartbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
×
933
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
934
    TAOS_RETURN(code);
×
935
  }
936

937
  // optimized one replica
938
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
70,910✔
939
    SyncIndex retIndex;
940
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
36,694✔
941
    if (code >= 0) {
36,698!
942
      pMsg->info.conn.applyIndex = retIndex;
36,698✔
943
      pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);
36,698✔
944

945
      // after raft member change, need to handle 1->2 switching point
946
      // at this point, need to switch entry handling thread
947
      if (pSyncNode->replicaNum == 1) {
36,698!
948
        sGDebug(&pMsg->info.traceId, "vgId:%d, index:%" PRId64 ", propose optimized msg type:%s", pSyncNode->vgId,
36,699!
949
                retIndex, TMSG_INFO(pMsg->msgType));
950
        return 1;
36,699✔
951
      } else {
952
        sGDebug(&pMsg->info.traceId,
×
953
                "vgId:%d, index:%" PRId64 ", propose optimized msg, return to normal, type:%s, handle:%p",
954
                pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle);
955
        return 0;
×
956
      }
957
    } else {
958
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
959
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
×
960
             TMSG_INFO(pMsg->msgType));
961
      TAOS_RETURN(code);
×
962
    }
963
  } else {
964
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
34,222✔
965
    uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
34,222✔
966
    SRpcMsg   rpcMsg = {.info.traceId = pMsg->info.traceId};
34,223✔
967
    int32_t   code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
34,223✔
968
    if (code != 0) {
34,223!
969
      sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
×
970
      code = syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
×
971
      TAOS_RETURN(code);
×
972
    }
973

974
    sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, propose msg, type:%s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType));
34,223!
975
    code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
34,223✔
976
    if (code != 0) {
34,223✔
977
      sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
33!
978
      TAOS_CHECK_RETURN(syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum));
33!
979
    }
980

981
    if (seq != NULL) *seq = seqNum;
34,223✔
982
    TAOS_RETURN(code);
34,223✔
983
  }
984
}
985

986
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
5,865✔
987
  pSyncTimer->pTimer = NULL;
5,865✔
988
  pSyncTimer->counter = 0;
5,865✔
989
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
5,865✔
990
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
5,865✔
991
  pSyncTimer->destId = destId;
5,865✔
992
  pSyncTimer->timeStamp = taosGetTimestampMs();
5,865✔
993
  atomic_store_64(&pSyncTimer->logicClock, 0);
5,865✔
994
  return 0;
5,865✔
995
}
996

997
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
62✔
998
  int32_t code = 0;
62✔
999
  int64_t tsNow = taosGetTimestampMs();
62✔
1000
  if (syncIsInit()) {
62!
1001
    SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
62✔
1002
    if (pData == NULL) {
62!
1003
      pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
62!
1004
      pData->rid = syncHbTimerDataAdd(pData);
62✔
1005
    }
1006
    pSyncTimer->hbDataRid = pData->rid;
62✔
1007
    pSyncTimer->timeStamp = tsNow;
62✔
1008

1009
    pData->syncNodeRid = pSyncNode->rid;
62✔
1010
    pData->pTimer = pSyncTimer;
62✔
1011
    pData->destId = pSyncTimer->destId;
62✔
1012
    pData->logicClock = pSyncTimer->logicClock;
62✔
1013
    pData->execTime = tsNow + pSyncTimer->timerMS;
62✔
1014

1015
    sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:0x%" PRIx64 " at %d", pSyncNode->vgId, pData->rid,
62!
1016
           pData->destId.addr, pSyncTimer->timerMS);
1017

1018
    bool stopped = taosTmrResetPriority(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid),
62✔
1019
                                        syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
62✔
1020
    if (stopped) {
62!
1021
      sError("vgId:%d, failed to reset hb timer success", pSyncNode->vgId);
×
1022
      return TSDB_CODE_SYN_INTERNAL_ERROR;
×
1023
    }
1024
  } else {
1025
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1026
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
×
1027
  }
1028
  return code;
62✔
1029
}
1030

1031
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
535✔
1032
  int32_t ret = 0;
535✔
1033
  (void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1);
535✔
1034
  bool stop = taosTmrStop(pSyncTimer->pTimer);
535✔
1035
  sDebug("vgId:%d, stop hb timer stop:%d", pSyncNode->vgId, stop);
535!
1036
  pSyncTimer->pTimer = NULL;
535✔
1037
  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
535✔
1038
  pSyncTimer->hbDataRid = -1;
535✔
1039
  return ret;
535✔
1040
}
1041

1042
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
340✔
1043
  int32_t code = 0;
340✔
1044
  if (pNode->pLogStore == NULL) {
340!
1045
    sError("vgId:%d, log store not created", pNode->vgId);
×
1046
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
1047
  }
1048
  if (pNode->pFsm == NULL) {
340!
1049
    sError("vgId:%d, pFsm not registered", pNode->vgId);
×
1050
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
1051
  }
1052
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
340!
1053
    sError("vgId:%d, FpGetSnapshotInfo not registered", pNode->vgId);
×
1054
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
1055
  }
1056
  SSnapshot snapshot = {0};
340✔
1057
  // TODO check return value
1058
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
340✔
1059

1060
  SyncIndex commitIndex = snapshot.lastApplyIndex;
340✔
1061
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
340✔
1062
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
340✔
1063
  if ((lastVer < commitIndex || firstVer > commitIndex + 1) || pNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
340!
1064
    if ((code = pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, commitIndex)) != 0) {
1!
1065
      sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
×
1066
             pNode->vgId, terrstr(), lastVer, commitIndex);
1067
      TAOS_RETURN(code);
×
1068
    }
1069
  }
1070
  TAOS_RETURN(code);
340✔
1071
}
1072

1073
// open/close --------------
1074
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
340✔
1075
  int32_t    code = 0;
340✔
1076
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
340!
1077
  if (pSyncNode == NULL) {
340!
1078
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1079
    goto _error;
×
1080
  }
1081

1082
  if (!taosDirExist((char*)(pSyncInfo->path))) {
340✔
1083
    if (taosMkDir(pSyncInfo->path) != 0) {
260!
1084
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1085
      sError("vgId:%d, failed to create dir:%s since %s", pSyncInfo->vgId, pSyncInfo->path, terrstr());
×
1086
      goto _error;
×
1087
    }
1088
  }
1089

1090
  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
340✔
1091
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
340✔
1092
           TD_DIRSEP);
1093
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
340✔
1094

1095
  if (!taosCheckExistFile(pSyncNode->configPath)) {
340✔
1096
    // create a new raft config file
1097
    sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId);
260✔
1098
    pSyncNode->vgId = pSyncInfo->vgId;
260✔
1099
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
260✔
1100
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
260✔
1101
    pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;
260✔
1102
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
260✔
1103
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
260✔
1104
    pSyncNode->raftCfg.configIndexCount = 1;
260✔
1105
    pSyncNode->raftCfg.configIndexArr[0] = -1;
260✔
1106

1107
    if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
260!
1108
      terrno = code;
×
1109
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
×
1110
      goto _error;
×
1111
    }
1112
  } else {
1113
    // update syncCfg by raft_config.json
1114
    if ((code = syncReadCfgFile(pSyncNode)) != 0) {
80!
1115
      terrno = code;
×
1116
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
×
1117
      goto _error;
×
1118
    }
1119

1120
    if (vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion) {
80✔
1121
      if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
47!
1122
        sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
36!
1123
        pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
36✔
1124
        if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
36!
1125
          terrno = code;
×
1126
          sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
×
1127
          goto _error;
×
1128
        }
1129
      } else {
1130
        sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
11!
1131
        pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
11✔
1132
      }
1133
    } else {
1134
      sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d", pSyncNode->vgId, vnodeVersion,
33!
1135
            pSyncInfo->syncCfg.changeVersion);
1136
    }
1137
  }
1138

1139
  // init by SSyncInfo
1140
  pSyncNode->vgId = pSyncInfo->vgId;
340✔
1141
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
340✔
1142
  bool      updated = false;
340✔
1143
  sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
340✔
1144
        pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
1145
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
825✔
1146
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
485✔
1147
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
485!
1148
      updated = true;
×
1149
    }
1150
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
485✔
1151
          pNode->nodeId, pNode->clusterId);
1152
  }
1153

1154
  if (vnodeVersion > pSyncInfo->syncCfg.changeVersion) {
340✔
1155
    if (updated) {
63!
1156
      sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
×
1157
      if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
×
1158
        terrno = code;
×
1159
        sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
×
1160
        goto _error;
×
1161
      }
1162
    }
1163
  }
1164

1165
  pSyncNode->pWal = pSyncInfo->pWal;
340✔
1166
  pSyncNode->msgcb = pSyncInfo->msgcb;
340✔
1167
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
340✔
1168
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
340✔
1169
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;
340✔
1170

1171
  // create raft log ring buffer
1172
  code = syncLogBufferCreate(&pSyncNode->pLogBuf);
340✔
1173
  if (pSyncNode->pLogBuf == NULL) {
340!
1174
    sError("failed to init sync log buffer since %s. vgId:%d", tstrerror(code), pSyncNode->vgId);
×
1175
    goto _error;
×
1176
  }
1177

1178
  // init replicaNum, replicasId
1179
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
340✔
1180
  pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
340✔
1181
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
825✔
1182
    if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
485!
1183
        false) {
1184
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
×
1185
      goto _error;
×
1186
    }
1187
  }
1188

1189
  // init internal
1190
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
340✔
1191
  pSyncNode->myRaftId = pSyncNode->replicasId[pSyncNode->raftCfg.cfg.myIndex];
340✔
1192

1193
  // init peersNum, peers, peersId
1194
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
340✔
1195
  int32_t j = 0;
340✔
1196
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
825✔
1197
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
485✔
1198
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
146✔
1199
      pSyncNode->peersId[j] = pSyncNode->replicasId[i];
146✔
1200
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
146✔
1201
      j++;
146✔
1202
    }
1203
  }
1204

1205
  pSyncNode->arbTerm = -1;
340✔
1206
  (void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
340✔
1207
  syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
340✔
1208
  sInfo("vgId:%d, generate arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);
340✔
1209

1210
  // init raft algorithm
1211
  pSyncNode->pFsm = pSyncInfo->pFsm;
340✔
1212
  pSyncInfo->pFsm = NULL;
340✔
1213
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
340✔
1214
  pSyncNode->leaderCache = EMPTY_RAFT_ID;
340✔
1215

1216
  // init life cycle outside
1217

1218
  // TLA+ Spec
1219
  // InitHistoryVars == /\ elections = {}
1220
  //                    /\ allLogs   = {}
1221
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
1222
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
1223
  //                   /\ state       = [i \in Server |-> Follower]
1224
  //                   /\ votedFor    = [i \in Server |-> Nil]
1225
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
1226
  //                      /\ votesGranted   = [i \in Server |-> {}]
1227
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
1228
  // \* leader does not send itself messages. It's still easier to include these
1229
  // \* in the functions.
1230
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
1231
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
1232
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
1233
  //                /\ commitIndex  = [i \in Server |-> 0]
1234
  // Init == /\ messages = [m \in {} |-> 0]
1235
  //         /\ InitHistoryVars
1236
  //         /\ InitServerVars
1237
  //         /\ InitCandidateVars
1238
  //         /\ InitLeaderVars
1239
  //         /\ InitLogVars
1240
  //
1241

1242
  // init TLA+ server vars
1243
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
340✔
1244
  pSyncNode->roleTimeMs = taosGetTimestampMs();
340✔
1245
  if ((code = raftStoreOpen(pSyncNode)) != 0) {
340!
1246
    terrno = code;
×
1247
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
×
1248
    goto _error;
×
1249
  }
1250

1251
  // init TLA+ candidate vars
1252
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
340✔
1253
  if (pSyncNode->pVotesGranted == NULL) {
340!
1254
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
×
1255
    goto _error;
×
1256
  }
1257
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
340✔
1258
  if (pSyncNode->pVotesRespond == NULL) {
340!
1259
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
×
1260
    goto _error;
×
1261
  }
1262

1263
  // init TLA+ leader vars
1264
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
340✔
1265
  if (pSyncNode->pNextIndex == NULL) {
340!
1266
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
×
1267
    goto _error;
×
1268
  }
1269
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
340✔
1270
  if (pSyncNode->pMatchIndex == NULL) {
340!
1271
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
×
1272
    goto _error;
×
1273
  }
1274

1275
  // init TLA+ log vars
1276
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
340✔
1277
  if (pSyncNode->pLogStore == NULL) {
339!
1278
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
×
1279
    goto _error;
×
1280
  }
1281

1282
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
339✔
1283
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
339!
1284
    SSnapshot snapshot = {0};
339✔
1285
    // TODO check return value
1286
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
339✔
1287
    if (snapshot.lastApplyIndex > commitIndex) {
340✔
1288
      commitIndex = snapshot.lastApplyIndex;
49✔
1289
      sNTrace(pSyncNode, "reset commit index by snapshot");
49!
1290
    }
1291
    pSyncNode->fsmState = snapshot.state;
340✔
1292
    if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
340!
1293
      sError("vgId:%d, fsm state is incomplete.", pSyncNode->vgId);
×
1294
      if (pSyncNode->replicaNum == 1) {
×
1295
        terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1296
        goto _error;
×
1297
      }
1298
    }
1299
  }
1300
  pSyncNode->commitIndex = commitIndex;
340✔
1301
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);
340✔
1302

1303
  // restore log store on need
1304
  if ((code = syncNodeLogStoreRestoreOnNeed(pSyncNode)) < 0) {
340!
1305
    terrno = code;
×
1306
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
×
1307
    goto _error;
×
1308
  }
1309

1310
  // timer ms init
1311
  pSyncNode->pingBaseLine = PING_TIMER_MS;
340✔
1312
  pSyncNode->electBaseLine = tsElectInterval;
340✔
1313
  pSyncNode->hbBaseLine = tsHeartbeatInterval;
340✔
1314

1315
  // init ping timer
1316
  pSyncNode->pPingTimer = NULL;
340✔
1317
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
340✔
1318
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
340✔
1319
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
340✔
1320
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
340✔
1321
  pSyncNode->pingTimerCounter = 0;
340✔
1322

1323
  // init elect timer
1324
  pSyncNode->pElectTimer = NULL;
340✔
1325
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
340✔
1326
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
340✔
1327
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
340✔
1328
  pSyncNode->electTimerCounter = 0;
340✔
1329

1330
  // init heartbeat timer
1331
  pSyncNode->pHeartbeatTimer = NULL;
340✔
1332
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
340✔
1333
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
340✔
1334
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
340✔
1335
#ifdef BUILD_NO_CALL
1336
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
1337
#endif
1338
  pSyncNode->heartbeatTimerCounter = 0;
340✔
1339

1340
  // init peer heartbeat timer
1341
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
5,440✔
1342
    if ((code = syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i])) != 0) {
5,100!
1343
      terrno = code;
×
1344
      goto _error;
×
1345
    }
1346
  }
1347

1348
  // tools
1349
  if ((code = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr)) != 0) {
340!
1350
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
×
1351
    goto _error;
×
1352
  }
1353
  if (pSyncNode->pSyncRespMgr == NULL) {
340!
1354
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
×
1355
    goto _error;
×
1356
  }
1357

1358
  // restore state
1359
  pSyncNode->restoreFinish = false;
340✔
1360

1361
  // snapshot senders
1362
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
5,440✔
1363
    SSyncSnapshotSender* pSender = NULL;
5,099✔
1364
    code = snapshotSenderCreate(pSyncNode, i, &pSender);
5,099✔
1365
    if (pSender == NULL) return NULL;
5,099!
1366

1367
    pSyncNode->senders[i] = pSender;
5,099✔
1368
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
5,099✔
1369
  }
1370

1371
  // snapshot receivers
1372
  code = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID, &pSyncNode->pNewNodeReceiver);
341✔
1373
  if (pSyncNode->pNewNodeReceiver == NULL) return NULL;
340!
1374
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
340✔
1375
          pSyncNode->pNewNodeReceiver);
1376

1377
  // is config changing
1378
  pSyncNode->changing = false;
340✔
1379

1380
  // replication mgr
1381
  if ((code = syncNodeLogReplInit(pSyncNode)) < 0) {
340!
1382
    terrno = code;
×
1383
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
×
1384
    goto _error;
×
1385
  }
1386

1387
  // peer state
1388
  if ((code = syncNodePeerStateInit(pSyncNode)) < 0) {
340!
1389
    terrno = code;
×
1390
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
×
1391
    goto _error;
×
1392
  }
1393

1394
  //
1395
  // min match index
1396
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
340✔
1397

1398
  // start in syncNodeStart
1399
  // start raft
1400

1401
  int64_t timeNow = taosGetTimestampMs();
340✔
1402
  pSyncNode->startTime = timeNow;
340✔
1403
  pSyncNode->lastReplicateTime = timeNow;
340✔
1404

1405
  // snapshotting
1406
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
340✔
1407

1408
  // init log buffer
1409
  if ((code = syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode)) < 0) {
340!
1410
    terrno = code;
×
1411
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
×
1412
    goto _error;
×
1413
  }
1414

1415
  pSyncNode->isStart = true;
340✔
1416
  pSyncNode->electNum = 0;
340✔
1417
  pSyncNode->becomeLeaderNum = 0;
340✔
1418
  pSyncNode->becomeAssignedLeaderNum = 0;
340✔
1419
  pSyncNode->configChangeNum = 0;
340✔
1420
  pSyncNode->hbSlowNum = 0;
340✔
1421
  pSyncNode->hbrSlowNum = 0;
340✔
1422
  pSyncNode->tmrRoutineNum = 0;
340✔
1423

1424
  sNInfo(pSyncNode, "sync node opened, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
340✔
1425
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
1426
  return pSyncNode;
340✔
1427

1428
_error:
×
1429
  if (pSyncInfo->pFsm) {
×
1430
    taosMemoryFree(pSyncInfo->pFsm);
×
1431
    pSyncInfo->pFsm = NULL;
×
1432
  }
1433
  syncNodeClose(pSyncNode);
×
1434
  pSyncNode = NULL;
×
1435
  return NULL;
×
1436
}
1437

1438
#ifdef BUILD_NO_CALL
1439
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
1440
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
1441
    SSnapshot snapshot = {0};
1442
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
1443
    if (snapshot.lastApplyIndex > pSyncNode->commitIndex) {
1444
      pSyncNode->commitIndex = snapshot.lastApplyIndex;
1445
    }
1446
  }
1447
}
1448
#endif
1449

1450
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
337✔
1451
  int32_t code = 0;
337✔
1452
  if (pSyncNode->pLogStore == NULL) {
337!
1453
    sError("vgId:%d, log store not created", pSyncNode->vgId);
×
1454
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
1455
  }
1456
  if (pSyncNode->pLogBuf == NULL) {
337!
1457
    sError("vgId:%d, ring log buffer not created", pSyncNode->vgId);
×
1458
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
1459
  }
1460

1461
  (void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex);
337✔
1462
  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
336✔
1463
  SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
336✔
1464
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;
337✔
1465
  (void)taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex);
337✔
1466

1467
  if (lastVer != -1 && endIndex != lastVer + 1) {
337!
1468
    code = TSDB_CODE_WAL_LOG_INCOMPLETE;
×
1469
    sWarn("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64,
×
1470
          pSyncNode->vgId, terrstr(), endIndex - 1, lastVer);
1471
    // TAOS_RETURN(code);
1472
  }
1473

1474
  // if (endIndex != lastVer + 1) return TSDB_CODE_SYN_INTERNAL_ERROR;
1475
  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
337✔
1476
  sInfo("vgId:%d, restore began, and keep syncing until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);
337✔
1477

1478
  if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
674!
1479
      (code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex, NULL, "restore")) < 0) {
337✔
1480
    TAOS_RETURN(code);
×
1481
  }
1482

1483
  TAOS_RETURN(code);
337✔
1484
}
1485

1486
int32_t syncNodeStart(SSyncNode* pSyncNode) {
337✔
1487
  // start raft
1488
  sInfo("vgId:%d, begin to start sync node", pSyncNode->vgId);
337✔
1489
  if (pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
337✔
1490
    syncNodeBecomeLearner(pSyncNode, "first start");
8✔
1491
  } else {
1492
    if (pSyncNode->replicaNum == 1) {
329✔
1493
      raftStoreNextTerm(pSyncNode);
255✔
1494
      syncNodeBecomeLeader(pSyncNode, "one replica start");
255✔
1495

1496
      // Raft 3.6.2 Committing entries from previous terms
1497
      TAOS_CHECK_RETURN(syncNodeAppendNoop(pSyncNode));
255!
1498
    } else {
1499
      SRaftId id = {0};
74✔
1500
      syncNodeBecomeFollower(pSyncNode, id, "first start");
74✔
1501
    }
1502
  }
1503

1504
  int32_t ret = 0;
337✔
1505
  ret = syncNodeStartPingTimer(pSyncNode);
337✔
1506
  if (ret != 0) {
337!
1507
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(ret));
×
1508
  }
1509
  sInfo("vgId:%d, sync node started", pSyncNode->vgId);
337✔
1510
  return ret;
337✔
1511
}
1512

1513
#ifdef BUILD_NO_CALL
1514
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
1515
  // state change
1516
  int32_t code = 0;
1517
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
1518
  pSyncNode->roleTimeMs = taosGetTimestampMs();
1519
  // TODO check return value
1520
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
1521

1522
  // reset elect timer, long enough
1523
  int32_t electMS = TIMER_MAX_MS;
1524
  code = syncNodeRestartElectTimer(pSyncNode, electMS);
1525
  if (code < 0) {
1526
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
1527
    return -1;
1528
  }
1529

1530
  code = syncNodeStartPingTimer(pSyncNode);
1531
  if (code < 0) {
1532
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
1533
    return -1;
1534
  }
1535
  return code;
1536
}
1537
#endif
1538

1539
void syncNodePreClose(SSyncNode* pSyncNode) {
337✔
1540
  int32_t code = 0;
337✔
1541
  if (pSyncNode == NULL) {
337!
1542
    sError("failed to pre close sync node since sync node is null");
×
1543
    return;
×
1544
  }
1545
  if (pSyncNode->pFsm == NULL) {
337!
1546
    sError("failed to pre close sync node since fsm is null");
×
1547
    return;
×
1548
  }
1549
  if (pSyncNode->pFsm->FpApplyQueueItems == NULL) {
337!
1550
    sError("failed to pre close sync node since FpApplyQueueItems is null");
×
1551
    return;
×
1552
  }
1553

1554
  // stop elect timer
1555
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
337!
1556
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
×
1557
    return;
×
1558
  }
1559

1560
  // stop heartbeat timer
1561
  if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
337!
1562
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
×
1563
    return;
×
1564
  }
1565

1566
  // stop ping timer
1567
  if ((code = syncNodeStopPingTimer(pSyncNode)) != 0) {
337!
1568
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
×
1569
    return;
×
1570
  }
1571

1572
  // clean rsp
1573
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
337✔
1574
}
1575

1576
void syncNodePostClose(SSyncNode* pSyncNode) {
277✔
1577
  if (pSyncNode->pNewNodeReceiver != NULL) {
277!
1578
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
277!
1579
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
1580
    }
1581

1582
    sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
277✔
1583
           pSyncNode->pNewNodeReceiver);
1584
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
277✔
1585
    pSyncNode->pNewNodeReceiver = NULL;
277✔
1586
  }
1587
}
277✔
1588

1589
void syncHbTimerDataFree(SSyncHbTimerData* pData) { taosMemoryFree(pData); }
62!
1590

1591
void syncNodeClose(SSyncNode* pSyncNode) {
337✔
1592
  int32_t code = 0;
337✔
1593
  if (pSyncNode == NULL) return;
337!
1594
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);
337✔
1595

1596
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
337✔
1597

1598
  if ((code = syncNodeStopPingTimer(pSyncNode)) != 0) {
337!
1599
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
×
1600
    return;
×
1601
  }
1602
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
337!
1603
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
×
1604
    return;
×
1605
  }
1606
  if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
337!
1607
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
×
1608
    return;
×
1609
  }
1610
  syncNodeLogReplDestroy(pSyncNode);
337✔
1611

1612
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
337✔
1613
  pSyncNode->pSyncRespMgr = NULL;
337✔
1614
  voteGrantedDestroy(pSyncNode->pVotesGranted);
337✔
1615
  pSyncNode->pVotesGranted = NULL;
337✔
1616
  votesRespondDestory(pSyncNode->pVotesRespond);
337✔
1617
  pSyncNode->pVotesRespond = NULL;
337✔
1618
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
337✔
1619
  pSyncNode->pNextIndex = NULL;
337✔
1620
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
337✔
1621
  pSyncNode->pMatchIndex = NULL;
337✔
1622
  logStoreDestory(pSyncNode->pLogStore);
337✔
1623
  pSyncNode->pLogStore = NULL;
337✔
1624
  syncLogBufferDestroy(pSyncNode->pLogBuf);
337✔
1625
  pSyncNode->pLogBuf = NULL;
337✔
1626

1627
  (void)taosThreadMutexDestroy(&pSyncNode->arbTokenMutex);
337✔
1628

1629
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
5,392✔
1630
    if (pSyncNode->senders[i] != NULL) {
5,055!
1631
      sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[i]);
5,055✔
1632

1633
      if (snapshotSenderIsStart(pSyncNode->senders[i])) {
5,055!
1634
        snapshotSenderStop(pSyncNode->senders[i], false);
×
1635
      }
1636

1637
      snapshotSenderDestroy(pSyncNode->senders[i]);
5,055✔
1638
      pSyncNode->senders[i] = NULL;
5,055✔
1639
    }
1640
  }
1641

1642
  if (pSyncNode->pNewNodeReceiver != NULL) {
337✔
1643
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
60!
1644
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
1645
    }
1646

1647
    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
60✔
1648
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
60✔
1649
    pSyncNode->pNewNodeReceiver = NULL;
60✔
1650
  }
1651

1652
  if (pSyncNode->pFsm != NULL) {
337!
1653
    taosMemoryFree(pSyncNode->pFsm);
337!
1654
  }
1655

1656
  raftStoreClose(pSyncNode);
337✔
1657

1658
  taosMemoryFree(pSyncNode);
337!
1659
}
1660

1661
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->raftCfg.snapshotStrategy; }
×
1662

1663
// timer control --------------
1664
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
337✔
1665
  int32_t code = 0;
337✔
1666
  if (syncIsInit()) {
337!
1667
    bool stopped = taosTmrResetPriority(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
337✔
1668
                                        syncEnv()->pTimerManager, &pSyncNode->pPingTimer, 2);
337✔
1669
    if (stopped) {
337!
1670
      sError("vgId:%d, failed to reset ping timer, ms:%d", pSyncNode->vgId, pSyncNode->pingTimerMS);
×
1671
      return TSDB_CODE_SYN_INTERNAL_ERROR;
×
1672
    }
1673
    atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
337✔
1674
  } else {
1675
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
×
1676
  }
1677
  return code;
337✔
1678
}
1679

1680
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
674✔
1681
  int32_t code = 0;
674✔
1682
  (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);
674✔
1683
  bool stop = taosTmrStop(pSyncNode->pPingTimer);
674✔
1684
  sDebug("vgId:%d, stop ping timer, stop:%d", pSyncNode->vgId, stop);
674✔
1685
  pSyncNode->pPingTimer = NULL;
674✔
1686
  return code;
674✔
1687
}
1688

1689
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
68,441✔
1690
  int32_t code = 0;
68,441✔
1691
  if (syncIsInit()) {
68,441!
1692
    pSyncNode->electTimerMS = ms;
68,441✔
1693

1694
    int64_t execTime = taosGetTimestampMs() + ms;
68,441✔
1695
    atomic_store_64(&(pSyncNode->electTimerParam.executeTime), execTime);
68,441✔
1696
    atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
68,441✔
1697
    pSyncNode->electTimerParam.pSyncNode = pSyncNode;
68,441✔
1698
    pSyncNode->electTimerParam.pData = NULL;
68,441✔
1699

1700
    bool stopped = taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid),
68,440✔
1701
                                syncEnv()->pTimerManager, &pSyncNode->pElectTimer);
68,441✔
1702
    if (stopped) sError("vgId:%d, failed to reset elect timer, ms:%d", pSyncNode->vgId, ms);
68,441!
1703
  } else {
1704
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
×
1705
  }
1706
  return code;
68,441✔
1707
}
1708

1709
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
69,397✔
1710
  int32_t code = 0;
69,397✔
1711
  (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);
69,397✔
1712
  bool stop = taosTmrStop(pSyncNode->pElectTimer);
69,397✔
1713
  sTrace("vgId:%d, stop elect timer, stop:%d", pSyncNode->vgId, stop);
69,397✔
1714
  pSyncNode->pElectTimer = NULL;
69,396✔
1715

1716
  return code;
69,396✔
1717
}
1718

1719
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
68,441✔
1720
  int32_t ret = 0;
68,441✔
1721
  TAOS_CHECK_RETURN(syncNodeStopElectTimer(pSyncNode));
68,441!
1722
  TAOS_CHECK_RETURN(syncNodeStartElectTimer(pSyncNode, ms));
68,441!
1723
  return ret;
68,441✔
1724
}
1725

1726
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
68,439✔
1727
  int32_t code = 0;
68,439✔
1728
  int32_t electMS;
1729

1730
  if (pSyncNode->raftCfg.isStandBy) {
68,439!
1731
    electMS = TIMER_MAX_MS;
×
1732
  } else {
1733
    electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
68,439✔
1734
  }
1735

1736
  if ((code = syncNodeRestartElectTimer(pSyncNode, electMS)) != 0) {
68,441!
1737
    sWarn("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(code));
×
1738
    return;
×
1739
  };
1740

1741
  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
68,441!
1742
          electMS);
1743
}
1744

1745
#ifdef BUILD_NO_CALL
1746
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
1747
  int32_t code = 0;
1748
  if (syncIsInit()) {
1749
    TAOS_CHECK_RETURN(taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
1750
                                   syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer));
1751
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
1752
  } else {
1753
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
1754
  }
1755

1756
  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
1757
  return code;
1758
}
1759
#endif
1760

1761
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
333✔
1762
  int32_t ret = 0;
333✔
1763

1764
#if 0
1765
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
1766
  ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
1767
#endif
1768

1769
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
395✔
1770
    SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
62✔
1771
    if (pSyncTimer != NULL) {
62!
1772
      TAOS_CHECK_RETURN(syncHbTimerStart(pSyncNode, pSyncTimer));
62!
1773
    }
1774
  }
1775

1776
  return ret;
333✔
1777
}
1778

1779
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
854✔
1780
  int32_t code = 0;
854✔
1781

1782
#if 0
1783
  TAOS_CHECK_RETURN(atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1));
1784
  bool stop = taosTmrStop(pSyncNode->pHeartbeatTimer);
1785
  sDebug("vgId:%d, stop heartbeat timer, stop:%d", pSyncNode->vgId, stop);
1786
  pSyncNode->pHeartbeatTimer = NULL;
1787
#endif
1788

1789
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
1,389✔
1790
    SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
535✔
1791
    if (pSyncTimer != NULL) {
535!
1792
      TAOS_CHECK_RETURN(syncHbTimerStop(pSyncNode, pSyncTimer));
535!
1793
    }
1794
  }
1795

1796
  return code;
854✔
1797
}
1798

1799
#ifdef BUILD_NO_CALL
1800
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
1801
  // TODO check return value
1802
  int32_t code = 0;
1803
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
1804
  TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
1805
  return 0;
1806
}
1807
#endif
1808

1809
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
137,079✔
1810
  SEpSet* epSet = NULL;
137,079✔
1811
  for (int32_t i = 0; i < pNode->peersNum; ++i) {
192,456!
1812
    if (destRaftId->addr == pNode->peersId[i].addr) {
192,461✔
1813
      epSet = &pNode->peersEpset[i];
137,084✔
1814
      break;
137,084✔
1815
    }
1816
  }
1817

1818
  int32_t code = -1;
137,079✔
1819
  if (pNode->syncSendMSg != NULL && epSet != NULL) {
137,079!
1820
    syncUtilMsgHtoN(pMsg->pCont);
137,087✔
1821
    pMsg->info.noResp = 1;
137,081✔
1822
    code = pNode->syncSendMSg(epSet, pMsg);
137,081✔
1823
  }
1824

1825
  if (code < 0) {
137,087!
1826
    sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:0x%" PRIx64, pNode->vgId, tstrerror(code),
×
1827
           epSet, DID(destRaftId), destRaftId->addr);
1828
    rpcFreeCont(pMsg->pCont);
×
1829
  }
1830

1831
  TAOS_RETURN(code);
137,087✔
1832
}
1833

1834
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
83✔
1835
  bool b1 = false;
83✔
1836
  bool b2 = false;
83✔
1837

1838
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
101!
1839
    if (strcmp(pCfg->nodeInfo[i].nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0 &&
101!
1840
        pCfg->nodeInfo[i].nodePort == pNode->myNodeInfo.nodePort) {
101✔
1841
      b1 = true;
83✔
1842
      break;
83✔
1843
    }
1844
  }
1845

1846
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
101!
1847
    SRaftId raftId = {
101✔
1848
        .addr = SYNC_ADDR(&pCfg->nodeInfo[i]),
101✔
1849
        .vgId = pNode->vgId,
101✔
1850
    };
1851

1852
    if (syncUtilSameId(&raftId, &pNode->myRaftId)) {
101✔
1853
      b2 = true;
83✔
1854
      break;
83✔
1855
    }
1856
  }
1857

1858
  if (b1 != b2) {
83!
1859
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1860
    return false;
×
1861
  }
1862
  return b1;
83✔
1863
}
1864

1865
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
95✔
1866
  if (pOldCfg->totalReplicaNum != pNewCfg->totalReplicaNum) return true;
95✔
1867
  if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
65!
1868
  for (int32_t i = 0; i < pOldCfg->totalReplicaNum; ++i) {
142✔
1869
    const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
95✔
1870
    const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
95✔
1871
    if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
95!
1872
    if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
95!
1873
    if (pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
95✔
1874
  }
1875

1876
  return false;
47✔
1877
}
1878

1879
int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
59✔
1880
  int32_t  code = 0;
59✔
1881
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
59✔
1882
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
59✔
1883
    sInfo("vgId:1, sync not reconfig since not changed");
47✔
1884
    return 0;
47✔
1885
  }
1886

1887
  pSyncNode->raftCfg.cfg = *pNewConfig;
12✔
1888
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;
12✔
1889

1890
  pSyncNode->configChangeNum++;
12✔
1891

1892
  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
12✔
1893
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
12✔
1894

1895
  bool isDrop = false;
12✔
1896
  bool isAdd = false;
12✔
1897

1898
  if (IamInOld && !IamInNew) {
12!
1899
    isDrop = true;
×
1900
  } else {
1901
    isDrop = false;
12✔
1902
  }
1903

1904
  if (!IamInOld && IamInNew) {
12!
1905
    isAdd = true;
×
1906
  } else {
1907
    isAdd = false;
12✔
1908
  }
1909

1910
  // log begin config change
1911
  sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
12!
1912
         pSyncNode->vgId, oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, oldConfig.lastIndex,
1913
         pNewConfig->lastIndex);
1914

1915
  if (IamInNew) {
12!
1916
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
12✔
1917
  }
1918
  if (isDrop) {
12!
1919
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
×
1920
  }
1921

1922
  // add last config index
1923
  SRaftCfg* pCfg = &pSyncNode->raftCfg;
12✔
1924
  if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
12!
1925
    sNError(pSyncNode, "failed to add cfg index:%d since out of range", pCfg->configIndexCount);
×
1926
    terrno = TSDB_CODE_OUT_OF_RANGE;
×
1927
    return -1;
×
1928
  }
1929

1930
  pCfg->configIndexArr[pCfg->configIndexCount] = lastConfigChangeIndex;
12✔
1931
  pCfg->configIndexCount++;
12✔
1932

1933
  if (IamInNew) {
12!
1934
    //-----------------------------------------
1935
    int32_t ret = 0;
12✔
1936

1937
    // save snapshot senders
1938
    SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
1939
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
12✔
1940
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
1941
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
192✔
1942
      oldSenders[i] = pSyncNode->senders[i];
180✔
1943
      sSTrace(oldSenders[i], "snapshot sender save old");
180!
1944
    }
1945

1946
    // init internal
1947
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
12✔
1948
    if (syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId) == false) return terrno;
12!
1949

1950
    // init peersNum, peers, peersId
1951
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
12✔
1952
    int32_t j = 0;
12✔
1953
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
44✔
1954
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
32✔
1955
        pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
20✔
1956
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
20✔
1957
        j++;
20✔
1958
      }
1959
    }
1960
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
32✔
1961
      if (syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]) == false)
20!
1962
        return terrno;
×
1963
    }
1964

1965
    // init replicaNum, replicasId
1966
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
12✔
1967
    pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
12✔
1968
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
44✔
1969
      if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
32!
1970
          false)
1971
        return terrno;
×
1972
    }
1973

1974
    // update quorum first
1975
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
12✔
1976

1977
    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
12✔
1978
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
12✔
1979
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
12✔
1980
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);
12✔
1981

1982
    // reset snapshot senders
1983

1984
    // clear new
1985
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
192✔
1986
      pSyncNode->senders[i] = NULL;
180✔
1987
    }
1988

1989
    // reset new
1990
    for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
44✔
1991
      // reset sender
1992
      bool reset = false;
32✔
1993
      for (int32_t j = 0; j < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++j) {
140✔
1994
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j]) && oldSenders[j] != NULL) {
134!
1995
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
26!
1996
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[j]);
1997

1998
          pSyncNode->senders[i] = oldSenders[j];
26✔
1999
          oldSenders[j] = NULL;
26✔
2000
          reset = true;
26✔
2001

2002
          // reset replicaIndex
2003
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
26✔
2004
          pSyncNode->senders[i]->replicaIndex = i;
26✔
2005

2006
          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
26!
2007
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], reset);
2008

2009
          break;
26✔
2010
        }
2011
      }
2012
    }
2013

2014
    // create new
2015
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
192✔
2016
      if (pSyncNode->senders[i] == NULL) {
180✔
2017
        TAOS_CHECK_RETURN(snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]));
154!
2018
        if (pSyncNode->senders[i] == NULL) {
154!
2019
          // will be created later while send snapshot
2020
          sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
×
2021
        } else {
2022
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
154!
2023
        }
2024
      } else {
2025
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
26!
2026
      }
2027
    }
2028

2029
    // free old
2030
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
192✔
2031
      if (oldSenders[i] != NULL) {
180✔
2032
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
154!
2033
        snapshotSenderDestroy(oldSenders[i]);
154✔
2034
        oldSenders[i] = NULL;
154✔
2035
      }
2036
    }
2037

2038
    // persist cfg
2039
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
12!
2040
  } else {
2041
    // persist cfg
2042
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
×
2043
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
×
2044
  }
2045

2046
_END:
×
2047
  // log end config change
2048
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
12!
2049
  return 0;
12✔
2050
}
2051

2052
// raft state change --------------
2053
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
×
2054
  if (term > raftStoreGetTerm(pSyncNode)) {
×
2055
    raftStoreSetTerm(pSyncNode, term);
×
2056
  }
2057
}
×
2058

2059
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm, SRaftId id) {
64,091✔
2060
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
64,091✔
2061
  if (currentTerm > newTerm) {
64,092!
2062
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
×
2063
    return;
×
2064
  }
2065

2066
  do {
2067
    sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
64,092!
2068
  } while (0);
2069

2070
  if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
64,092!
2071
    (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
×
2072
    syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
×
2073
    sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken);
×
2074
    (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
×
2075
  }
2076

2077
  if (currentTerm < newTerm) {
64,092✔
2078
    raftStoreSetTerm(pSyncNode, newTerm);
55✔
2079
    char tmpBuf[64];
2080
    snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm);
55✔
2081
    syncNodeBecomeFollower(pSyncNode, id, tmpBuf);
55✔
2082
    raftStoreClearVote(pSyncNode);
55✔
2083
  } else {
2084
    if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
64,037!
2085
      syncNodeBecomeFollower(pSyncNode, id, "step down");
×
2086
    }
2087
  }
2088
}
2089

2090
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) { syncRespCleanRsp(pSyncNode->pSyncRespMgr); }
129✔
2091

2092
void syncNodeBecomeFollower(SSyncNode* pSyncNode, SRaftId leaderId, const char* debugStr) {
129✔
2093
  int32_t code = 0;  // maybe clear leader cache
129✔
2094
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
129!
2095
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
×
2096
  }
2097

2098
  pSyncNode->hbSlowNum = 0;
129✔
2099

2100
  pSyncNode->leaderCache = leaderId;  // state change
129✔
2101
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
129✔
2102
  pSyncNode->roleTimeMs = taosGetTimestampMs();
129✔
2103
  if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
129!
2104
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
×
2105
    return;
×
2106
  }
2107

2108
  // trace log
2109
  sNTrace(pSyncNode, "become follower %s", debugStr);
129!
2110

2111
  // send rsp to client
2112
  syncNodeLeaderChangeRsp(pSyncNode);
129✔
2113

2114
  // call back
2115
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
129!
2116
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
129✔
2117
  }
2118

2119
  // min match index
2120
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
129✔
2121

2122
  // reset log buffer
2123
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
129!
2124
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
2125
    return;
×
2126
  }
2127

2128
  // reset elect timer
2129
  syncNodeResetElectTimer(pSyncNode);
129✔
2130

2131
  sInfo("vgId:%d, become follower. %s", pSyncNode->vgId, debugStr);
129!
2132
}
2133

2134
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
8✔
2135
  pSyncNode->hbSlowNum = 0;
8✔
2136

2137
  // state change
2138
  pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
8✔
2139
  pSyncNode->roleTimeMs = taosGetTimestampMs();
8✔
2140

2141
  // trace log
2142
  sNTrace(pSyncNode, "become learner %s", debugStr);
8!
2143

2144
  // call back
2145
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLearnerCb != NULL) {
8!
2146
    pSyncNode->pFsm->FpBecomeLearnerCb(pSyncNode->pFsm);
8✔
2147
  }
2148

2149
  // min match index
2150
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
8✔
2151

2152
  // reset log buffer
2153
  int32_t code = 0;
8✔
2154
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
8!
2155
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
2156
    return;
×
2157
  };
2158
}
2159

2160
// TLA+ Spec
2161
// \* Candidate i transitions to leader.
2162
// BecomeLeader(i) ==
2163
//     /\ state[i] = Candidate
2164
//     /\ votesGranted[i] \in Quorum
2165
//     /\ state'      = [state EXCEPT ![i] = Leader]
2166
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
2167
//                          [j \in Server |-> Len(log[i]) + 1]]
2168
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
2169
//                          [j \in Server |-> 0]]
2170
//     /\ elections'  = elections \cup
2171
//                          {[eterm     |-> currentTerm[i],
2172
//                            eleader   |-> i,
2173
//                            elog      |-> log[i],
2174
//                            evotes    |-> votesGranted[i],
2175
//                            evoterLog |-> voterLog[i]]}
2176
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
2177
//
2178
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
282✔
2179
  int32_t code = 0;
282✔
2180
  pSyncNode->becomeLeaderNum++;
282✔
2181
  pSyncNode->hbrSlowNum = 0;
282✔
2182

2183
  // reset restoreFinish
2184
  pSyncNode->restoreFinish = false;
282✔
2185

2186
  // state change
2187
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
282✔
2188
  pSyncNode->roleTimeMs = taosGetTimestampMs();
282✔
2189

2190
  // set leader cache
2191
  pSyncNode->leaderCache = pSyncNode->myRaftId;
282✔
2192

2193
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
610✔
2194
    SyncIndex lastIndex;
2195
    SyncTerm  lastTerm;
2196
    int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
328✔
2197
    if (code != 0) {
328!
2198
      sError("vgId:%d, failed to become leader since %s", pSyncNode->vgId, tstrerror(code));
×
2199
      return;
×
2200
    }
2201
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
328✔
2202
  }
2203

2204
  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
610✔
2205
    // maybe overwrite myself, no harm
2206
    // just do it!
2207
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
328✔
2208
  }
2209

2210
  // init peer mgr
2211
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
282!
2212
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
×
2213
    return;
×
2214
  }
2215

2216
#if 0
2217
  // update sender private term
2218
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
2219
  if (pMySender != NULL) {
2220
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
2221
      if (pSyncNode->senders[i]->privateTerm > pMySender->privateTerm) {
2222
        pMySender->privateTerm = pSyncNode->senders[i]->privateTerm;
2223
      }
2224
    }
2225
    (pMySender->privateTerm) += 100;
2226
  }
2227
#endif
2228

2229
  // close receiver
2230
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
282!
2231
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
2232
  }
2233

2234
  // stop elect timer
2235
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
282!
2236
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
×
2237
    return;
×
2238
  }
2239

2240
  // start heartbeat timer
2241
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
282!
2242
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
×
2243
    return;
×
2244
  }
2245

2246
  // send heartbeat right now
2247
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
282!
2248
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
×
2249
    return;
×
2250
  }
2251

2252
  // call back
2253
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
282!
2254
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
282✔
2255
  }
2256

2257
  // min match index
2258
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
282✔
2259

2260
  // reset log buffer
2261
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
282!
2262
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
2263
    return;
×
2264
  }
2265

2266
  // trace log
2267
  sNInfo(pSyncNode, "become leader %s", debugStr);
282✔
2268
}
2269

2270
void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
×
2271
  int32_t code = 0;
×
2272
  pSyncNode->becomeAssignedLeaderNum++;
×
2273
  pSyncNode->hbrSlowNum = 0;
×
2274

2275
  // reset restoreFinish
2276
  // pSyncNode->restoreFinish = false;
2277

2278
  // state change
2279
  pSyncNode->state = TAOS_SYNC_STATE_ASSIGNED_LEADER;
×
2280
  pSyncNode->roleTimeMs = taosGetTimestampMs();
×
2281

2282
  // set leader cache
2283
  pSyncNode->leaderCache = pSyncNode->myRaftId;
×
2284

2285
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
×
2286
    SyncIndex lastIndex;
2287
    SyncTerm  lastTerm;
2288
    int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
×
2289
    if (code != 0) {
×
2290
      sError("vgId:%d, failed to become assigned leader since %s", pSyncNode->vgId, tstrerror(code));
×
2291
      return;
×
2292
    }
2293
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
×
2294
  }
2295

2296
  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
×
2297
    // maybe overwrite myself, no harm
2298
    // just do it!
2299
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
×
2300
  }
2301

2302
  // init peer mgr
2303
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
×
2304
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
×
2305
    return;
×
2306
  }
2307

2308
  // close receiver
2309
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
×
2310
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
2311
  }
2312

2313
  // stop elect timer
2314
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
×
2315
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
×
2316
    return;
×
2317
  }
2318

2319
  // start heartbeat timer
2320
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
×
2321
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
×
2322
    return;
×
2323
  }
2324

2325
  // send heartbeat right now
2326
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
×
2327
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
×
2328
    return;
×
2329
  }
2330

2331
  // call back
2332
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) {
×
2333
    pSyncNode->pFsm->FpBecomeAssignedLeaderCb(pSyncNode->pFsm);
×
2334
  }
2335

2336
  // min match index
2337
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
×
2338

2339
  // reset log buffer
2340
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
×
2341
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
2342
    return;
×
2343
  }
2344

2345
  // trace log
2346
  sNInfo(pSyncNode, "become assigned leader");
×
2347
}
2348

2349
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
27✔
2350
  if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
27!
2351
    sError("vgId:%d, failed leader from candidate since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
×
2352
    return;
×
2353
  }
2354
  bool granted = voteGrantedMajority(pSyncNode->pVotesGranted);
27✔
2355
  if (!granted) {
27!
2356
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
×
2357
    return;
×
2358
  }
2359
  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
27✔
2360

2361
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");
27!
2362

2363
  int32_t ret = syncNodeAppendNoop(pSyncNode);
27✔
2364
  if (ret < 0) {
27!
2365
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
×
2366
  }
2367

2368
  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
27✔
2369

2370
  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, pSyncNode->vgId,
27!
2371
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
2372
}
2373

2374
bool syncNodeIsMnode(SSyncNode* pSyncNode) { return (pSyncNode->vgId == 1); }
5,474✔
2375

2376
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
622✔
2377
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
9,952✔
2378
    pSyncNode->peerStates[i].lastSendIndex = SYNC_INDEX_INVALID;
9,330✔
2379
    pSyncNode->peerStates[i].lastSendTime = 0;
9,330✔
2380
  }
2381

2382
  return 0;
622✔
2383
}
2384

2385
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
31✔
2386
  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
31!
2387
    sError("vgId:%d, failed candidate from follower since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
×
2388
    return;
×
2389
  }
2390
  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
31✔
2391
  pSyncNode->roleTimeMs = taosGetTimestampMs();
31✔
2392
  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
31✔
2393
  sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
31!
2394
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
2395

2396
  sNTrace(pSyncNode, "follower to candidate");
31!
2397
}
2398

2399
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
×
2400
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) return TSDB_CODE_SYN_INTERNAL_ERROR;
×
2401
  syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");
×
2402

2403
  sNTrace(pSyncNode, "assigned leader to leader");
×
2404

2405
  int32_t ret = syncNodeAppendNoop(pSyncNode);
×
2406
  if (ret < 0) {
×
2407
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
×
2408
  }
2409

2410
  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
×
2411
  sInfo("vgId:%d, become leader from assigned leader. term:%" PRId64 ", commit index:%" PRId64
×
2412
        "assigned commit index:%" PRId64 ", last index:%" PRId64,
2413
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, pSyncNode->assignedCommitIndex,
2414
        lastIndex);
2415
  return 0;
×
2416
}
2417

2418
// just called by syncNodeVoteForSelf
2419
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
36✔
2420
  SyncTerm storeTerm = raftStoreGetTerm(pSyncNode);
36✔
2421
  if (term != storeTerm) {
36!
2422
    sError("vgId:%d, failed to vote for term, term:%" PRId64 ", storeTerm:%" PRId64, pSyncNode->vgId, term, storeTerm);
×
2423
    return;
×
2424
  }
2425
  sTrace("vgId:%d, begin hasVoted", pSyncNode->vgId);
36!
2426
  bool voted = raftStoreHasVoted(pSyncNode);
36✔
2427
  if (voted) {
36!
2428
    sError("vgId:%d, failed to vote for term since not voted", pSyncNode->vgId);
×
2429
    return;
×
2430
  }
2431

2432
  raftStoreVote(pSyncNode, pRaftId);
36✔
2433
}
2434

2435
// simulate get vote from outside
2436
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
36✔
2437
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);
36✔
2438

2439
  SRpcMsg rpcMsg = {0};
36✔
2440
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
36✔
2441
  if (ret != 0) return;
36!
2442

2443
  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
36✔
2444
  pMsg->srcId = pSyncNode->myRaftId;
36✔
2445
  pMsg->destId = pSyncNode->myRaftId;
36✔
2446
  pMsg->term = currentTerm;
36✔
2447
  pMsg->voteGranted = true;
36✔
2448

2449
  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
36✔
2450
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
36✔
2451
  rpcFreeCont(rpcMsg.pCont);
36✔
2452
}
2453

2454
// return if has a snapshot
2455
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
437✔
2456
  bool      ret = false;
437✔
2457
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
437✔
2458
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
437!
2459
    // TODO check return value
2460
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
437✔
2461
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
437✔
2462
      ret = true;
83✔
2463
    }
2464
  }
2465
  return ret;
437✔
2466
}
2467

2468
// return max(logLastIndex, snapshotLastIndex)
2469
// if no snapshot and log, return -1
2470
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
437✔
2471
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
437✔
2472
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
437!
2473
    // TODO check return value
2474
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
437✔
2475
  }
2476
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
437✔
2477

2478
  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
437✔
2479
  return lastIndex;
437✔
2480
}
2481

2482
// return the last term of snapshot and log
2483
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
2484
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
437✔
2485
  SyncTerm lastTerm = 0;
437✔
2486
  if (syncNodeHasSnapshot(pSyncNode)) {
437✔
2487
    // has snapshot
2488
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
83✔
2489
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
83!
2490
      // TODO check return value
2491
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
83✔
2492
    }
2493

2494
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
83✔
2495
    if (logLastIndex > snapshot.lastApplyIndex) {
83✔
2496
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
61✔
2497
    } else {
2498
      lastTerm = snapshot.lastApplyTerm;
22✔
2499
    }
2500

2501
  } else {
2502
    // no snapshot
2503
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
354✔
2504
  }
2505

2506
  return lastTerm;
437✔
2507
}
2508

2509
// get last index and term along with snapshot
2510
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
384✔
2511
  *pLastIndex = syncNodeGetLastIndex(pSyncNode);
384✔
2512
  *pLastTerm = syncNodeGetLastTerm(pSyncNode);
384✔
2513
  return 0;
384✔
2514
}
2515

2516
#ifdef BUILD_NO_CALL
2517
// return append-entries first try index
2518
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
2519
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
2520
  return syncStartIndex;
2521
}
2522

2523
// if index > 0, return index - 1
2524
// else, return -1
2525
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
2526
  SyncIndex preIndex = index - 1;
2527
  if (preIndex < SYNC_INDEX_INVALID) {
2528
    preIndex = SYNC_INDEX_INVALID;
2529
  }
2530

2531
  return preIndex;
2532
}
2533

2534
// if index < 0, return SYNC_TERM_INVALID
2535
// if index == 0, return 0
2536
// if index > 0, return preTerm
2537
// if error, return SYNC_TERM_INVALID
2538
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
2539
  if (index < SYNC_INDEX_BEGIN) {
2540
    return SYNC_TERM_INVALID;
2541
  }
2542

2543
  if (index == SYNC_INDEX_BEGIN) {
2544
    return 0;
2545
  }
2546

2547
  SyncTerm  preTerm = 0;
2548
  SyncIndex preIndex = index - 1;
2549

2550
  SSyncRaftEntry* pPreEntry = NULL;
2551
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
2552
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
2553
  int32_t         code = 0;
2554
  if (h) {
2555
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
2556
    code = 0;
2557

2558
    pSyncNode->pLogStore->cacheHit++;
2559
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);
2560

2561
  } else {
2562
    pSyncNode->pLogStore->cacheMiss++;
2563
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);
2564

2565
    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
2566
  }
2567

2568
  SSnapshot snapshot = {.data = NULL,
2569
                        .lastApplyIndex = SYNC_INDEX_INVALID,
2570
                        .lastApplyTerm = SYNC_TERM_INVALID,
2571
                        .lastConfigIndex = SYNC_INDEX_INVALID};
2572

2573
  if (code == 0) {
2574
    if (pPreEntry == NULL) return -1;
2575
    preTerm = pPreEntry->term;
2576

2577
    if (h) {
2578
      taosLRUCacheRelease(pCache, h, false);
2579
    } else {
2580
      syncEntryDestroy(pPreEntry);
2581
    }
2582

2583
    return preTerm;
2584
  } else {
2585
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
2586
      // TODO check return value
2587
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
2588
      if (snapshot.lastApplyIndex == preIndex) {
2589
        return snapshot.lastApplyTerm;
2590
      }
2591
    }
2592
  }
2593

2594
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
2595
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
2596
  return SYNC_TERM_INVALID;
2597
}
2598

2599
// get pre index and term of "index"
2600
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
2601
  *pPreIndex = syncNodeGetPreIndex(pSyncNode, index);
2602
  *pPreTerm = syncNodeGetPreTerm(pSyncNode, index);
2603
  return 0;
2604
}
2605
#endif
2606

2607
static void syncNodeEqPingTimer(void* param, void* tmrId) {
2,331✔
2608
  if (!syncIsInit()) return;
2,331!
2609

2610
  int64_t    rid = (int64_t)param;
2,331✔
2611
  SSyncNode* pNode = syncNodeAcquire(rid);
2,331✔
2612

2613
  if (pNode == NULL) return;
2,331!
2614

2615
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
2,331!
2616
    SRpcMsg rpcMsg = {0};
2,331✔
2617
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
2,331✔
2618
                                    pNode->pingTimerMS, pNode);
2619
    if (code != 0) {
2,331!
2620
      sError("failed to build ping msg");
×
2621
      rpcFreeCont(rpcMsg.pCont);
×
2622
      goto _out;
×
2623
    }
2624

2625
    // sTrace("enqueue ping msg");
2626
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
2,331✔
2627
    if (code != 0) {
2,331!
2628
      sError("failed to sync enqueue ping msg since %s", terrstr());
×
2629
      rpcFreeCont(rpcMsg.pCont);
×
2630
      goto _out;
×
2631
    }
2632

2633
  _out:
2,331✔
2634
    if (taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
2,331!
2635
                     &pNode->pPingTimer))
2636
      sError("failed to reset ping timer");
×
2637
  }
2638
  syncNodeRelease(pNode);
2,331✔
2639
}
2640

2641
static void syncNodeEqElectTimer(void* param, void* tmrId) {
37✔
2642
  if (!syncIsInit()) return;
38!
2643

2644
  int64_t    rid = (int64_t)param;
37✔
2645
  SSyncNode* pNode = syncNodeAcquire(rid);
37✔
2646

2647
  if (pNode == NULL) return;
37!
2648

2649
  if (pNode->syncEqMsg == NULL) {
37!
2650
    syncNodeRelease(pNode);
×
2651
    return;
×
2652
  }
2653

2654
  int64_t tsNow = taosGetTimestampMs();
37✔
2655
  if (tsNow < pNode->electTimerParam.executeTime) {
37✔
2656
    syncNodeRelease(pNode);
1✔
2657
    return;
1✔
2658
  }
2659

2660
  SRpcMsg rpcMsg = {0};
36✔
2661
  int32_t code =
2662
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
36✔
2663

2664
  if (code != 0) {
36!
2665
    sError("failed to build elect msg");
×
2666
    syncNodeRelease(pNode);
×
2667
    return;
×
2668
  }
2669

2670
  SyncTimeout* pTimeout = rpcMsg.pCont;
36✔
2671
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
36!
2672

2673
  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
36✔
2674
  if (code != 0) {
36!
2675
    sError("failed to sync enqueue elect msg since %s", terrstr());
×
2676
    rpcFreeCont(rpcMsg.pCont);
×
2677
    syncNodeRelease(pNode);
×
2678
    return;
×
2679
  }
2680

2681
  syncNodeRelease(pNode);
36✔
2682
}
2683

2684
#ifdef BUILD_NO_CALL
2685
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
2686
  if (!syncIsInit()) return;
2687

2688
  int64_t    rid = (int64_t)param;
2689
  SSyncNode* pNode = syncNodeAcquire(rid);
2690

2691
  if (pNode == NULL) return;
2692

2693
  if (pNode->totalReplicaNum > 1) {
2694
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
2695
      SRpcMsg rpcMsg = {0};
2696
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
2697
                                      pNode->heartbeatTimerMS, pNode);
2698

2699
      if (code != 0) {
2700
        sError("failed to build heartbeat msg");
2701
        goto _out;
2702
      }
2703

2704
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
2705
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
2706
      if (code != 0) {
2707
        sError("failed to enqueue heartbeat msg since %s", terrstr());
2708
        rpcFreeCont(rpcMsg.pCont);
2709
        goto _out;
2710
      }
2711

2712
    _out:
2713
      if (taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
2714
                       &pNode->pHeartbeatTimer) != 0)
2715
        return;
2716

2717
    } else {
2718
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
2719
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2720
    }
2721
  }
2722
}
2723
#endif
2724

2725
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
4,300✔
2726
  int32_t code = 0;
4,300✔
2727
  int64_t hbDataRid = (int64_t)param;
4,300✔
2728
  int64_t tsNow = taosGetTimestampMs();
4,300✔
2729

2730
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
4,300✔
2731
  if (pData == NULL) {
4,300!
2732
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
×
2733
    return;
×
2734
  }
2735

2736
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
4,300✔
2737
  if (pSyncNode == NULL) {
4,300!
2738
    syncHbTimerDataRelease(pData);
×
2739
    sError("hb timer get pSyncNode NULL");
×
2740
    return;
×
2741
  }
2742

2743
  SSyncTimer* pSyncTimer = pData->pTimer;
4,300✔
2744

2745
  if (!pSyncNode->isStart) {
4,300!
2746
    syncNodeRelease(pSyncNode);
×
2747
    syncHbTimerDataRelease(pData);
×
2748
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
×
2749
    return;
×
2750
  }
2751

2752
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
4,300!
2753
    syncNodeRelease(pSyncNode);
×
2754
    syncHbTimerDataRelease(pData);
×
2755
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
×
2756
    return;
×
2757
  }
2758

2759
  sTrace("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:0x%" PRIx64, pSyncNode->vgId, hbDataRid,
4,300!
2760
         pData->destId.addr);
2761

2762
  if (pSyncNode->totalReplicaNum > 1) {
4,300!
2763
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
4,300✔
2764
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);
4,300✔
2765

2766
    if (timerLogicClock == msgLogicClock) {
4,300!
2767
      if (tsNow > pData->execTime) {
4,300✔
2768
        pData->execTime += pSyncTimer->timerMS;
4,291✔
2769

2770
        SRpcMsg rpcMsg = {0};
4,291✔
2771
        if ((code = syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId)) != 0) {
4,291!
2772
          sError("vgId:%d, failed to build heartbeat msg since %s", pSyncNode->vgId, tstrerror(code));
×
2773
          syncNodeRelease(pSyncNode);
×
2774
          syncHbTimerDataRelease(pData);
×
2775
          return;
×
2776
        }
2777

2778
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
4,291✔
2779

2780
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
4,291✔
2781
        pSyncMsg->srcId = pSyncNode->myRaftId;
4,291✔
2782
        pSyncMsg->destId = pData->destId;
4,291✔
2783
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
4,291✔
2784
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
4,291✔
2785
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
4,291✔
2786
        pSyncMsg->privateTerm = 0;
4,291✔
2787
        pSyncMsg->timeStamp = tsNow;
4,291✔
2788

2789
        // update reset time
2790
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
4,291✔
2791
        pSyncTimer->timeStamp = tsNow;
4,291✔
2792

2793
        // send msg
2794
        TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
4,291✔
2795
        TRACE_SET_ROOTID(&(rpcMsg.info.traceId), tGenIdPI64());
4,291✔
2796
        sGTrace(&rpcMsg.info.traceId, "vgId:%d, send sync-heartbeat to dnode:%d", pSyncNode->vgId,
4,291!
2797
                DID(&(pSyncMsg->destId)));
2798
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
4,291✔
2799
        int ret = syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
4,291✔
2800
        if (ret != 0) {
4,291!
2801
          sError("vgId:%d, failed to send heartbeat since %s", pSyncNode->vgId, tstrerror(ret));
×
2802
        }
2803
      }
2804

2805
      if (syncIsInit()) {
4,300!
2806
        sTrace("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
4,300!
2807
        bool stopped = taosTmrResetPriority(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid,
4,300✔
2808
                                            syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
4,300✔
2809
        if (stopped) sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
4,300!
2810

2811
      } else {
2812
        sError("sync env is stop, reset peer hb timer error");
×
2813
      }
2814

2815
    } else {
2816
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64, pSyncNode->vgId,
×
2817
             timerLogicClock, msgLogicClock);
2818
    }
2819
  }
2820

2821
  syncHbTimerDataRelease(pData);
4,300✔
2822
  syncNodeRelease(pSyncNode);
4,300✔
2823
}
2824

2825
#ifdef BUILD_NO_CALL
2826
static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) {
2827
  (void)ud;
2828
  taosMemoryFree(value);
2829
}
2830

2831
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
2832
  SSyncLogStoreData* pData = pLogStore->data;
2833
  sNTrace(pData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);
2834

2835
  int32_t   code = 0;
2836
  int32_t   entryLen = sizeof(*pEntry) + pEntry->dataLen;
2837
  LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, entryLen,
2838
                                        deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW, NULL);
2839
  if (status != TAOS_LRU_STATUS_OK) {
2840
    code = -1;
2841
  }
2842

2843
  return code;
2844
}
2845
#endif
2846

2847
void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) {  // TODO SAlterVnodeReplicaReq name is proper?
×
2848
  cfg->replicaNum = 0;
×
2849
  cfg->totalReplicaNum = 0;
×
2850
  int32_t code = 0;
×
2851

2852
  for (int i = 0; i < pReq->replica; ++i) {
×
2853
    SNodeInfo* pNode = &cfg->nodeInfo[i];
×
2854
    pNode->nodeId = pReq->replicas[i].id;
×
2855
    pNode->nodePort = pReq->replicas[i].port;
×
2856
    tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
×
2857
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
×
2858
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
×
2859
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
×
2860
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
2861
    cfg->replicaNum++;
×
2862
  }
2863
  if (pReq->selfIndex != -1) {
×
2864
    cfg->myIndex = pReq->selfIndex;
×
2865
  }
2866
  for (int i = cfg->replicaNum; i < pReq->replica + pReq->learnerReplica; ++i) {
×
2867
    SNodeInfo* pNode = &cfg->nodeInfo[i];
×
2868
    pNode->nodeId = pReq->learnerReplicas[cfg->totalReplicaNum].id;
×
2869
    pNode->nodePort = pReq->learnerReplicas[cfg->totalReplicaNum].port;
×
2870
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
2871
    tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[cfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn));
×
2872
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
×
2873
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
×
2874
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
2875
    cfg->totalReplicaNum++;
×
2876
  }
2877
  cfg->totalReplicaNum += pReq->replica;
×
2878
  if (pReq->learnerSelfIndex != -1) {
×
2879
    cfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
×
2880
  }
2881
  cfg->changeVersion = pReq->changeVersion;
×
2882
}
×
2883

2884
int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) {
×
2885
  int32_t code = 0;
×
2886
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
×
2887
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
2888
  }
2889

2890
  SMsgHead* head = (SMsgHead*)pEntry->data;
×
2891
  void*     pReq = POINTER_SHIFT(head, sizeof(SMsgHead));
×
2892

2893
  SAlterVnodeTypeReq req = {0};
×
2894
  if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
×
2895
    code = TSDB_CODE_INVALID_MSG;
×
2896
    TAOS_RETURN(code);
×
2897
  }
2898

2899
  SSyncCfg cfg = {0};
×
2900
  syncBuildConfigFromReq(&req, &cfg);
×
2901

2902
  if (cfg.totalReplicaNum >= 1 &&
×
2903
      (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
×
2904
    bool incfg = false;
×
2905
    for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
×
2906
      if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
×
2907
          ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
×
2908
        incfg = true;
×
2909
        break;
×
2910
      }
2911
    }
2912

2913
    if (!incfg) {
×
2914
      SyncTerm currentTerm = raftStoreGetTerm(ths);
×
2915
      SRaftId  id = EMPTY_RAFT_ID;
×
2916
      syncNodeStepDown(ths, currentTerm, id);
×
2917
      return 1;
×
2918
    }
2919
  }
2920
  return 0;
×
2921
}
2922

2923
void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg* cfg, char* str) {
×
2924
  sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64
×
2925
        ", changeVersion:%d, "
2926
        "restoreFinish:%d",
2927
        ths->vgId, str, ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion,
2928
        ths->restoreFinish);
2929

2930
  sInfo("vgId:%d, %s, myNodeInfo, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
×
2931
        ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn, ths->myNodeInfo.nodePort,
2932
        ths->myNodeInfo.nodeRole);
2933

2934
  for (int32_t i = 0; i < ths->peersNum; ++i) {
×
2935
    sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
×
2936
          i, ths->peersNodeInfo[i].clusterId, ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn,
2937
          ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole);
2938
  }
2939

2940
  for (int32_t i = 0; i < ths->peersNum; ++i) {
×
2941
    char    buf[256];
2942
    int32_t len = 256;
×
2943
    int32_t n = 0;
×
2944
    n += tsnprintf(buf + n, len - n, "%s", "{");
×
2945
    for (int i = 0; i < ths->peersEpset->numOfEps; i++) {
×
2946
      n += tsnprintf(buf + n, len - n, "%s:%d%s", ths->peersEpset->eps[i].fqdn, ths->peersEpset->eps[i].port,
×
2947
                     (i + 1 < ths->peersEpset->numOfEps ? ", " : ""));
×
2948
    }
2949
    n += tsnprintf(buf + n, len - n, "%s", "}");
×
2950

2951
    sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d", ths->vgId, str, i, buf, ths->peersEpset->inUse);
×
2952
  }
2953

2954
  for (int32_t i = 0; i < ths->peersNum; ++i) {
×
2955
    sInfo("vgId:%d, %s, peersId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->peersId[i].addr);
×
2956
  }
2957

2958
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
×
2959
    sInfo("vgId:%d, %s, nodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str, i,
×
2960
          ths->raftCfg.cfg.nodeInfo[i].clusterId, ths->raftCfg.cfg.nodeInfo[i].nodeId,
2961
          ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, ths->raftCfg.cfg.nodeInfo[i].nodePort,
2962
          ths->raftCfg.cfg.nodeInfo[i].nodeRole);
2963
  }
2964

2965
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
×
2966
    sInfo("vgId:%d, %s, replicasId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->replicasId[i].addr);
×
2967
  }
2968
}
×
2969

2970
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg* cfg) {
×
2971
  int32_t i = 0;
×
2972

2973
  // change peersNodeInfo
2974
  i = 0;
×
2975
  for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
×
2976
    if (!(strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 &&
×
2977
          ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort)) {
×
2978
      ths->peersNodeInfo[i].nodeRole = cfg->nodeInfo[j].nodeRole;
×
2979
      ths->peersNodeInfo[i].clusterId = cfg->nodeInfo[j].clusterId;
×
2980
      tstrncpy(ths->peersNodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn, TSDB_FQDN_LEN);
×
2981
      ths->peersNodeInfo[i].nodeId = cfg->nodeInfo[j].nodeId;
×
2982
      ths->peersNodeInfo[i].nodePort = cfg->nodeInfo[j].nodePort;
×
2983

2984
      syncUtilNodeInfo2EpSet(&ths->peersNodeInfo[i], &ths->peersEpset[i]);
×
2985

2986
      if (syncUtilNodeInfo2RaftId(&ths->peersNodeInfo[i], ths->vgId, &ths->peersId[i]) == false) {
×
2987
        sError("vgId:%d, failed to determine raft member id, peer:%d", ths->vgId, i);
×
2988
        return terrno;
×
2989
      }
2990

2991
      i++;
×
2992
    }
2993
  }
2994
  ths->peersNum = i;
×
2995

2996
  // change cfg nodeInfo
2997
  ths->raftCfg.cfg.replicaNum = 0;
×
2998
  i = 0;
×
2999
  for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
×
3000
    if (cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
×
3001
      ths->raftCfg.cfg.replicaNum++;
×
3002
    }
3003
    ths->raftCfg.cfg.nodeInfo[i].nodeRole = cfg->nodeInfo[j].nodeRole;
×
3004
    ths->raftCfg.cfg.nodeInfo[i].clusterId = cfg->nodeInfo[j].clusterId;
×
3005
    tstrncpy(ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn, TSDB_FQDN_LEN);
×
3006
    ths->raftCfg.cfg.nodeInfo[i].nodeId = cfg->nodeInfo[j].nodeId;
×
3007
    ths->raftCfg.cfg.nodeInfo[i].nodePort = cfg->nodeInfo[j].nodePort;
×
3008
    if ((strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 &&
×
3009
         ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort)) {
×
3010
      ths->raftCfg.cfg.myIndex = i;
×
3011
    }
3012
    i++;
×
3013
  }
3014
  ths->raftCfg.cfg.totalReplicaNum = i;
×
3015

3016
  return 0;
×
3017
}
3018

3019
void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg* cfg) {
×
3020
  // change peersNodeInfo
3021
  for (int32_t i = 0; i < ths->peersNum; ++i) {
×
3022
    for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
×
3023
      if (strcmp(ths->peersNodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 &&
×
3024
          ths->peersNodeInfo[i].nodePort == cfg->nodeInfo[j].nodePort) {
×
3025
        if (cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
×
3026
          ths->peersNodeInfo[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3027
        }
3028
      }
3029
    }
3030
  }
3031

3032
  // change cfg nodeInfo
3033
  ths->raftCfg.cfg.replicaNum = 0;
×
3034
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
×
3035
    for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
×
3036
      if (strcmp(ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 &&
×
3037
          ths->raftCfg.cfg.nodeInfo[i].nodePort == cfg->nodeInfo[j].nodePort) {
×
3038
        if (cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
×
3039
          ths->raftCfg.cfg.nodeInfo[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3040
          ths->raftCfg.cfg.replicaNum++;
×
3041
        }
3042
      }
3043
    }
3044
  }
3045
}
×
3046

3047
int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum) {
×
3048
  int32_t code = 0;
×
3049
  // 1.rebuild replicasId, remove deleted one
3050
  SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
3051
  memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId));
×
3052

3053
  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
×
3054
  ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum;
×
3055
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
×
3056
    if (syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]) == false) return terrno;
×
3057
  }
3058

3059
  // 2.rebuild MatchIndex, remove deleted one
3060
  SSyncIndexMgr* oldIndex = ths->pMatchIndex;
×
3061

3062
  ths->pMatchIndex = syncIndexMgrCreate(ths);
×
3063
  if (ths->pMatchIndex == NULL) {
×
3064
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
3065
    if (terrno != 0) code = terrno;
×
3066
    TAOS_RETURN(code);
×
3067
  }
3068

3069
  syncIndexMgrCopyIfExist(ths->pMatchIndex, oldIndex, oldReplicasId);
×
3070

3071
  syncIndexMgrDestroy(oldIndex);
×
3072

3073
  // 3.rebuild NextIndex, remove deleted one
3074
  SSyncIndexMgr* oldNextIndex = ths->pNextIndex;
×
3075

3076
  ths->pNextIndex = syncIndexMgrCreate(ths);
×
3077
  if (ths->pNextIndex == NULL) {
×
3078
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
3079
    if (terrno != 0) code = terrno;
×
3080
    TAOS_RETURN(code);
×
3081
  }
3082

3083
  syncIndexMgrCopyIfExist(ths->pNextIndex, oldNextIndex, oldReplicasId);
×
3084

3085
  syncIndexMgrDestroy(oldNextIndex);
×
3086

3087
  // 4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
3088
  voteGrantedUpdate(ths->pVotesGranted, ths);
×
3089
  votesRespondUpdate(ths->pVotesRespond, ths);
×
3090

3091
  // 5.rebuild logReplMgr
3092
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
×
3093
    sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
×
3094
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
3095
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
3096
  }
3097

3098
  SSyncLogReplMgr* oldLogReplMgrs = NULL;
×
3099
  int64_t          length = sizeof(SSyncLogReplMgr) * (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA);
×
3100
  oldLogReplMgrs = taosMemoryMalloc(length);
×
3101
  if (NULL == oldLogReplMgrs) return terrno;
×
3102
  memset(oldLogReplMgrs, 0, length);
×
3103

3104
  for (int i = 0; i < oldtotalReplicaNum; i++) {
×
3105
    oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
×
3106
  }
3107

3108
  syncNodeLogReplDestroy(ths);
×
3109
  if ((code = syncNodeLogReplInit(ths)) != 0) {
×
3110
    taosMemoryFree(oldLogReplMgrs);
×
3111
    TAOS_RETURN(code);
×
3112
  }
3113

3114
  for (int i = 0; i < ths->totalReplicaNum; ++i) {
×
3115
    for (int j = 0; j < oldtotalReplicaNum; j++) {
×
3116
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
×
3117
        *(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
×
3118
        ths->logReplMgrs[i]->peerId = i;
×
3119
      }
3120
    }
3121
  }
3122

3123
  for (int i = 0; i < ths->totalReplicaNum; ++i) {
×
3124
    sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
×
3125
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
3126
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
3127
  }
3128

3129
  // 6.rebuild sender
3130
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
×
3131
    sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
×
3132
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime)
3133
  }
3134

3135
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
×
3136
    if (ths->senders[i] != NULL) {
×
3137
      sDebug("vgId:%d, snapshot sender destroy while close, data:%p", ths->vgId, ths->senders[i]);
×
3138

3139
      if (snapshotSenderIsStart(ths->senders[i])) {
×
3140
        snapshotSenderStop(ths->senders[i], false);
×
3141
      }
3142

3143
      snapshotSenderDestroy(ths->senders[i]);
×
3144
      ths->senders[i] = NULL;
×
3145
    }
3146
  }
3147

3148
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
×
3149
    SSyncSnapshotSender* pSender = NULL;
×
3150
    int32_t              code = snapshotSenderCreate(ths, i, &pSender);
×
3151
    if (pSender == NULL) return terrno = code;
×
3152

3153
    ths->senders[i] = pSender;
×
3154
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
×
3155
  }
3156

3157
  for (int i = 0; i < ths->totalReplicaNum; i++) {
×
3158
    sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
×
3159
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime)
3160
  }
3161

3162
  // 7.rebuild synctimer
3163
  if ((code = syncNodeStopHeartbeatTimer(ths)) != 0) {
×
3164
    taosMemoryFree(oldLogReplMgrs);
×
3165
    TAOS_RETURN(code);
×
3166
  }
3167

3168
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
×
3169
    if ((code = syncHbTimerInit(ths, &ths->peerHeartbeatTimerArr[i], ths->replicasId[i])) != 0) {
×
3170
      taosMemoryFree(oldLogReplMgrs);
×
3171
      TAOS_RETURN(code);
×
3172
    }
3173
  }
3174

3175
  if ((code = syncNodeStartHeartbeatTimer(ths)) != 0) {
×
3176
    taosMemoryFree(oldLogReplMgrs);
×
3177
    TAOS_RETURN(code);
×
3178
  }
3179

3180
  // 8.rebuild peerStates
3181
  SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
×
3182
  for (int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) {
×
3183
    oldState[i] = ths->peerStates[i];
×
3184
  }
3185

3186
  for (int i = 0; i < ths->totalReplicaNum; i++) {
×
3187
    for (int j = 0; j < oldtotalReplicaNum; j++) {
×
3188
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
×
3189
        ths->peerStates[i] = oldState[j];
×
3190
      }
3191
    }
3192
  }
3193

3194
  taosMemoryFree(oldLogReplMgrs);
×
3195

3196
  return 0;
×
3197
}
3198

3199
void syncNodeChangeToVoter(SSyncNode* ths) {
×
3200
  // replicasId, only need to change replicaNum when 1->3
3201
  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
×
3202
  sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum);
×
3203
  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
×
3204
    sDebug("vgId:%d, i:%d, replicaId.addr:0x%" PRIx64, ths->vgId, i, ths->replicasId[i].addr);
×
3205
  }
3206

3207
  // pMatchIndex, pNextIndex, only need to change replicaNum when 1->3
3208
  ths->pMatchIndex->replicaNum = ths->raftCfg.cfg.replicaNum;
×
3209
  ths->pNextIndex->replicaNum = ths->raftCfg.cfg.replicaNum;
×
3210

3211
  sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, ths->pMatchIndex->totalReplicaNum);
×
3212
  for (int32_t i = 0; i < ths->pMatchIndex->totalReplicaNum; ++i) {
×
3213
    sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, i, ths->pMatchIndex->index[i]);
×
3214
  }
3215

3216
  // pVotesGranted, pVotesRespond
3217
  voteGrantedUpdate(ths->pVotesGranted, ths);
×
3218
  votesRespondUpdate(ths->pVotesRespond, ths);
×
3219

3220
  // logRepMgrs
3221
  // no need to change logRepMgrs when 1->3
3222
}
×
3223

3224
void syncNodeResetPeerAndCfg(SSyncNode* ths) {
×
3225
  SNodeInfo node = {0};
×
3226
  for (int32_t i = 0; i < ths->peersNum; ++i) {
×
3227
    memcpy(&ths->peersNodeInfo[i], &node, sizeof(SNodeInfo));
×
3228
  }
3229

3230
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
×
3231
    memcpy(&ths->raftCfg.cfg.nodeInfo[i], &node, sizeof(SNodeInfo));
×
3232
  }
3233
}
×
3234

3235
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) {
×
3236
  int32_t code = 0;
×
3237
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
×
3238
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
3239
  }
3240

3241
  SMsgHead* head = (SMsgHead*)pEntry->data;
×
3242
  void*     pReq = POINTER_SHIFT(head, sizeof(SMsgHead));
×
3243

3244
  SAlterVnodeTypeReq req = {0};
×
3245
  if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
×
3246
    code = TSDB_CODE_INVALID_MSG;
×
3247
    TAOS_RETURN(code);
×
3248
  }
3249

3250
  SSyncCfg cfg = {0};
×
3251
  syncBuildConfigFromReq(&req, &cfg);
×
3252

3253
  if (cfg.changeVersion <= ths->raftCfg.cfg.changeVersion) {
×
3254
    sInfo(
×
3255
        "vgId:%d, skip conf change entry since lower version. "
3256
        "this entry, index:%" PRId64 ", term:%" PRId64
3257
        ", totalReplicaNum:%d, changeVersion:%d; "
3258
        "current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d",
3259
        ths->vgId, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
3260
        ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
3261
    return 0;
×
3262
  }
3263

3264
  if (strcmp(str, "Commit") == 0) {
×
3265
    sInfo(
×
3266
        "vgId:%d, change config from %s. "
3267
        "this, i:%" PRId64
3268
        ", trNum:%d, vers:%d; "
3269
        "node, rNum:%d, pNum:%d, trNum:%d, "
3270
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
3271
        "), "
3272
        "cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
3273
        ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
3274
        ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex,
3275
        ths->pLogBuf->endIndex, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
3276
  } else {
3277
    sInfo(
×
3278
        "vgId:%d, change config from %s. "
3279
        "this, i:%" PRId64 ", t:%" PRId64
3280
        ", trNum:%d, vers:%d; "
3281
        "node, rNum:%d, pNum:%d, trNum:%d, "
3282
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
3283
        "), "
3284
        "cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
3285
        ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum,
3286
        ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
3287
        ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index - 1, ths->commitIndex,
3288
        ths->pLogBuf->commitIndex);
3289
  }
3290

3291
  syncNodeLogConfigInfo(ths, &cfg, "before config change");
×
3292

3293
  int32_t oldTotalReplicaNum = ths->totalReplicaNum;
×
3294

3295
  if (cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2) {  // remove replica
×
3296

3297
    bool incfg = false;
×
3298
    for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
×
3299
      if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
×
3300
          ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
×
3301
        incfg = true;
×
3302
        break;
×
3303
      }
3304
    }
3305

3306
    if (incfg) {  // remove other
×
3307
      syncNodeResetPeerAndCfg(ths);
×
3308

3309
      // no need to change myNodeInfo
3310

3311
      if ((code = syncNodeRebuildPeerAndCfg(ths, &cfg)) != 0) {
×
3312
        TAOS_RETURN(code);
×
3313
      };
3314

3315
      if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) {
×
3316
        TAOS_RETURN(code);
×
3317
      };
3318
    } else {  // remove myself
3319
      // no need to do anything actually, to change the following to reduce distruptive server chance
3320

3321
      syncNodeResetPeerAndCfg(ths);
×
3322

3323
      // change myNodeInfo
3324
      ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER;
×
3325

3326
      // change peer and cfg
3327
      ths->peersNum = 0;
×
3328
      memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo));
×
3329
      ths->raftCfg.cfg.replicaNum = 0;
×
3330
      ths->raftCfg.cfg.totalReplicaNum = 1;
×
3331

3332
      // change other
3333
      if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) {
×
3334
        TAOS_RETURN(code);
×
3335
      }
3336

3337
      // change state
3338
      ths->state = TAOS_SYNC_STATE_LEARNER;
×
3339
    }
3340

3341
    ths->restoreFinish = false;
×
3342
  } else {                            // add replica, or change replica type
3343
    if (ths->totalReplicaNum == 3) {  // change replica type
×
3344
      sInfo("vgId:%d, begin change replica type", ths->vgId);
×
3345

3346
      // change myNodeInfo
3347
      for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
×
3348
        if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
×
3349
            ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
×
3350
          if (cfg.nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
×
3351
            ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
×
3352
          }
3353
        }
3354
      }
3355

3356
      // change peer and cfg
3357
      syncNodeChangePeerAndCfgToVoter(ths, &cfg);
×
3358

3359
      // change other
3360
      syncNodeChangeToVoter(ths);
×
3361

3362
      // change state
3363
      if (ths->state == TAOS_SYNC_STATE_LEARNER) {
×
3364
        if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER) {
×
3365
          ths->state = TAOS_SYNC_STATE_FOLLOWER;
×
3366
        }
3367
      }
3368

3369
      ths->restoreFinish = false;
×
3370
    } else {  // add replica
3371
      sInfo("vgId:%d, begin add replica", ths->vgId);
×
3372

3373
      // no need to change myNodeInfo
3374

3375
      // change peer and cfg
3376
      if ((code = syncNodeRebuildPeerAndCfg(ths, &cfg)) != 0) {
×
3377
        TAOS_RETURN(code);
×
3378
      };
3379

3380
      // change other
3381
      if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) {
×
3382
        TAOS_RETURN(code);
×
3383
      };
3384

3385
      // no need to change state
3386

3387
      if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
×
3388
        ths->restoreFinish = false;
×
3389
      }
3390
    }
3391
  }
3392

3393
  ths->quorum = syncUtilQuorum(ths->replicaNum);
×
3394

3395
  ths->raftCfg.lastConfigIndex = pEntry->index;
×
3396
  ths->raftCfg.cfg.lastIndex = pEntry->index;
×
3397
  ths->raftCfg.cfg.changeVersion = cfg.changeVersion;
×
3398

3399
  syncNodeLogConfigInfo(ths, &cfg, "after config change");
×
3400

3401
  if ((code = syncWriteCfgFile(ths)) != 0) {
×
3402
    sError("vgId:%d, failed to create sync cfg file", ths->vgId);
×
3403
    TAOS_RETURN(code);
×
3404
  };
3405

3406
  TAOS_RETURN(code);
×
3407
}
3408

3409
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry, SRpcMsg* pMsg) {
71,167✔
3410
  int32_t code = -1;
71,167✔
3411
  if (pEntry->dataLen < sizeof(SMsgHead)) {
71,167!
3412
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
3413
    sError("vgId:%d, msg:%p, cannot append an invalid client request with no msg head, type:%s dataLen:%d", ths->vgId,
×
3414
           pMsg, TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
3415
    syncEntryDestroy(pEntry);
×
3416
    pEntry = NULL;
×
3417
    goto _out;
×
3418
  }
3419

3420
  // append to log buffer
3421
  if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) {
71,167✔
3422
    sError("vgId:%d, index:%" PRId64 ", failed to enqueue sync log buffer", ths->vgId, pEntry->index);
1!
3423
    int32_t ret = 0;
1✔
3424
    if ((ret = syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false)) != 0) {
1!
3425
      sError("vgId:%d, index:%" PRId64 ", failed to execute fsm since %s", ths->vgId, pEntry->index, tstrerror(ret));
×
3426
    }
3427
    syncEntryDestroy(pEntry);
×
3428
    pEntry = NULL;
×
3429
    goto _out;
×
3430
  }
3431

3432
  code = 0;
71,164✔
3433
_out:;
71,164✔
3434
  // proceed match index, with replicating on needed
3435
  SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append", pMsg);
71,164✔
3436
  const STraceId* trace = pEntry ? &pEntry->originRpcTraceId : NULL;
71,169!
3437

3438
  if (pEntry != NULL) {
71,169!
3439
    sGDebug(trace,
71,170!
3440
            "vgId:%d, index:%" PRId64 ", raft entry appended, msg:%p term:%" PRId64 " buf:[%" PRId64 " %" PRId64
3441
            " %" PRId64 ", %" PRId64 ")",
3442
            ths->vgId, pEntry->index, pMsg, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
3443
            ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
3444
  }
3445

3446
  if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
71,170!
3447
    int64_t index = syncNodeUpdateAssignedCommitIndex(ths, matchIndex);
×
3448
    sGTrace(trace, "vgId:%d, index:%" PRId64 ", update assigned commit, msg:%p", ths->vgId, index, pMsg);
×
3449

3450
    if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
×
3451
        syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex, trace, "append-entry") < 0) {
×
3452
      sGError(trace, "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64, ths->vgId, index,
×
3453
              pMsg, ths->commitIndex);
3454
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
×
3455
    }
3456
  }
3457

3458
  // multi replica
3459
  if (ths->replicaNum > 1) {
71,170✔
3460
    TAOS_RETURN(code);
31,956✔
3461
  }
3462

3463
  // single replica
3464
  SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, matchIndex);
39,214✔
3465
  sGTrace(trace, "vgId:%d, index:%" PRId64 ", raft entry update commit, msg:%p return index:%" PRId64, ths->vgId,
39,213!
3466
          matchIndex, pMsg, returnIndex);
3467

3468
  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
78,428!
3469
      (code = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, trace, "append-entry")) < 0) {
39,213✔
3470
    sGError(trace,
×
3471
            "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64 " return index:%" PRId64,
3472
            ths->vgId, matchIndex, pMsg, ths->commitIndex, returnIndex);
3473
  }
3474

3475
  TAOS_RETURN(code);
39,215✔
3476
}
3477

3478
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
70,910✔
3479
  if (pSyncNode->totalReplicaNum == 1) {
70,910✔
3480
    return false;
38,948✔
3481
  }
3482

3483
  int32_t toCount = 0;
31,962✔
3484
  int64_t tsNow = taosGetTimestampMs();
31,970✔
3485
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
95,877✔
3486
    if (pSyncNode->peersNodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
63,909✔
3487
      continue;
16✔
3488
    }
3489
    int64_t recvTime = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
63,893✔
3490
    if (recvTime == 0 || recvTime == -1) {
63,891!
3491
      continue;
×
3492
    }
3493

3494
    if (tsNow - recvTime > tsHeartbeatTimeout) {
63,891!
3495
      toCount++;
×
3496
    }
3497
  }
3498

3499
  bool b = (toCount >= pSyncNode->quorum ? true : false);
31,968✔
3500

3501
  return b;
31,968✔
3502
}
3503

3504
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
×
3505
  if (pSyncNode == NULL) return false;
×
3506
  bool b = false;
×
3507
  for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
×
3508
    if (pSyncNode->senders[i] != NULL && pSyncNode->senders[i]->start) {
×
3509
      b = true;
×
3510
      break;
×
3511
    }
3512
  }
3513
  return b;
×
3514
}
3515

3516
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
×
3517
  if (pSyncNode == NULL) return false;
×
3518
  if (pSyncNode->pNewNodeReceiver == NULL) return false;
×
3519
  if (pSyncNode->pNewNodeReceiver->start) return true;
×
3520
  return false;
×
3521
}
3522

3523
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
282✔
3524
  int32_t   code = 0;
282✔
3525
  SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
282✔
3526
  SyncTerm  term = raftStoreGetTerm(ths);
282✔
3527

3528
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
282✔
3529
  if (pEntry == NULL) {
282!
3530
    code = TSDB_CODE_OUT_OF_MEMORY;
×
3531
    TAOS_RETURN(code);
×
3532
  }
3533

3534
  code = syncNodeAppend(ths, pEntry, NULL);
282✔
3535
  TAOS_RETURN(code);
282✔
3536
}
3537

3538
#ifdef BUILD_NO_CALL
3539
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
3540
  int32_t ret = 0;
3541

3542
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
3543
  SyncTerm        term = raftStoreGetTerm(ths);
3544
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
3545
  if (pEntry == NULL) return -1;
3546

3547
  LRUHandle* h = NULL;
3548

3549
  if (ths->state == TAOS_SYNC_STATE_LEADER) {
3550
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
3551
    if (code != 0) {
3552
      sError("append noop error");
3553
      return -1;
3554
    }
3555

3556
    syncCacheEntry(ths->pLogStore, pEntry, &h);
3557
  }
3558

3559
  if (h) {
3560
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
3561
  } else {
3562
    syncEntryDestroy(pEntry);
3563
  }
3564

3565
  return ret;
3566
}
3567
#endif
3568

3569
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
4,237✔
3570
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
4,237✔
3571
  bool           resetElect = false;
4,237✔
3572

3573
  int64_t tsMs = taosGetTimestampMs();
4,237✔
3574
  int64_t timeDiff = tsMs - pMsg->timeStamp;
4,237✔
3575
  syncLogRecvHeartbeat(ths, pMsg, timeDiff, &pRpcMsg->info.traceId);
4,237✔
3576

3577
  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
4,237!
3578
    sWarn(
×
3579
        "vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
3580
        "cluster:%d",
3581
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
3582
    return 0;
×
3583
  }
3584

3585
  SRpcMsg rpcMsg = {0};
4,237✔
3586
  TAOS_CHECK_RETURN(syncBuildHeartbeatReply(&rpcMsg, ths->vgId));
4,237!
3587
  SyncTerm currentTerm = raftStoreTryGetTerm(ths);
4,237✔
3588

3589
  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
4,236✔
3590
  pMsgReply->destId = pMsg->srcId;
4,236✔
3591
  pMsgReply->srcId = ths->myRaftId;
4,236✔
3592
  pMsgReply->term = currentTerm;
4,236✔
3593
  pMsgReply->privateTerm = 8864;  // magic number
4,236✔
3594
  pMsgReply->startTime = ths->startTime;
4,236✔
3595
  pMsgReply->timeStamp = tsMs;
4,236✔
3596
  rpcMsg.info.traceId = pRpcMsg->info.traceId;
4,236✔
3597

3598
  // reply
3599
  TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
4,236✔
3600
  sGDebug(&rpcMsg.info.traceId, "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64,
4,237!
3601
          ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp);
3602

3603
  TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg));
4,237!
3604

3605
  if (currentTerm == 0) currentTerm = raftStoreGetTerm(ths);
4,237✔
3606
  sGDebug(&rpcMsg.info.traceId,
4,237!
3607
          "vgId:%d, process sync-heartbeat msg from dnode:%d, commit-index:%" PRId64 ", cluster:%d msgTerm:%" PRId64
3608
          " currentTerm:%" PRId64,
3609
          ths->vgId, DID(&(pMsg->srcId)), pMsg->commitIndex, CID(&(pMsg->srcId)), pMsg->term, currentTerm);
3610

3611
  if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
4,237!
3612
    raftStoreSetTerm(ths, pMsg->term);
6✔
3613
    currentTerm = pMsg->term;
6✔
3614
  }
3615

3616
  if (pMsg->term == currentTerm &&
4,237!
3617
      (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
4,237!
3618
    syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
4,237✔
3619
    resetElect = true;
4,236✔
3620

3621
    ths->minMatchIndex = pMsg->minMatchIndex;
4,236✔
3622

3623
    if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) {
4,236!
3624
      SRpcMsg rpcMsgLocalCmd = {0};
4,236✔
3625
      TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
4,236!
3626
      rpcMsgLocalCmd.info.traceId = rpcMsg.info.traceId;
4,237✔
3627

3628
      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
4,237✔
3629
      pSyncMsg->cmd =
4,237✔
3630
          (ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT;
4,237✔
3631
      pSyncMsg->commitIndex = pMsg->commitIndex;
4,237✔
3632
      pSyncMsg->currentTerm = pMsg->term;
4,237✔
3633

3634
      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
4,237!
3635
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
4,237✔
3636
        if (code != 0) {
4,237!
3637
          sError("vgId:%d, failed to enqueue commit msg from heartbeat since %s, code:%d", ths->vgId, terrstr(), code);
×
3638
          rpcFreeCont(rpcMsgLocalCmd.pCont);
×
3639
        } else {
3640
          sGTrace(&rpcMsg.info.traceId,
4,237!
3641
                  "vgId:%d, enqueue commit msg from heartbeat, commit-index:%" PRId64 ", term:%" PRId64, ths->vgId,
3642
                  pMsg->commitIndex, pMsg->term);
3643
        }
3644
      }
3645
    }
3646
  }
3647

3648
  if (pMsg->term >= currentTerm &&
4,236!
3649
      (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
4,236!
3650
    SRpcMsg rpcMsgLocalCmd = {0};
×
3651
    TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
×
3652
    rpcMsgLocalCmd.info.traceId = rpcMsg.info.traceId;
×
3653

3654
    SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
×
3655
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
×
3656
    pSyncMsg->currentTerm = pMsg->term;
×
3657
    pSyncMsg->commitIndex = pMsg->commitIndex;
×
3658

3659
    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
×
3660
      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
×
3661
      if (code != 0) {
×
3662
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
×
3663
        rpcFreeCont(rpcMsgLocalCmd.pCont);
×
3664
      } else {
3665
        sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, pMsg->term);
×
3666
      }
3667
    }
3668
  }
3669

3670
  if (resetElect) syncNodeResetElectTimer(ths);
4,236!
3671
  return 0;
4,237✔
3672
}
3673

3674
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
4,236✔
3675
  int32_t code = 0;
4,236✔
3676

3677
  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
4,236✔
3678
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
4,236✔
3679
  if (pMgr == NULL) {
4,235!
3680
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
3681
    if (terrno != 0) code = terrno;
×
3682
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x016%" PRIx64, ths->vgId, pMsg->srcId.addr);
×
3683
    TAOS_RETURN(code);
×
3684
  }
3685

3686
  int64_t tsMs = taosGetTimestampMs();
4,234✔
3687
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, &pRpcMsg->info.traceId);
4,234✔
3688

3689
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);
4,234✔
3690

3691
  return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
4,237✔
3692
}
3693

3694
#ifdef BUILD_NO_CALL
3695
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
3696
  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
3697

3698
  int64_t tsMs = taosGetTimestampMs();
3699
  int64_t timeDiff = tsMs - pMsg->timeStamp;
3700
  syncLogRecvHeartbeatReply(ths, pMsg, timeDiff, &pRpcMsg->info.traceId);
3701

3702
  // update last reply time, make decision whether the other node is alive or not
3703
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);
3704
  return 0;
3705
}
3706
#endif
3707

3708
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
4,236✔
3709
  SyncLocalCmd* pMsg = pRpcMsg->pCont;
4,236✔
3710
  syncLogRecvLocalCmd(ths, pMsg, &pRpcMsg->info.traceId);
4,236✔
3711

3712
  if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
4,236!
3713
    SRaftId id = EMPTY_RAFT_ID;
×
3714
    syncNodeStepDown(ths, pMsg->currentTerm, id);
×
3715

3716
  } else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT || pMsg->cmd == SYNC_LOCAL_CMD_LEARNER_CMT) {
8,473!
3717
    if (syncLogBufferIsEmpty(ths->pLogBuf)) {
4,236!
3718
      sError("vgId:%d, sync log buffer is empty", ths->vgId);
×
3719
      return 0;
×
3720
    }
3721
    SyncTerm matchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
4,237✔
3722
    if (matchTerm < 0) {
4,237!
3723
      return TSDB_CODE_SYN_INTERNAL_ERROR;
×
3724
    }
3725
    if (pMsg->currentTerm == matchTerm) {
4,237✔
3726
      SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
4,162✔
3727
      sTrace("vgId:%d, raft entry update commit, return index:%" PRId64, ths->vgId, returnIndex);
4,162!
3728
    }
3729
    if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
8,474!
3730
        syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, &pRpcMsg->info.traceId, "heartbeat") < 0) {
4,237✔
3731
      sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64, ths->vgId, terrstr(),
×
3732
             ths->commitIndex);
3733
    }
3734
  } else {
3735
    sError("error local cmd");
×
3736
  }
3737

3738
  return 0;
4,237✔
3739
}
3740

3741
// TLA+ Spec
3742
// ClientRequest(i, v) ==
3743
//     /\ state[i] = Leader
3744
//     /\ LET entry == [term  |-> currentTerm[i],
3745
//                      value |-> v]
3746
//            newLog == Append(log[i], entry)
3747
//        IN  log' = [log EXCEPT ![i] = newLog]
3748
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
3749
//                    leaderVars, commitIndex>>
3750
//
3751

3752
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
70,874✔
3753
  sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, process client request", ths->vgId, pMsg);
70,874!
3754
  int32_t code = 0;
70,883✔
3755

3756
  SyncIndex       index = syncLogBufferGetEndIndex(ths->pLogBuf);
70,883✔
3757
  SyncTerm        term = raftStoreGetTerm(ths);
70,888✔
3758
  SSyncRaftEntry* pEntry = NULL;
70,889✔
3759
  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
70,889✔
3760
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index, &pMsg->info.traceId);
34,190✔
3761
  } else {
3762
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
36,699✔
3763
  }
3764

3765
  if (pEntry == NULL) {
70,886!
3766
    sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process client request since %s", ths->vgId, pMsg, terrstr());
×
3767
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
3768
  }
3769

3770
  // 1->2, config change is add in write thread, and will continue in sync thread
3771
  // need save message for it
3772
  if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) {
70,886!
3773
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
×
3774
    uint64_t  seqNum = syncRespMgrAdd(ths->pSyncRespMgr, &stub);
×
3775
    pEntry->seqNum = seqNum;
×
3776
  }
3777

3778
  if (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
70,886!
3779
    if (pRetIndex) {
70,886✔
3780
      (*pRetIndex) = index;
36,694✔
3781
    }
3782

3783
    if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
70,886!
3784
      int32_t code = syncNodeCheckChangeConfig(ths, pEntry);
×
3785
      if (code < 0) {
×
3786
        sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to check change config since %s", ths->vgId, pMsg, terrstr());
×
3787
        syncEntryDestroy(pEntry);
×
3788
        pEntry = NULL;
×
3789
        TAOS_RETURN(code);
×
3790
      }
3791

3792
      if (code > 0) {
×
3793
        SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
×
3794
        int32_t num = syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
×
3795
        sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get response stub for config change, seqNum:%" PRIu64 " num:%d",
×
3796
                ths->vgId, pMsg, pEntry->seqNum, num);
3797
        if (rsp.info.handle != NULL) {
×
3798
          tmsgSendRsp(&rsp);
×
3799
        }
3800
        syncEntryDestroy(pEntry);
×
3801
        pEntry = NULL;
×
3802
        TAOS_RETURN(code);
×
3803
      }
3804
    }
3805

3806
    code = syncNodeAppend(ths, pEntry, pMsg);
70,886✔
3807
    return code;
70,889✔
3808
  } else {
3809
    syncEntryDestroy(pEntry);
×
3810
    pEntry = NULL;
×
3811
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
3812
  }
3813
}
3814

3815
const char* syncStr(ESyncState state) {
8,819✔
3816
  switch (state) {
8,819!
3817
    case TAOS_SYNC_STATE_FOLLOWER:
3,271✔
3818
      return "follower";
3,271✔
3819
    case TAOS_SYNC_STATE_CANDIDATE:
98✔
3820
      return "candidate";
98✔
3821
    case TAOS_SYNC_STATE_LEADER:
4,996✔
3822
      return "leader";
4,996✔
3823
    case TAOS_SYNC_STATE_ERROR:
×
3824
      return "error";
×
3825
    case TAOS_SYNC_STATE_OFFLINE:
122✔
3826
      return "offline";
122✔
3827
    case TAOS_SYNC_STATE_LEARNER:
331✔
3828
      return "learner";
331✔
3829
    case TAOS_SYNC_STATE_ASSIGNED_LEADER:
×
3830
      return "assigned leader";
×
3831
    default:
1✔
3832
      return "unknown";
1✔
3833
  }
3834
}
3835

3836
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
59✔
3837
  for (int32_t i = 0; i < pNewCfg->totalReplicaNum; ++i) {
69!
3838
    SRaftId raftId = {
69✔
3839
        .addr = SYNC_ADDR(&pNewCfg->nodeInfo[i]),
69✔
3840
        .vgId = ths->vgId,
69✔
3841
    };
3842

3843
    if (syncUtilSameId(&(ths->myRaftId), &raftId)) {
69✔
3844
      pNewCfg->myIndex = i;
59✔
3845
      return 0;
59✔
3846
    }
3847
  }
3848

3849
  return TSDB_CODE_SYN_INTERNAL_ERROR;
×
3850
}
3851

3852
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
70,909✔
3853
  return (ths->replicaNum == 1 && syncUtilUserCommit(pMsg->msgType) && ths->vgId != 1);
70,909!
3854
}
3855

3856
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
132,726✔
3857
  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
251,822!
3858
    if (syncUtilSameId(&((ths->replicasId)[i]), pRaftId)) {
251,824✔
3859
      return true;
132,728✔
3860
    }
3861
  }
3862
  return false;
3863
}
3864

3865
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
2,408✔
3866
  SSyncSnapshotSender* pSender = NULL;
2,408✔
3867
  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
9,579✔
3868
    if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
7,172✔
3869
      pSender = (ths->senders)[i];
2,407✔
3870
    }
3871
  }
3872
  return pSender;
2,407✔
3873
}
3874

3875
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
597✔
3876
  SSyncTimer* pTimer = NULL;
597✔
3877
  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
2,281✔
3878
    if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
1,684✔
3879
      pTimer = &((ths->peerHeartbeatTimerArr)[i]);
597✔
3880
    }
3881
  }
3882
  return pTimer;
597✔
3883
}
3884

3885
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
703✔
3886
  SPeerState* pState = NULL;
703✔
3887
  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
1,406✔
3888
    if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
703!
3889
      pState = &((ths->peerStates)[i]);
703✔
3890
    }
3891
  }
3892
  return pState;
703✔
3893
}
3894

3895
#ifdef BUILD_NO_CALL
3896
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
3897
  SPeerState* pState = syncNodeGetPeerState(ths, pDestId);
3898
  if (pState == NULL) {
3899
    sError("vgId:%d, replica maybe dropped", ths->vgId);
3900
    return false;
3901
  }
3902

3903
  SyncIndex sendIndex = pMsg->prevLogIndex + 1;
3904
  int64_t   tsNow = taosGetTimestampMs();
3905

3906
  if (pState->lastSendIndex == sendIndex && tsNow - pState->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS) {
3907
    return false;
3908
  }
3909

3910
  return true;
3911
}
3912

3913
bool syncNodeCanChange(SSyncNode* pSyncNode) {
3914
  if (pSyncNode->changing) {
3915
    sError("sync cannot change");
3916
    return false;
3917
  }
3918

3919
  if ((pSyncNode->commitIndex >= SYNC_INDEX_BEGIN)) {
3920
    SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
3921
    if (pSyncNode->commitIndex != lastIndex) {
3922
      sError("sync cannot change2");
3923
      return false;
3924
    }
3925
  }
3926

3927
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
3928
    SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[i]);
3929
    if (pSender != NULL && pSender->start) {
3930
      sError("sync cannot change3");
3931
      return false;
3932
    }
3933
  }
3934

3935
  return true;
3936
}
3937
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc