• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3593

24 Jan 2025 08:57AM UTC coverage: 63.239% (-0.3%) from 63.546%
#3593

push

travis-ci

web-flow
Merge pull request #29638 from taosdata/docs/TS-5846-3.0

enh: TDengine modify taosBenchmark new query rule cases and add doc

140619 of 285630 branches covered (49.23%)

Branch coverage included in aggregate %.

218877 of 282844 relevant lines covered (77.38%)

19647377.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.93
/source/libs/sync/src/syncMain.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "syncAppendEntries.h"
19
#include "syncAppendEntriesReply.h"
20
#include "syncCommit.h"
21
#include "syncElection.h"
22
#include "syncEnv.h"
23
#include "syncIndexMgr.h"
24
#include "syncInt.h"
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
27
#include "syncRaftCfg.h"
28
#include "syncRaftLog.h"
29
#include "syncRaftStore.h"
30
#include "syncReplication.h"
31
#include "syncRequestVote.h"
32
#include "syncRequestVoteReply.h"
33
#include "syncRespMgr.h"
34
#include "syncSnapshot.h"
35
#include "syncTimeout.h"
36
#include "syncUtil.h"
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
39
#include "tmisce.h"
40
#include "tref.h"
41

42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
43
static void    syncNodeEqElectTimer(void* param, void* tmrId);
44
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
45
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
48
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
49
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
50
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
51
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
52
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
53
static int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
54
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
55

56
static bool    syncNodeCanChange(SSyncNode* pSyncNode);
57
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
58
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
59
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
60

61
static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
62

63
/*
 * Open a sync node from pSyncInfo, register it via syncNodeAdd(), and copy the
 * timer settings and message callbacks into it.
 *
 * Returns the registered reference id, or -1 if opening or registering fails
 * (the node is closed again when registration fails).
 */
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  sInfo("vgId:%d, start to open sync", pSyncInfo->vgId);

  SSyncNode* pNode = syncNodeOpen(pSyncInfo, vnodeVersion);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    syncNodeClose(pNode);
    return -1;
  }

  // Ping timer.
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;
  // Heartbeat timer.
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;
  // Election timer baseline.
  pNode->electBaseLine = pSyncInfo->electMs;
  pNode->msgcb = pSyncInfo->msgcb;

  sInfo("vgId:%d, sync opened", pSyncInfo->vgId);
  return pNode->rid;
}
86

87
int32_t syncStart(int64_t rid) {
13,996✔
88
  int32_t    code = 0;
13,996✔
89
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,996✔
90
  if (pSyncNode == NULL) {
13,996!
91
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
92
    if (terrno != 0) code = terrno;
×
93
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
94
    TAOS_RETURN(code);
×
95
  }
96
  sInfo("vgId:%d, begin to start sync", pSyncNode->vgId);
13,996✔
97

98
  if ((code = syncNodeRestore(pSyncNode)) < 0) {
13,996!
99
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
100
    goto _err;
×
101
  }
102

103
  if ((code = syncNodeStart(pSyncNode)) < 0) {
13,996!
104
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, tstrerror(code));
×
105
    goto _err;
×
106
  }
107

108
  syncNodeRelease(pSyncNode);
13,996✔
109

110
  sInfo("vgId:%d, sync started", pSyncNode->vgId);
13,996✔
111

112
  TAOS_RETURN(code);
13,996✔
113

114
_err:
×
115
  syncNodeRelease(pSyncNode);
×
116
  TAOS_RETURN(code);
×
117
}
118

119
/*
 * Copy the current raft configuration of the node registered under rid into *cfg.
 *
 * Returns 0 on success. If the node cannot be acquired, returns terrno (or
 * TSDB_CODE_SYN_RETURN_VALUE_NULL when terrno is 0) and leaves *cfg untouched.
 */
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    *cfg = pNode->raftCfg.cfg;
    syncNodeRelease(pNode);
    return 0;
  }

  int32_t code = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
  sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
  TAOS_RETURN(code);
}
136

137
void syncStop(int64_t rid) {
13,996✔
138
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,996✔
139
  if (pSyncNode != NULL) {
13,996!
140
    pSyncNode->isStart = false;
13,996✔
141
    syncNodeRelease(pSyncNode);
13,996✔
142
    syncNodeRemove(rid);
13,996✔
143
  }
144
}
13,994✔
145

146
void syncPreStop(int64_t rid) {
13,995✔
147
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,995✔
148
  if (pSyncNode != NULL) {
13,996!
149
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
13,996✔
150
      sInfo("vgId:%d, stop snapshot receiver", pSyncNode->vgId);
1!
151
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
1✔
152
    }
153
    syncNodePreClose(pSyncNode);
13,996✔
154
    syncNodeRelease(pSyncNode);
13,995✔
155
  }
156
}
13,996✔
157

158
void syncPostStop(int64_t rid) {
12,197✔
159
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
12,197✔
160
  if (pSyncNode != NULL) {
12,198!
161
    syncNodePostClose(pSyncNode);
12,198✔
162
    syncNodeRelease(pSyncNode);
12,198✔
163
  }
164
}
12,198✔
165

166
/*
 * Validate a proposed configuration: this node must appear in pCfg, and the
 * replica count may differ from the current one by at most one.
 */
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (syncNodeInConfig(pSyncNode, pCfg)) {
    int delta = pCfg->replicaNum - pSyncNode->replicaNum;
    return delta >= -1 && delta <= 1;
  }
  return false;
}
170

171
/*
 * Apply a new raft configuration to the node registered under rid.
 *
 * - If the current config index is already >= pNewCfg->lastIndex, nothing
 *   changes and 0 is returned.
 * - The new config is validated with syncNodeCheckNewConfig(); an invalid
 *   config, or a failing syncNodeDoConfigChange(), yields
 *   TSDB_CODE_SYN_NEW_CONFIG_ERROR.
 * - When the node is (assigned) leader, the heartbeat timer is stopped, the
 *   per-peer heartbeat timers are re-initialised, and the heartbeat timer is
 *   started again.
 *
 * Any failing step returns its error code. The node reference is released on
 * every path, and nothing reads pSyncNode after the release.
 */
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
    sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
          pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
    syncNodeRelease(pSyncNode);
    return 0;
  }

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    goto _exit;
  }

  if ((code = syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg)) != 0) {
    goto _exit;
  }

  if (syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex) != 0) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since do change error", pSyncNode->vgId);
    goto _exit;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    // The replica set may have changed, so rebuild the per-peer heartbeat timers.
    if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
      goto _exit;
    }

    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      code = syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
      if (code != 0) {
        goto _exit;
      }
    }

    if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
      goto _exit;
    }
  }

_exit:
  syncNodeRelease(pSyncNode);
  TAOS_RETURN(code);
}
217

218
/*
 * Dispatch a sync-layer RPC message to the handler for pMsg->msgType on the
 * node registered under rid.
 *
 * Returns the handler's result, TSDB_CODE_MSG_NOT_PROCESSED for unknown message
 * types, or terrno (TSDB_CODE_SYN_RETURN_VALUE_NULL if terrno is 0) when sync
 * is not initialised or the node cannot be acquired.
 */
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
    case TDMT_SYNC_TIMEOUT_ELECTION:
      // Both timeout kinds share one handler.
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_FORCE_FOLLOWER:
      code = syncForceBecomeFollower(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SET_ASSIGNED_LEADER:
      code = syncBecomeAssignedLeader(pSyncNode, pMsg);
      break;
    default:
      code = TSDB_CODE_MSG_NOT_PROCESSED;
      break;
  }

  // Log before releasing: pSyncNode must not be dereferenced after syncNodeRelease.
  if (code != 0) {
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since %s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
           tstrerror(code));
  }
  syncNodeRelease(pSyncNode);

  TAOS_RETURN(code);
}
287

288
int32_t syncLeaderTransfer(int64_t rid) {
13,996✔
289
  int32_t    code = 0;
13,996✔
290
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,996✔
291
  if (pSyncNode == NULL) {
13,996!
292
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
293
    if (terrno != 0) code = terrno;
×
294
    TAOS_RETURN(code);
×
295
  }
296

297
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
13,996✔
298
  syncNodeRelease(pSyncNode);
13,996✔
299
  return ret;
13,996✔
300
}
301

302
/*
 * Force the node to become a follower (with an all-zero leader id), then reply
 * to the request with code 0, reusing the response buffer carried in
 * pRpcMsg->info. Always returns 0.
 */
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SRaftId noLeader = {0};
  syncNodeBecomeFollower(ths, noLeader, "force election");

  SRpcMsg reply = {
      .code = 0,
      .info = pRpcMsg->info,
      .pCont = pRpcMsg->info.rsp,
      .contLen = pRpcMsg->info.rspLen,
  };
  tmsgSendRsp(&reply);

  return 0;
}
316

317
/*
 * Handle a "set assigned leader" request from the arbitrator.
 *
 * The request body follows an SMsgHead in pRpcMsg->pCont. It is ignored (and
 * answered with TSDB_CODE_MND_ARB_TOKEN_MISMATCH) when its arbTerm is lower than
 * the local one or its memberToken differs from the local arbToken. Otherwise,
 * if the node is not already the assigned leader, the raft term is advanced,
 * the node becomes the assigned leader, and a no-op entry is appended. A failed
 * no-op append is only logged.
 *
 * A response is always sent. On success it carries a serialized
 * SVArbSetAssignedLeaderRsp; on failure its payload is NULL with length 0.
 * Returns the response code.
 */
int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
  int32_t code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  void*   pHead = NULL;
  int32_t contLen = 0;

  SVArbSetAssignedLeaderReq req = {0};
  // The serialized request starts after the SMsgHead, so the decode bound must
  // exclude the header bytes as well.
  if (pRpcMsg->pCont == NULL || pRpcMsg->contLen < (int32_t)sizeof(SMsgHead) ||
      tDeserializeSVArbSetAssignedLeaderReq((char*)pRpcMsg->pCont + sizeof(SMsgHead),
                                            pRpcMsg->contLen - (int32_t)sizeof(SMsgHead), &req) != 0) {
    sError("vgId:%d, failed to deserialize SVArbSetAssignedLeaderReq", ths->vgId);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  if (ths->arbTerm > req.arbTerm) {
    sInfo("vgId:%d, skip to set assigned leader, msg with lower term, local:%" PRId64 ", msg:%" PRId64, ths->vgId,
          ths->arbTerm, req.arbTerm);
    goto _OVER;
  }

  ths->arbTerm = TMAX(req.arbTerm, ths->arbTerm);

  if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) {
    sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken,
          req.memberToken);
    goto _OVER;
  }

  if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SUCCESS;
    // The outcome of raftStoreNextTerm is checked through terrno below; clear
    // any stale value so an earlier, unrelated error is not taken as a failure here.
    terrno = TSDB_CODE_SUCCESS;
    raftStoreNextTerm(ths);
    if (terrno != TSDB_CODE_SUCCESS) {
      code = terrno;
      sError("vgId:%d, failed to set next term since:%s", ths->vgId, tstrerror(code));
      goto _OVER;
    }
    syncNodeBecomeAssignedLeader(ths);

    if ((code = syncNodeAppendNoop(ths)) < 0) {
      sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, tstrerror(code));
    }
  }

  SVArbSetAssignedLeaderRsp rsp = {0};
  rsp.arbToken = req.arbToken;
  rsp.memberToken = req.memberToken;
  rsp.vgId = ths->vgId;

  contLen = tSerializeSVArbSetAssignedLeaderRsp(NULL, 0, &rsp);
  if (contLen <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    contLen = 0;  // never send a non-positive length
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    goto _OVER;
  }
  pHead = rpcMallocCont(contLen);
  if (!pHead) {
    code = terrno;
    contLen = 0;  // no payload was allocated
    sError("vgId:%d, failed to malloc memory for SVArbSetAssignedLeaderRsp", ths->vgId);
    goto _OVER;
  }
  if (tSerializeSVArbSetAssignedLeaderRsp(pHead, contLen, &rsp) <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    rpcFreeCont(pHead);
    // Do not hand the freed buffer to tmsgSendRsp below.
    pHead = NULL;
    contLen = 0;
    goto _OVER;
  }

  code = TSDB_CODE_SUCCESS;

_OVER:;
  SRpcMsg rspMsg = {
      .code = code,
      .pCont = pHead,
      .contLen = contLen,
      .info = pRpcMsg->info,
  };

  tmsgSendRsp(&rspMsg);

  tFreeSVArbSetAssignedLeaderReq(&req);
  TAOS_RETURN(code);
}
397

398
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
×
399
  int32_t    code = 0;
×
400
  SSyncNode* pNode = syncNodeAcquire(rid);
×
401
  if (pNode == NULL) {
×
402
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
403
    if (terrno != 0) code = terrno;
×
404
    TAOS_RETURN(code);
×
405
  }
406

407
  SRpcMsg rpcMsg = {0, .info.notFreeAhandle = 1};
×
408
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
×
409
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;
×
410

411
  syncNodeRelease(pNode);
×
412
  if (ret == 1) {
×
413
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
×
414
    code = rpcSendResponse(&rpcMsg);
×
415
    return code;
×
416
  } else {
417
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
×
418
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
419
  }
420
}
421

422
/*
 * Return the smallest match index across the node's peers. The first peer's
 * value is taken unconditionally; later peers replace it only with a positive,
 * smaller index. Returns SYNC_INDEX_INVALID when there are no peers.
 */
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; peer++) {
    SyncIndex candidate = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[peer]);
    bool      unset = (lowest == SYNC_INDEX_INVALID);
    if (unset || (candidate > 0 && candidate < lowest)) {
      lowest = candidate;
    }
  }

  return lowest;
}
435

436
/*
 * Ask the node's log store, via its syncLogIndexRetention callback, for the
 * retention index that corresponds to a byte budget of `bytes`.
 */
static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) {
  SyncIndex retention = pSyncNode->pLogStore->syncLogIndexRetention(pSyncNode->pLogStore, bytes);
  return retention;
}
439

440
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
33,019✔
441
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
33,019✔
442
  int32_t    code = 0;
33,019✔
443
  if (pSyncNode == NULL) {
33,019✔
444
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
22✔
445
    if (terrno != 0) code = terrno;
22!
446
    sError("sync begin snapshot error");
22!
447
    TAOS_RETURN(code);
22✔
448
  }
449

450
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
32,997✔
451
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
32,997✔
452
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
32,996✔
453

454
  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
32,996!
455
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
76!
456
    syncNodeRelease(pSyncNode);
76✔
457
    return 0;
76✔
458
  }
459

460
  int64_t logRetention = 0;
32,920✔
461

462
  if (syncNodeIsMnode(pSyncNode)) {
32,920✔
463
    // mnode
464
    logRetention = tsMndLogRetention;
3,154✔
465
  } else {
466
    // vnode
467
    if (pSyncNode->replicaNum > 1) {
29,766✔
468
      logRetention = SYNC_VNODE_LOG_RETENTION;
437✔
469
    }
470
  }
471

472
  if (pSyncNode->totalReplicaNum > 1) {
32,920✔
473
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER &&
881✔
474
        pSyncNode->state != TAOS_SYNC_STATE_LEARNER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
167!
475
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
18!
476
              lastApplyIndex);
477
      syncNodeRelease(pSyncNode);
18✔
478
      return 0;
18✔
479
    }
480
    SyncIndex retentionIndex =
863✔
481
        TMAX(pSyncNode->minMatchIndex, syncLogRetentionIndex(pSyncNode, SYNC_WAL_LOG_RETENTION_SIZE));
863✔
482
    logRetention += TMAX(0, lastApplyIndex - retentionIndex);
863✔
483
  }
484

485
_DEL_WAL:
32,039✔
486

487
  do {
488
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
32,902✔
489
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
32,902✔
490
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
32,902✔
491
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
32,902✔
492
    if (lastApplyIndex <= walCommitVer) {
32,902!
493
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);
32,902✔
494

495
      if (snapshottingIndex == SYNC_INDEX_INVALID) {
32,902!
496
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
32,902✔
497
        pSyncNode->snapshottingTime = taosGetTimestampMs();
32,902✔
498

499
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
32,902✔
500
        if (code == 0) {
32,903!
501
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
32,903✔
502
                  pSyncNode->snapshottingIndex, lastApplyIndex);
503
        } else {
504
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
×
505
                  terrstr(), pSyncNode->snapshottingIndex, lastApplyIndex);
506
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
×
507
        }
508

509
      } else {
510
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
×
511
                snapshottingIndex, lastApplyIndex);
512
      }
513
    }
514
  } while (0);
515

516
  syncNodeRelease(pSyncNode);
32,903✔
517
  TAOS_RETURN(code);
32,903✔
518
}
519

520
int32_t syncEndSnapshot(int64_t rid) {
32,997✔
521
  int32_t    code = 0;
32,997✔
522
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
32,997✔
523
  if (pSyncNode == NULL) {
32,997!
524
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
525
    if (terrno != 0) code = terrno;
×
526
    sError("sync end snapshot error");
×
527
    TAOS_RETURN(code);
×
528
  }
529

530
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
32,997✔
531
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
32,903✔
532
    code = walEndSnapshot(pData->pWal);
32,903✔
533
    if (code != 0) {
32,903!
534
      sNError(pSyncNode, "wal snapshot end error since:%s", tstrerror(code));
×
535
      syncNodeRelease(pSyncNode);
×
536
      TAOS_RETURN(code);
×
537
    } else {
538
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
32,903✔
539
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
32,903✔
540
    }
541
  }
542

543
  syncNodeRelease(pSyncNode);
32,997✔
544
  TAOS_RETURN(code);
32,997✔
545
}
546

547
/*
 * A node is readable only when it is a leader (regular or assigned) and has
 * finished restoring. On false, terrno says why:
 * - TSDB_CODE_SYN_INTERNAL_ERROR for a NULL node;
 * - TSDB_CODE_SYN_NOT_LEADER when not a leader;
 * - TSDB_CODE_SYN_RESTORING while still restoring.
 */
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  bool isLeader =
      (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
  if (!isLeader) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pSyncNode->restoreFinish) {
    return true;
  }

  terrno = TSDB_CODE_SYN_RESTORING;
  return false;
}
566

567
bool syncIsReadyForRead(int64_t rid) {
13,011,132✔
568
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
13,011,132✔
569
  if (pSyncNode == NULL) {
13,018,021!
570
    sError("sync ready for read error");
×
571
    return false;
×
572
  }
573

574
  bool ready = syncNodeIsReadyForRead(pSyncNode);
13,018,021✔
575

576
  syncNodeRelease(pSyncNode);
13,018,296✔
577
  return ready;
13,013,406✔
578
}
579

580
#ifdef BUILD_NO_CALL
// rid-based wrapper around syncNodeSnapshotSending(); false if rid cannot be acquired.
bool syncSnapshotSending(int64_t rid) {
  bool       sending = false;
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    sending = syncNodeSnapshotSending(pNode);
    syncNodeRelease(pNode);
  }
  return sending;
}

// rid-based wrapper around syncNodeSnapshotRecving(); false if rid cannot be acquired.
bool syncSnapshotRecving(int64_t rid) {
  bool       receiving = false;
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    receiving = syncNodeSnapshotRecving(pNode);
    syncNodeRelease(pNode);
  }
  return receiving;
}
#endif
603

604
/*
 * If this node is leader with more than one replica, propose a leader transfer.
 * The target is the first peer, or the second peer when there are exactly two
 * peers and the second has the higher match index. Returns 0 when there are no
 * peers or no transfer is attempted; otherwise returns syncNodeLeaderTransferTo()'s result.
 */
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return 0;
  }

  bool canTransfer = pSyncNode->state == TAOS_SYNC_STATE_LEADER && pSyncNode->replicaNum > 1;
  if (!canTransfer) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex first = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[0]);
    SyncIndex second = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[1]);
    if (second > first) {
      target = 1;
    }
  }

  return syncNodeLeaderTransferTo(pSyncNode, pSyncNode->peersNodeInfo[target]);
}
625

626
/*
 * Build a leader-transfer message naming newLeader and propose it through
 * syncNodePropose(). The message buffer is freed here after proposing.
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR for a single-replica node, the build
 * error if message construction fails, otherwise the propose result.
 */
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg transferMsg = {0};
  TAOS_CHECK_RETURN(syncBuildLeaderTransfer(&transferMsg, pSyncNode->vgId));

  SyncLeaderTransfer* pBody = transferMsg.pCont;
  pBody->newNodeInfo = newLeader;
  pBody->newLeaderId.vgId = pSyncNode->vgId;
  pBody->newLeaderId.addr = SYNC_ADDR(&newLeader);

  int32_t proposed = syncNodePropose(pSyncNode, &transferMsg, false, NULL);
  rpcFreeCont(transferMsg.pCont);
  return proposed;
}
646

647
SSyncState syncGetState(int64_t rid) {
5,845,168✔
648
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
5,845,168✔
649

650
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
5,845,168✔
651
  if (pSyncNode != NULL) {
5,845,989✔
652
    state.state = pSyncNode->state;
5,845,945✔
653
    state.roleTimeMs = pSyncNode->roleTimeMs;
5,845,945✔
654
    state.startTimeMs = pSyncNode->startTime;
5,845,945✔
655
    state.restored = pSyncNode->restoreFinish;
5,845,945✔
656
    if (pSyncNode->vgId != 1) {
5,845,945✔
657
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
1,729,382✔
658
    } else {
659
      state.canRead = state.restored;
4,116,563✔
660
    }
661
    /*
662
    double progress = 0;
663
    if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){
664
      progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex;
665
      state.progress = (int32_t)(progress * 100);
666
    }
667
    else{
668
      state.progress = -1;
669
    }
670
    sDebug("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 ", "
671
            "progress:%lf, progress:%d",
672
          pSyncNode->vgId,
673
         pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
674
    */
675
    state.term = raftStoreGetTerm(pSyncNode);
5,845,854✔
676
    syncNodeRelease(pSyncNode);
5,846,055✔
677
  }
678

679
  return state;
5,846,008✔
680
}
681

682
int32_t syncGetArbToken(int64_t rid, char* outToken) {
18,201✔
683
  int32_t    code = 0;
18,201✔
684
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
18,201✔
685
  if (pSyncNode == NULL) {
18,201!
686
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
687
    if (terrno != 0) code = terrno;
×
688
    TAOS_RETURN(code);
×
689
  }
690

691
  memset(outToken, 0, TSDB_ARB_TOKEN_SIZE);
18,201✔
692
  (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
18,201✔
693
  tstrncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE);
18,201✔
694
  (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
18,201✔
695

696
  syncNodeRelease(pSyncNode);
18,201✔
697
  TAOS_RETURN(code);
18,201✔
698
}
699

700
int32_t syncGetAssignedLogSynced(int64_t rid) {
2✔
701
  int32_t    code = 0;
2✔
702
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
2✔
703
  if (pSyncNode == NULL) {
2!
704
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
705
    if (terrno != 0) code = terrno;
×
706
    TAOS_RETURN(code);
×
707
  }
708

709
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
2!
710
    code = TSDB_CODE_VND_ARB_NOT_SYNCED;
×
711
    syncNodeRelease(pSyncNode);
×
712
    TAOS_RETURN(code);
×
713
  }
714

715
  bool isSync = pSyncNode->commitIndex >= pSyncNode->assignedCommitIndex;
2✔
716
  code = (isSync ? TSDB_CODE_SUCCESS : TSDB_CODE_VND_ARB_NOT_SYNCED);
2!
717

718
  syncNodeRelease(pSyncNode);
2✔
719
  TAOS_RETURN(code);
2✔
720
}
721

722
/*
 * Raise the node's arbitrator term to arbTerm if it is larger; never lowers it.
 * Returns 0, or terrno (or TSDB_CODE_SYN_RETURN_VALUE_NULL) when rid cannot be acquired.
 */
int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    int32_t code = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  if (arbTerm > pNode->arbTerm) {
    pNode->arbTerm = arbTerm;
  }

  syncNodeRelease(pNode);
  TAOS_RETURN(0);
}
735

736
/*
 * Return the largest recorded config-change index that is <= snapshotLastApplyIndex.
 * If no entry qualifies, configIndexArr[0] is returned. Returns -2 and sets terrno
 * to TSDB_CODE_SYN_INTERNAL_ERROR when no config index is recorded.
 */
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  if (pSyncNode->raftCfg.configIndexCount < 1) {
    sError("vgId:%d, failed get snapshot config index, configIndexCount:%d", pSyncNode->vgId,
           pSyncNode->raftCfg.configIndexCount);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -2;
  }

  const SyncIndex* pIndexes = pSyncNode->raftCfg.configIndexArr;
  SyncIndex        best = pIndexes[0];
  for (int32_t k = 0; k < pSyncNode->raftCfg.configIndexCount; k++) {
    SyncIndex candidate = pIndexes[k];
    if (candidate > best && candidate <= snapshotLastApplyIndex) {
      best = candidate;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, best);
  return best;
}
756

757
/*
 * Find the peer whose FQDN and port match pEp and return its raft id.
 * Returns EMPTY_RAFT_ID when no peer matches.
 */
static SRaftId syncGetRaftIdByEp(SSyncNode* pSyncNode, const SEp* pEp) {
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    const SNodeInfo* pPeer = &pSyncNode->peersNodeInfo[i];
    if (pEp->port != pPeer->nodePort) continue;
    if (strcmp(pEp->fqdn, pPeer->nodeFqdn) != 0) continue;
    return pSyncNode->peersId[i];
  }
  return EMPTY_RAFT_ID;
}
766

767
/*
 * Build the endpoint set clients should retry against for sync node `rid`.
 *
 * Every non-learner replica in raftCfg.cfg.nodeInfo is appended to pEpSet.
 * Learners are skipped, so a position in pEpSet->eps can differ from the
 * node's position in nodeInfo. inUse is chosen as:
 *   - the position of the cached leader, when the leader is a known peer;
 *   - otherwise the position of the local node, when it is the cached leader;
 *   - otherwise (myIndex + 1) % numOfEps.
 * The set is then sorted with epsetSort(). If the node cannot be acquired,
 * pEpSet is returned with numOfEps == 0.
 */
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return;

  // Keep inUse defined even if no voter ends up in the set.
  pEpSet->inUse = 0;

  const int32_t capacity = (int32_t)(sizeof(pEpSet->eps) / sizeof(pEpSet->eps[0]));
  int32_t       leaderPos = -1;  // eps position of the cached leader, when it is a peer
  int32_t       selfPos = -1;    // eps position of the local node
  int32_t       j = 0;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (pSyncNode->raftCfg.cfg.nodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER) continue;
    if (j >= capacity) {
      sWarn("vgId:%d, sync get retry epset, too many replicas, capacity:%d", pSyncNode->vgId, capacity);
      break;
    }

    SEp* pEp = &pEpSet->eps[j];
    tstrncpy(pEp->fqdn, pSyncNode->raftCfg.cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = pSyncNode->raftCfg.cfg.nodeInfo[i].nodePort;
    pEpSet->numOfEps++;
    sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port);

    if (i == pSyncNode->raftCfg.cfg.myIndex) selfPos = j;

    SRaftId id = syncGetRaftIdByEp(pSyncNode, pEp);
    if (id.addr == pSyncNode->leaderCache.addr && id.vgId == pSyncNode->leaderCache.vgId && id.addr != 0 &&
        id.vgId != 0) {
      leaderPos = j;
    }
    j++;
  }

  if (pEpSet->numOfEps > 0) {
    bool selfIsLeader = pSyncNode->myRaftId.addr == pSyncNode->leaderCache.addr &&
                        pSyncNode->myRaftId.vgId == pSyncNode->leaderCache.vgId;
    if (leaderPos != -1) {
      pEpSet->inUse = leaderPos;
    } else if (selfIsLeader && selfPos != -1) {
      // myIndex indexes nodeInfo (learners included), whereas inUse must index eps.
      pEpSet->inUse = selfPos;
    } else {
      pEpSet->inUse = (pSyncNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps;
    }
  }
  epsetSort(pEpSet);

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pSyncNode);
}
807

808
/*
 * Public entry point for proposing a request on sync node `rid`.
 * Acquires the node, delegates to syncNodePropose(), and releases the node.
 *
 * Returns syncNodePropose()'s result, or an error code (terrno when set,
 * otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL) if the node cannot be acquired.
 */
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    int32_t result = syncNodePropose(pNode, pMsg, isWeak, seq);
    syncNodeRelease(pNode);
    return result;
  }

  int32_t err = TSDB_CODE_SYN_RETURN_VALUE_NULL;
  if (terrno != 0) err = terrno;
  sError("sync propose error");
  TAOS_RETURN(err);
}
822

823
int32_t syncCheckMember(int64_t rid) {
×
824
  int32_t    code = 0;
×
825
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
826
  if (pSyncNode == NULL) {
×
827
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
828
    if (terrno != 0) code = terrno;
×
829
    sError("sync propose error");
×
830
    TAOS_RETURN(code);
×
831
  }
832

833
  if (pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
×
834
    syncNodeRelease(pSyncNode);
×
835
    return TSDB_CODE_SYN_WRONG_ROLE;
×
836
  }
837

838
  syncNodeRelease(pSyncNode);
×
839
  return 0;
×
840
}
841

842
int32_t syncIsCatchUp(int64_t rid) {
5,804✔
843
  int32_t    code = 0;
5,804✔
844
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
5,804✔
845
  if (pSyncNode == NULL) {
5,804!
846
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
847
    if (terrno != 0) code = terrno;
×
848
    sError("sync Node Acquire error since %d", errno);
×
849
    TAOS_RETURN(code);
×
850
  }
851

852
  int32_t isCatchUp = 0;
5,804✔
853
  if (pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
5,804!
854
      pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
1,367!
855
      pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP) {
1,367✔
856
    sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
5,529!
857
          pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
858
          pSyncNode->pLogBuf->matchIndex);
859
    isCatchUp = 0;
5,529✔
860
  } else {
861
    sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, pSyncNode->vgId,
275!
862
          pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->matchIndex);
863
    isCatchUp = 1;
275✔
864
  }
865

866
  syncNodeRelease(pSyncNode);
5,804✔
867
  return isCatchUp;
5,804✔
868
}
869

870
/*
 * Return the configured role of the local replica of sync node `rid`, read
 * from raftCfg.cfg.nodeInfo[raftCfg.cfg.myIndex].nodeRole.
 *
 * If the node cannot be acquired, the error code (terrno when set, otherwise
 * TSDB_CODE_SYN_RETURN_VALUE_NULL) is returned converted to ESyncRole.
 * Presumably this value does not equal any valid role; verify before relying
 * on it.
 */
ESyncRole syncGetRole(int64_t rid) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // The failure code is taken from terrno above; errno is unrelated here.
    sError("sync Node Acquire error since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole;

  syncNodeRelease(pSyncNode);
  return role;
}
885

886
/*
 * Return the current raft term of sync node `rid`, as reported by
 * raftStoreGetTerm().
 *
 * If the node cannot be acquired, the int32 error code (terrno when set,
 * otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL) is returned widened to int64.
 */
int64_t syncGetTerm(int64_t rid) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // The failure code is taken from terrno above; errno is unrelated here.
    sError("sync Node Acquire error since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  int64_t term = raftStoreGetTerm(pSyncNode);

  syncNodeRelease(pSyncNode);
  return term;
}
901

902
/*
 * Propose a client request through the local sync node.
 *
 * A proposal is accepted only when all of the following hold:
 *   - the node is a leader or an assigned leader;
 *   - it has finished restoring;
 *   - for a non-assigned leader, its heartbeat replies have not timed out.
 *
 * Two paths exist:
 *   - Optimized single replica (syncNodeIsOptimizedOneReplica): the request is
 *     handled synchronously by syncNodeOnClientRequest(), and
 *     pMsg->info.conn.applyIndex/applyTerm are filled in. Returns 1 while
 *     replicaNum is 1, or 0 when a membership change has made the group
 *     multi-replica in the meantime.
 *   - Otherwise: a response stub is registered in pSyncRespMgr, the request
 *     is wrapped by syncBuildClientRequest() and enqueued through syncEqMsg.
 *     *seq (if non-NULL) receives the stub's sequence number.
 *
 * Returns the values described above, or a TSDB error code.
 */
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  int32_t code = 0;
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SYN_NOT_LEADER;
    sNWarn(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    TAOS_RETURN(code);
  }

  if (!pSyncNode->restoreFinish) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNWarn(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
           TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // heartbeat timeout
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER && syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since heartbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // Initialized so the failure log below never reads an indeterminate value.
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code < 0) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      TAOS_RETURN(code);
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);

    // after raft member change, need to handle 1->2 switching point
    // at this point, need to switch entry handling thread
    if (pSyncNode->replicaNum == 1) {
      sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return 1;
    }
    sTrace("vgId:%d, propose optimized msg, return to normal, index:%" PRId64
           " type:%s, "
           "handle:%p",
           pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle);
    return 0;
  }

  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {0};
  code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, tstrerror(code));
    // Drop the stub registered above, but keep reporting the serialization
    // error: the cleanup result must not turn this failure into a success.
    int32_t delCode = syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    if (delCode != 0) {
      sError("vgId:%d, failed to remove resp stub, seq:%" PRIu64 " since %s", pSyncNode->vgId, seqNum,
             tstrerror(delCode));
    }
    TAOS_RETURN(code);
  }

  sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, tstrerror(code));
    TAOS_CHECK_RETURN(syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum));
  }

  if (seq != NULL) *seq = seqNum;
  TAOS_RETURN(code);
}
974

975
/*
 * Reset a per-peer heartbeat timer descriptor to its unarmed state.
 *
 * The timer targets `destId`, uses syncNodeEqPeerHeartbeatTimer as its
 * callback, and uses pSyncNode->hbBaseLine as timerMS. Counter and logic
 * clock start at 0, and the timestamp is set to now. hbDataRid is not
 * touched. Always returns 0.
 */
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
  pSyncTimer->counter = 0;
  pSyncTimer->pTimer = NULL;
  pSyncTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pSyncTimer->logicClock, 0);
  return 0;
}
985

986
/*
 * Arm (or re-arm) a per-peer heartbeat timer.
 *
 * Reuses the SSyncHbTimerData registered under pSyncTimer->hbDataRid when it
 * can still be acquired. Otherwise it allocates a new one and registers it via
 * syncHbTimerDataAdd().
 *
 * The data records:
 *   - the owning node rid;
 *   - the target peer;
 *   - the current logic clock;
 *   - the next execution time.
 * Its rid is passed to the timer callback as the parameter.
 *
 * Returns:
 *   - 0 on success;
 *   - TSDB_CODE_OUT_OF_MEMORY if the data cannot be allocated;
 *   - TSDB_CODE_SYN_INTERNAL_ERROR if the sync env is not initialized,
 *     registration fails, or taosTmrReset() reports failure.
 */
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int64_t tsNow = taosGetTimestampMs();
  if (!syncIsInit()) {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  bool              acquired = true;
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
  if (pData == NULL) {
    acquired = false;
    pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
    if (pData == NULL) {
      sError("vgId:%d, failed to alloc hb timer data", pSyncNode->vgId);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pData->rid = syncHbTimerDataAdd(pData);
    if (pData->rid < 0) {
      // NOTE(review): assumes a failed add leaves ownership of pData with us.
      sError("vgId:%d, failed to register hb timer data", pSyncNode->vgId);
      taosMemoryFree(pData);
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  pSyncTimer->hbDataRid = pData->rid;
  pSyncTimer->timeStamp = tsNow;

  pData->syncNodeRid = pSyncNode->rid;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;
  pData->execTime = tsNow + pSyncTimer->timerMS;

  sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64 " at %d", pSyncNode->vgId, pData->rid,
         pData->destId.addr, pSyncTimer->timerMS);

  int32_t code = 0;
  bool    stopped = taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid),
                                 syncEnv()->pTimerManager, &pSyncTimer->pTimer);
  if (stopped) {
    sError("vgId:%d, failed to reset hb timer success", pSyncNode->vgId);
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // Balance syncHbTimerDataAcquire(). The registration made by
  // syncHbTimerDataAdd() presumably keeps pData alive until
  // syncHbTimerDataRemove().
  if (acquired) syncHbTimerDataRelease(pData);
  return code;
}
1019

1020
/*
 * Disarm a per-peer heartbeat timer.
 *
 * Steps, in order:
 *   1. bump the timer's logic clock;
 *   2. stop the underlying tmr handle and clear pTimer;
 *   3. remove the associated heartbeat data registration;
 *   4. reset hbDataRid to -1.
 * Always returns 0.
 */
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  (void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  bool wasStopped = taosTmrStop(pSyncTimer->pTimer);
  sDebug("vgId:%d, stop hb timer stop:%d", pSyncNode->vgId, wasStopped);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;
  return 0;
}
1030

1031
/*
 * Re-base the log store on the FSM snapshot when the two no longer line up.
 *
 * Compares the snapshot's lastApplyIndex with the log store's [begin, last]
 * range. The log store is restored from the snapshot index via
 * syncLogRestoreFromSnapshot when any of these holds:
 *   - the log ends before the snapshot;
 *   - the log begins after snapshot + 1;
 *   - fsmState is SYNC_FSM_STATE_INCOMPLETE.
 *
 * Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR when the log store or
 * the FSM snapshot hook is missing, or the restore error code.
 */
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  if (pNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm == NULL) {
    sError("vgId:%d, pFsm not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
    sError("vgId:%d, FpGetSnapshotInfo not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SSnapshot snapshot = {0};
  // TODO check return value
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);

  SyncIndex snapshotVer = snapshot.lastApplyIndex;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  bool logBehind = lastVer < snapshotVer;
  bool logGapped = firstVer > snapshotVer + 1;
  bool fsmIncomplete = pNode->fsmState == SYNC_FSM_STATE_INCOMPLETE;
  if (!logBehind && !logGapped && !fsmIncomplete) {
    TAOS_RETURN(0);
  }

  int32_t rc = pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, snapshotVer);
  if (rc != 0) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
           pNode->vgId, terrstr(), lastVer, snapshotVer);
  }
  TAOS_RETURN(rc);
}
1061

1062
// open/close --------------
1063
/*
 * Create and initialize a sync node described by pSyncInfo.
 *
 * Steps, in order:
 *   1. Ensure pSyncInfo->path exists and derive raft_store.json /
 *      raft_config.json paths inside it.
 *   2. Create raft_config.json from pSyncInfo when absent. Otherwise load it;
 *      if vnodeVersion is newer than the file's changeVersion and the input
 *      config differs, the input config is adopted and written back.
 *      Otherwise the file's config is copied back into pSyncInfo->syncCfg.
 *   3. Refresh dnode endpoints via tmsgUpdateDnodeInfo and rewrite the config
 *      file if any changed and vnodeVersion is newer than the input config.
 *   4. Build the remaining components, in this order:
 *      - log buffer;
 *      - replica and peer tables;
 *      - raft store;
 *      - vote and index managers;
 *      - log store;
 *      - timers and response manager;
 *      - snapshot senders and receiver;
 *      - replication and peer state;
 *      - log buffer init.
 *
 * Ownership of pSyncInfo->pFsm moves to the node (pSyncInfo->pFsm is set to
 * NULL). If a failure happens before that transfer, pSyncInfo->pFsm is freed
 * and reset to NULL. On any failure the partially built node is released via
 * syncNodeClose() and NULL is returned. terrno is expected to carry the
 * error; some callees are relied on to set it themselves.
 */
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("vgId:%d, failed to create dir:%s since %s", pSyncInfo->vgId, pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  // Bounded, NUL-terminated copy that does not depend on the source buffer size.
  tstrncpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  // Set early so the cfg-file logs below carry the right vgId.
  pSyncNode->vgId = pSyncInfo->vgId;

  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId);
    pSyncNode->vgId = pSyncInfo->vgId;
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
    pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
    pSyncNode->raftCfg.configIndexCount = 1;
    pSyncNode->raftCfg.configIndexArr[0] = -1;

    if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
      terrno = code;
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
      goto _error;
    }
  } else {
    // update syncCfg by raft_config.json
    if ((code = syncReadCfgFile(pSyncNode)) != 0) {
      terrno = code;
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
      goto _error;
    }

    if (vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion) {
      if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
        sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
        pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
        if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
          terrno = code;
          sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
          goto _error;
        }
      } else {
        sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
        pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
      }
    } else {
      // Log the version that was actually compared: the one loaded from the cfg file.
      sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d", pSyncNode->vgId, vnodeVersion,
            pSyncNode->raftCfg.cfg.changeVersion);
    }
  }

  // init by SSyncInfo
  pSyncNode->vgId = pSyncInfo->vgId;
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  bool      updated = false;
  sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
        pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
      updated = true;
    }
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  if (vnodeVersion > pSyncInfo->syncCfg.changeVersion) {
    if (updated) {
      sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
      if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
        terrno = code;
        sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
        goto _error;
      }
    }
  }

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  code = syncLogBufferCreate(&pSyncNode->pLogBuf);
  if (pSyncNode->pLogBuf == NULL) {
    if (code != 0) terrno = code;
    sError("failed to init sync log buffer since %s. vgId:%d", tstrerror(code), pSyncNode->vgId);
    goto _error;
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
  pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
        false) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  pSyncNode->myRaftId = pSyncNode->replicasId[pSyncNode->raftCfg.cfg.myIndex];

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
      pSyncNode->peersId[j] = pSyncNode->replicasId[i];
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
      j++;
    }
  }

  pSyncNode->arbTerm = -1;
  (void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
  syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
  sInfo("vgId:%d, generate arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);

  // init raft algorithm
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  if ((code = raftStoreOpen(pSyncNode)) != 0) {
    terrno = code;
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    // TODO check return value
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
    pSyncNode->fsmState = snapshot.state;
    if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
      sError("vgId:%d, fsm state is incomplete.", pSyncNode->vgId);
      if (pSyncNode->replicaNum == 1) {
        terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
        goto _error;
      }
    }
  }
  pSyncNode->commitIndex = commitIndex;
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  // restore log store on need
  if ((code = syncNodeLogStoreRestoreOnNeed(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
#ifdef BUILD_NO_CALL
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
#endif
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i])) != 0) {
      terrno = code;
      goto _error;
    }
  }

  // tools
  if ((code = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr)) != 0) {
    terrno = code;
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(pSyncNode, i, &pSender);
    if (pSender == NULL) {
      // Go through _error so the partially built node is released, not leaked.
      terrno = (code != 0) ? code : TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to create snapshot sender, index:%d since %s", pSyncNode->vgId, i, tstrerror(terrno));
      goto _error;
    }

    pSyncNode->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  // snapshot receivers
  code = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID, &pSyncNode->pNewNodeReceiver);
  if (pSyncNode->pNewNodeReceiver == NULL) {
    // Go through _error so the partially built node is released, not leaked.
    terrno = (code != 0) ? code : TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to create snapshot receiver since %s", pSyncNode->vgId, tstrerror(terrno));
    goto _error;
  }
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
          pSyncNode->pNewNodeReceiver);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  if ((code = syncNodeLogReplInit(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // peer state
  if ((code = syncNodePeerStateInit(pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // start in syncNodeStart
  // start raft

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if ((code = syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode)) < 0) {
    terrno = code;
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->becomeAssignedLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync node opened, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
  return pSyncNode;

_error:
  // Free the FSM only if ownership was not yet transferred to the node.
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}
1426

1427
#ifdef BUILD_NO_CALL
/*
 * Advance commitIndex to the FSM snapshot's lastApplyIndex when the snapshot
 * is ahead. Does nothing if no FSM or FpGetSnapshotInfo hook is registered.
 * Compiled only when BUILD_NO_CALL is defined.
 */
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snap = {0};
  (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);
  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, snap.lastApplyIndex);
}
#endif
1438

1439
/*
 * Reconcile in-memory sync state with the persisted log store before the node
 * starts taking part in raft.
 *
 * Reads the log store's last index and commit index, plus the log buffer's end
 * index, while holding the log buffer mutex. It then raises
 * pSyncNode->commitIndex to at least the stored commit index. Unless the FSM is
 * marked SYNC_FSM_STATE_INCOMPLETE, it commits the log buffer up to that index.
 *
 * A mismatch between the log buffer's end and the log store's last index is
 * only reported as a warning; restore continues and the mismatch does not
 * affect the return value.
 *
 * Returns 0 on success.
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR if the log store or log buffer has not
 * been created.
 * Otherwise returns whatever syncLogBufferCommit returned.
 */
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pSyncNode->pLogBuf == NULL) {
    sError("vgId:%d, ring log buffer not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  (void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex);
  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;
  (void)taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex);

  if (lastVer != -1 && endIndex != lastVer + 1) {
    // Deliberately non-fatal: warn and keep restoring.
    // - Describe the actual condition: terrno was not set here, so terrstr()
    //   would report a stale, unrelated error.
    // - Do not keep this code in `code`: it would otherwise be returned as a
    //   failure whenever the commit below is skipped.
    sWarn("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64 "",
          pSyncNode->vgId, tstrerror(TSDB_CODE_WAL_LOG_INCOMPLETE), endIndex - 1, lastVer);
  }

  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
  sInfo("vgId:%d, restore began, and keep syncing until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex);
  }

  TAOS_RETURN(code);
}
1474

1475
/*
 * Put a freshly opened sync node into its initial raft role, then start its
 * ping timer.
 *
 * - A node whose own config entry has role TAOS_SYNC_ROLE_LEARNER becomes a
 *   learner.
 * - A single-replica voter advances its term, becomes leader, and appends a
 *   noop entry. Per Raft 3.6.2, entries from previous terms are committed via
 *   an entry of the current term.
 * - Any other voter starts as a follower with an all-zero leader id.
 *
 * Returns the error from syncNodeAppendNoop if it fails. Otherwise returns the
 * result of syncNodeStartPingTimer; a failure there is logged.
 */
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  sInfo("vgId:%d, begin to start sync node", pSyncNode->vgId);

  const SNodeInfo* pSelf = &pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  bool startAsLearner = (pSelf->nodeRole == TAOS_SYNC_ROLE_LEARNER);

  if (startAsLearner) {
    syncNodeBecomeLearner(pSyncNode, "first start");
  } else if (pSyncNode->replicaNum == 1) {
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");
    TAOS_CHECK_RETURN(syncNodeAppendNoop(pSyncNode));
  } else {
    SRaftId noLeader = {0};
    syncNodeBecomeFollower(pSyncNode, noLeader, "first start");
  }

  int32_t pingCode = syncNodeStartPingTimer(pSyncNode);
  if (pingCode != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(pingCode));
  }
  sInfo("vgId:%d, sync node started", pSyncNode->vgId);
  return pingCode;
}
1501

1502
#ifdef BUILD_NO_CALL
/*
 * Start the node as a standby follower.
 *
 * Switches state to TAOS_SYNC_STATE_FOLLOWER and stops the heartbeat timers.
 * It then re-arms the elect timer with TIMER_MAX_MS so no election is triggered
 * soon, and starts the ping timer.
 *
 * Returns 0 on success, or the first error code from the timer operations. The
 * timer errors are logged using that code. Only compiled when BUILD_NO_CALL is
 * defined.
 */
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  int32_t code = 0;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));

  // reset elect timer, long enough
  code = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS);
  if (code < 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return code;
  }

  code = syncNodeStartPingTimer(pSyncNode);
  if (code < 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(code));
    return code;
  }
  return code;
}
#endif
1527

1528
/*
 * First shutdown phase: stop the elect, heartbeat and ping timers (in that
 * order), then clean pending responses held by the response manager.
 *
 * Logs an error and returns without doing anything if any of these is NULL:
 * - pSyncNode,
 * - its FSM,
 * - the FSM's FpApplyQueueItems callback.
 *
 * If stopping any timer fails, the error is logged and the remaining steps are
 * skipped.
 */
void syncNodePreClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    sError("failed to pre close sync node since sync node is null");
    return;
  }
  if (pSyncNode->pFsm == NULL) {
    sError("failed to pre close sync node since fsm is null");
    return;
  }
  if (pSyncNode->pFsm->FpApplyQueueItems == NULL) {
    sError("failed to pre close sync node since FpApplyQueueItems is null");
    return;
  }

  int32_t rc = syncNodeStopElectTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  rc = syncNodeStopHeartbeatTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  rc = syncNodeStopPingTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
1564

1565
/*
 * Final shutdown phase: if a new-node snapshot receiver exists, stop it (when
 * it is running), destroy it, and clear the pointer. Nothing to do otherwise.
 */
void syncNodePostClose(SSyncNode* pSyncNode) {
  if (pSyncNode->pNewNodeReceiver == NULL) {
    return;
  }

  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
         pSyncNode->pNewNodeReceiver);
  snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
  pSyncNode->pNewNodeReceiver = NULL;
}
12,198✔
1577

1578
// Release a heartbeat timer data object with taosMemoryFree.
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
2,389!
1579

1580
/*
 * Destroy a sync node and everything it owns, then free the node itself.
 * Passing NULL is a no-op.
 *
 * Resources released, in order:
 * - pending responses,
 * - ping, elect and heartbeat timers,
 * - log replication state,
 * - response, vote and index managers,
 * - log store and log buffer,
 * - arbTokenMutex,
 * - snapshot senders and the new-node snapshot receiver,
 * - the FSM object,
 * - the raft store.
 *
 * A failure to stop a timer is logged but does not abort teardown. This
 * function returns void, so bailing out would silently leak the node and
 * everything listed above, with no way for the caller to retry.
 */
void syncNodeClose(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (pSyncNode == NULL) return;
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);

  // best-effort: keep tearing down even if a timer cannot be stopped
  if ((code = syncNodeStopPingTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
  }
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
  }
  if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
  }
  syncNodeLogReplDestroy(pSyncNode);

  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;
  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;
  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;
  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;
  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  (void)taosThreadMutexDestroy(&pSyncNode->arbTokenMutex);

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if (pSyncNode->senders[i] != NULL) {
      sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[i]);

      if (snapshotSenderIsStart(pSyncNode->senders[i])) {
        snapshotSenderStop(pSyncNode->senders[i], false);
      }

      snapshotSenderDestroy(pSyncNode->senders[i]);
      pSyncNode->senders[i] = NULL;
    }
  }

  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  raftStoreClose(pSyncNode);

  taosMemoryFree(pSyncNode);
}
1649

1650
// Snapshot strategy stored in the node's raft configuration.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  const SRaftCfg* pRaftCfg = &pSyncNode->raftCfg;
  return pRaftCfg->snapshotStrategy;
}
×
1651

1652
// timer control --------------
1653
/*
 * Arm (or re-arm) the ping timer. It fires FpPingTimerCB after pingTimerMS and
 * receives the node's rid as the callback parameter. On success, the current
 * pingTimerLogicClockUser value is published into pingTimerLogicClock.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR if taosTmrReset reports failure; the
 * logic clock is left untouched in that case.
 * If the sync environment is not initialised, logs an error and returns 0
 * without arming anything.
 */
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  bool resetFailed = taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
                                  syncEnv()->pTimerManager, &pSyncNode->pPingTimer);
  if (resetFailed) {
    sError("vgId:%d, failed to reset ping timer, ms:%d", pSyncNode->vgId, pSyncNode->pingTimerMS);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}
1668

1669
/*
 * Increment pingTimerLogicClockUser, stop the ping timer, and clear
 * pPingTimer. Whether taosTmrStop actually stopped a timer is only logged.
 * Always returns 0.
 */
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  bool wasStopped = taosTmrStop(pSyncNode->pPingTimer);
  sDebug("vgId:%d, stop ping timer, stop:%d", pSyncNode->vgId, wasStopped);
  pSyncNode->pPingTimer = NULL;

  return 0;
}
1677

1678
/*
 * Arm (or re-arm) the election timer to fire FpElectTimerCB after `ms`
 * milliseconds, passing the node's rid as the callback parameter.
 *
 * Before arming, it records into electTimerParam:
 * - the absolute execute time (now + ms),
 * - the current electTimerLogicClock,
 * - the owning node.
 *
 * A failure reported by taosTmrReset is only logged. If the sync environment
 * is not initialised, an error is logged and nothing is armed.
 * Always returns 0.
 */
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  int64_t deadlineMs = taosGetTimestampMs() + ms;
  atomic_store_64(&pSyncNode->electTimerParam.executeTime, deadlineMs);
  atomic_store_64(&pSyncNode->electTimerParam.logicClock, pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  bool resetFailed = taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid),
                                  syncEnv()->pTimerManager, &pSyncNode->pElectTimer);
  if (resetFailed) {
    sError("vgId:%d, failed to reset elect timer, ms:%d", pSyncNode->vgId, ms);
  }
  return 0;
}
1697

1698
/*
 * Increment electTimerLogicClock, stop the election timer, and clear
 * pElectTimer. Whether taosTmrStop actually stopped a timer is only logged.
 * Always returns 0.
 */
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  bool wasStopped = taosTmrStop(pSyncNode->pElectTimer);
  sDebug("vgId:%d, stop elect timer, stop:%d", pSyncNode->vgId, wasStopped);
  pSyncNode->pElectTimer = NULL;

  return 0;
}
1707

1708
// Stop the election timer, then re-arm it to fire after `ms` milliseconds.
// Returns the first non-success code from either step, otherwise 0.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  TAOS_CHECK_RETURN(syncNodeStopElectTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartElectTimer(pSyncNode, ms));
  return TSDB_CODE_SUCCESS;
}
1714

1715
/*
 * Re-arm the election timer with a fresh timeout:
 * - a standby node uses TIMER_MAX_MS;
 * - any other node uses syncUtilElectRandomMS(electBaseLine, 2 * electBaseLine).
 *
 * A restart failure is logged as a warning and the trace log is skipped.
 */
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = pSyncNode->raftCfg.isStandBy
                        ? TIMER_MAX_MS
                        : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  int32_t rc = syncNodeRestartElectTimer(pSyncNode, electMS);
  if (rc != 0) {
    sWarn("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
}
1733

1734
#ifdef BUILD_NO_CALL
/*
 * Arm the node-level heartbeat timer. It fires FpHeartbeatTimerCB after
 * heartbeatTimerMS and receives the node's rid as the callback parameter. On
 * success, heartbeatTimerLogicClockUser is published into
 * heartbeatTimerLogicClock.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR if taosTmrReset reports failure.
 * If the sync environment is not initialised, logs an error and returns 0.
 * Only compiled when BUILD_NO_CALL is defined.
 */
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (syncIsInit()) {
    // taosTmrReset returns a bool, not an error code. Feeding it to
    // TAOS_CHECK_RETURN would propagate `true` (1) as a bogus error code, so
    // handle it the same way syncNodeStartPingTimer does.
    bool stopped = taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
                                syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer);
    if (stopped) {
      sError("vgId:%d, failed to reset heartbeat timer, ms:%d", pSyncNode->vgId, pSyncNode->heartbeatTimerMS);
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  } else {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return code;
}
#endif
1749

1750
/*
 * Start the per-peer heartbeat timer for every peer that has one (peers for
 * which syncNodeGetHbTimer returns NULL are skipped).
 * Returns the first error from syncHbTimerStart, otherwise 0.
 */
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pPeerTimer = syncNodeGetHbTimer(pSyncNode, &pSyncNode->peersId[peer]);
    if (pPeerTimer == NULL) {
      continue;
    }
    TAOS_CHECK_RETURN(syncHbTimerStart(pSyncNode, pPeerTimer));
  }

  return 0;
}
1767

1768
/*
 * Stop the per-peer heartbeat timer of every peer that has one (peers for
 * which syncNodeGetHbTimer returns NULL are skipped).
 * Returns the first error from syncHbTimerStop, otherwise 0.
 */
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pPeerTimer = syncNodeGetHbTimer(pSyncNode, &pSyncNode->peersId[peer]);
    if (pPeerTimer == NULL) {
      continue;
    }
    TAOS_CHECK_RETURN(syncHbTimerStop(pSyncNode, pPeerTimer));
  }

  return 0;
}
1787

1788
#ifdef BUILD_NO_CALL
/*
 * Stop, then restart, all per-peer heartbeat timers.
 * Returns the first non-success code, otherwise 0.
 * Only compiled when BUILD_NO_CALL is defined.
 */
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
  return 0;
}
#endif
1797

1798
/*
 * Send pMsg to the peer whose raft address equals destRaftId->addr.
 *
 * When both a matching peer and a syncSendMSg hook exist:
 * - the message body is converted with syncUtilMsgHtoN,
 * - the message is marked noResp,
 * - it is handed to syncSendMSg.
 *
 * If the peer is unknown, no hook is installed, or the hook returns a negative
 * code, the failure is logged and the body is released with rpcFreeCont.
 *
 * Returns the hook's result, or -1 if the message could not be handed off.
 */
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  SEpSet* epSet = NULL;
  for (int32_t idx = 0; epSet == NULL && idx < pNode->peersNum; ++idx) {
    if (pNode->peersId[idx].addr == destRaftId->addr) {
      epSet = &pNode->peersEpset[idx];
    }
  }

  int32_t code = -1;
  bool canSend = (pNode->syncSendMSg != NULL) && (epSet != NULL);
  if (canSend) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    code = pNode->syncSendMSg(epSet, pMsg);
  }

  if (code < 0) {
    sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:%" PRId64, pNode->vgId, tstrerror(code),
           epSet, DID(destRaftId), destRaftId->addr);
    rpcFreeCont(pMsg->pCont);
  }

  TAOS_RETURN(code);
}
1822

1823
/*
 * Report whether this node is a member of pCfg.
 *
 * Membership is checked two independent ways:
 * - the node's FQDN and port match a config entry;
 * - a raft id built from an entry's SYNC_ADDR (with this node's vgId) equals
 *   myRaftId.
 *
 * If the two checks disagree, terrno is set to TSDB_CODE_SYN_INTERNAL_ERROR
 * and false is returned.
 */
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  bool matchByEndpoint = false;
  for (int32_t i = 0; !matchByEndpoint && i < pCfg->totalReplicaNum; ++i) {
    const SNodeInfo* pEntry = &pCfg->nodeInfo[i];
    matchByEndpoint = (strcmp(pEntry->nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0) &&
                      (pEntry->nodePort == pNode->myNodeInfo.nodePort);
  }

  bool matchByRaftId = false;
  for (int32_t i = 0; !matchByRaftId && i < pCfg->totalReplicaNum; ++i) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[i]),
        .vgId = pNode->vgId,
    };
    matchByRaftId = syncUtilSameId(&candidate, &pNode->myRaftId);
  }

  if (matchByEndpoint != matchByRaftId) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return false;
  }
  return matchByEndpoint;
}
1853

1854
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
3,152✔
1855
  if (pOldCfg->totalReplicaNum != pNewCfg->totalReplicaNum) return true;
3,152✔
1856
  if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
2,138✔
1857
  for (int32_t i = 0; i < pOldCfg->totalReplicaNum; ++i) {
5,104✔
1858
    const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
3,705✔
1859
    const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
3,705✔
1860
    if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
3,705!
1861
    if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
3,705✔
1862
    if (pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
3,703✔
1863
  }
1864

1865
  return false;
1,399✔
1866
}
1867

1868
/*
 * Apply pNewConfig as the node's raft configuration.
 *
 * Nothing happens if the configuration is unchanged (per syncIsConfigChanged).
 * Otherwise the config and lastConfigChangeIndex are recorded, and the index is
 * appended to raftCfg.configIndexArr. Then:
 *
 * - If this node is in the new config, all derived membership state is rebuilt:
 *   own raft id, peers, replica ids, quorum, and the index/vote managers.
 *   Snapshot senders are re-mapped to the new replica positions: missing ones
 *   are created, and those of replicas that left are destroyed. Finally the
 *   config is persisted.
 * - Otherwise the config is only persisted.
 *
 * A node that was in the old config but not in the new one is marked standby.
 *
 * Returns 0 on success. Errors:
 * - TSDB_CODE_OUT_OF_RANGE (also stored in terrno) if the config-index history
 *   is full; nothing is modified in that case.
 * - terrno if a node info cannot be converted to a raft id.
 * - the error from snapshotSenderCreate or syncWriteCfgFile.
 */
int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  int32_t  code = 0;
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return 0;
  }

  // Check the capacity of the config-index history before touching any state,
  // so a failure here does not leave a half-applied configuration in memory.
  SRaftCfg* pCfg = &pSyncNode->raftCfg;
  if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
    sNError(pSyncNode, "failed to add cfg index:%d since out of range", pCfg->configIndexCount);
    terrno = TSDB_CODE_OUT_OF_RANGE;
    return TSDB_CODE_OUT_OF_RANGE;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  // Membership is evaluated against the node's current identity
  // (myNodeInfo / myRaftId); that identity is only rebuilt further below.
  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change (sNInfo already prefixes the vgId)
  sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
         oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, oldConfig.lastIndex, pNewConfig->lastIndex,
         pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  pCfg->configIndexArr[pCfg->configIndexCount] = lastConfigChangeIndex;
  pCfg->configIndexCount++;

  if (IamInNew) {
    // save snapshot senders together with the replica id each one served
    SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      // the sender logger is given the sender itself; skip empty slots
      if (oldSenders[i] != NULL) {
        sSTrace(oldSenders[i], "snapshot sender save old");
      }
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    if (syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId) == false) return terrno;

    // init peersNum, peers, peersId: every config entry except our own
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
    int32_t peerIdx = 0;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerIdx] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[peerIdx], &pSyncNode->peersEpset[peerIdx]);
        peerIdx++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]) == false)
        return terrno;
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
          false)
        return terrno;
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders: detach all of them first ...
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // ... then hand each surviving replica its old sender at its new position
    for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
      bool reset = false;
      for (int32_t j = 0; j < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++j) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j]) && oldSenders[j] != NULL) {
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[j]);

          pSyncNode->senders[i] = oldSenders[j];
          oldSenders[j] = NULL;
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
          pSyncNode->senders[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], reset);

          break;
        }
      }
    }

    // create senders for positions that did not inherit one
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (pSyncNode->senders[i] == NULL) {
        code = snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]);
        if (code != 0) {
          break;  // old senders are still released below before returning
        }
        if (pSyncNode->senders[i] == NULL) {
          // will be created later while send snapshot; the sender pointer is
          // NULL here, so log through the node instead of the sender
          sNError(pSyncNode, "snapshot sender create failed while reconfig, replica-index:%d", i);
        } else {
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
        }
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
      }
    }

    // Free old senders that were not handed to a new position. This runs even
    // when creation failed above: they are no longer reachable from
    // pSyncNode->senders and would otherwise leak.
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    if (code != 0) {
      TAOS_RETURN(code);
    }

    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
  } else {
    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  return 0;
}
2040

2041
// raft state change --------------
2042
/*
 * Persist `term` if it is newer than the stored term. Only raftStoreSetTerm is
 * called here; the node's role is not changed.
 */
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  SyncTerm storedTerm = raftStoreGetTerm(pSyncNode);
  if (term <= storedTerm) {
    return;
  }
  raftStoreSetTerm(pSyncNode, term);
}
86,526✔
2047

2048
/*
 * Step down after observing newTerm from node `id`.
 *
 * - Ignored (trace-logged) when newTerm is older than the current term.
 * - An assigned leader first regenerates its arbToken while holding
 *   arbTokenMutex.
 * - If newTerm is newer: persist it, become a follower of `id`, then clear the
 *   stored vote.
 * - If newTerm is equal: a node that is not already a follower becomes a
 *   follower of `id`.
 */
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm, SRaftId id) {
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
  if (newTerm < currentTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);

  if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
    syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
    sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken);
    (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
  }

  if (newTerm > currentTerm) {
    raftStoreSetTerm(pSyncNode, newTerm);
    char reason[64];
    snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
    syncNodeBecomeFollower(pSyncNode, id, reason);
    raftStoreClearVote(pSyncNode);
  } else if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    syncNodeBecomeFollower(pSyncNode, id, "step down");
  }
}
2078

2079
// On a leadership change, clean the pending responses held by the node's response manager.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
5,095✔
2080

2081
/*
 * Switch the node to follower state, with leaderId as its known leader.
 * leaderId may be an all-zero id, e.g. on first start.
 *
 * Steps, in order:
 * - stop the per-peer heartbeat timers,
 * - clean pending client responses,
 * - invoke the FSM's FpBecomeFollowerCb (if any),
 * - reset minMatchIndex and the log buffer,
 * - re-arm the election timer.
 *
 * If stopping the heartbeat timers or resetting the log buffer fails, the
 * error is logged and the remaining steps are skipped.
 */
void syncNodeBecomeFollower(SSyncNode* pSyncNode, SRaftId leaderId, const char* debugStr) {
  int32_t code = 0;

  pSyncNode->hbSlowNum = 0;

  // The leader cache always takes leaderId. Clearing it first when leaving
  // the leader role would be a dead store, since it is overwritten here anyway.
  pSyncNode->leaderCache = leaderId;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNTrace(pSyncNode, "become follower %s", debugStr);

  // send rsp to client
  syncNodeLeaderChangeRsp(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // reset elect timer
  syncNodeResetElectTimer(pSyncNode);

  sInfo("vgId:%d, become follower. %s", pSyncNode->vgId, debugStr);
}
2122

2123
/*
 * Switch the node to learner state.
 *
 * Steps, in order:
 * - reset hbSlowNum and record the role-change time,
 * - invoke the FSM's FpBecomeLearnerCb (if any),
 * - reset minMatchIndex,
 * - reset the log buffer; a failure here is logged.
 */
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->hbSlowNum = 0;

  pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  sNTrace(pSyncNode, "become learner %s", debugStr);

  bool hasLearnerCb = (pSyncNode->pFsm != NULL) && (pSyncNode->pFsm->FpBecomeLearnerCb != NULL);
  if (hasLearnerCb) {
    pSyncNode->pFsm->FpBecomeLearnerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  int32_t resetCode = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (resetCode != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(resetCode));
  }
}
2148

2149
// TLA+ Spec
2150
// \* Candidate i transitions to leader.
2151
// BecomeLeader(i) ==
2152
//     /\ state[i] = Candidate
2153
//     /\ votesGranted[i] \in Quorum
2154
//     /\ state'      = [state EXCEPT ![i] = Leader]
2155
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
2156
//                          [j \in Server |-> Len(log[i]) + 1]]
2157
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
2158
//                          [j \in Server |-> 0]]
2159
//     /\ elections'  = elections \cup
2160
//                          {[eterm     |-> currentTerm[i],
2161
//                            eleader   |-> i,
2162
//                            elog      |-> log[i],
2163
//                            evotes    |-> votesGranted[i],
2164
//                            evoterLog |-> voterLog[i]]}
2165
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
2166
//
2167
// Promote pSyncNode to leader: reset replication bookkeeping (next/match
// indexes, peer send state), stop the new-node snapshot receiver if running,
// swap the election timer for the heartbeat timer, heartbeat peers right away,
// notify the FSM and reset the log buffer.
// Any failing step is logged and aborts the remaining steps; fields already
// updated (state, roleTimeMs, leaderCache, ...) are not rolled back.
// debugStr is only used in the final info log.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  int32_t code = 0;
  pSyncNode->becomeLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // The local last index does not depend on the replica being reset, so query
  // it (snapshot info + log store) once instead of once per replica.
  SyncIndex lastIndex = SYNC_INDEX_INVALID;
  SyncTerm  lastTerm = SYNC_TERM_INVALID;
  if ((code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm)) != 0) {
    sError("vgId:%d, failed to become leader since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  // reset match index; this may also overwrite our own slot, which is harmless
  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // start heartbeat timer
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // send heartbeat right now
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNInfo(pSyncNode, "become leader %s", debugStr);
}
2258

2259
// Switch pSyncNode to the assigned-leader role. This mirrors
// syncNodeBecomeLeader, except that restoreFinish is left untouched, the FSM's
// FpBecomeAssignedLeaderCb is invoked, and becomeAssignedLeaderNum is counted.
// Any failing step is logged and aborts the remaining steps; fields already
// updated are not rolled back.
void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
  int32_t code = 0;
  pSyncNode->becomeAssignedLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // note: restoreFinish is NOT reset here (unlike syncNodeBecomeLeader)

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_ASSIGNED_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // The local last index does not depend on the replica being reset, so query
  // it (snapshot info + log store) once instead of once per replica.
  SyncIndex lastIndex = SYNC_INDEX_INVALID;
  SyncTerm  lastTerm = SYNC_TERM_INVALID;
  if ((code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm)) != 0) {
    sError("vgId:%d, failed to become assigned leader since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  // reset match index; this may also overwrite our own slot, which is harmless
  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // start heartbeat timer
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // send heartbeat right now
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeAssignedLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNInfo(pSyncNode, "become assigned leader");
}
2337

2338
// Promote a candidate that holds a majority of granted votes to leader, then
// append a no-op entry. Refuses (logging an error) if the node is not a
// candidate or the votes are not a majority. A failed no-op append is logged
// but does not undo the promotion.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
    sError("vgId:%d, failed leader from candidate since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }
  bool granted = voteGrantedMajority(pSyncNode->pVotesGranted);
  if (!granted) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }
  syncNodeBecomeLeader(pSyncNode, "candidate to leader");

  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  int32_t ret = syncNodeAppendNoop(pSyncNode);
  if (ret < 0) {
    // report the code actually returned (same as syncNodeAssignedLeader2Leader)
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "", pSyncNode->vgId,
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
}
2362

2363
// Returns true when this sync node serves vgroup id 1, which this module
// treats as the mnode vgroup.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  bool isMnode = (pSyncNode->vgId == 1);
  return isMnode;
}
457,275✔
2364

2365
// Clear the per-peer send bookkeeping for every possible replica slot
// (voters + learners): no entry sent yet, last send time zero. Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  const int32_t slotNum = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;

  int32_t slot = 0;
  while (slot < slotNum) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
    ++slot;
  }

  return 0;
}
2373

2374
// Move a follower into the candidate role and stamp roleTimeMs.
// If the node is not currently a follower, only an error is logged.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  bool isFollower = (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  if (!isFollower) {
    sError("vgId:%d, failed candidate from follower since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }

  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  SSyncLogStore* pStore = pSyncNode->pLogStore;
  SyncIndex      lastIdx = pStore->syncLogLastIndex(pStore);
  sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIdx);

  sNTrace(pSyncNode, "follower to candidate");
}
2387

2388
// Promote an assigned leader to a regular leader and append a no-op entry.
// Returns TSDB_CODE_SYN_INTERNAL_ERROR if the node is not an assigned leader,
// otherwise 0. A failed no-op append is logged but not returned.
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");

  sNTrace(pSyncNode, "assigned leader to leader");

  int32_t ret = syncNodeAppendNoop(pSyncNode);
  if (ret < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  // the ", " separator keeps the commit index and the next label from running together
  sInfo("vgId:%d, become leader from assigned leader. term:%" PRId64 ", commit index:%" PRId64
        ", assigned commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, pSyncNode->assignedCommitIndex,
        lastIndex);
  return 0;
}
2406

2407
// just called by syncNodeVoteForSelf
2408
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
1,258✔
2409
  SyncTerm storeTerm = raftStoreGetTerm(pSyncNode);
1,258✔
2410
  if (term != storeTerm) {
1,258!
2411
    sError("vgId:%d, failed to vote for term, term:%" PRId64 ", storeTerm:%" PRId64, pSyncNode->vgId, term, storeTerm);
×
2412
    return;
×
2413
  }
2414
  bool voted = raftStoreHasVoted(pSyncNode);
1,258✔
2415
  if (voted) {
1,258!
2416
    sError("vgId:%d, failed to vote for term since not voted", pSyncNode->vgId);
×
2417
    return;
×
2418
  }
2419

2420
  raftStoreVote(pSyncNode, pRaftId);
1,258✔
2421
}
2422

2423
// simulate get vote from outside
2424
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
1,258✔
2425
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);
1,258✔
2426

2427
  SRpcMsg rpcMsg = {0};
1,258✔
2428
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
1,258✔
2429
  if (ret != 0) return;
1,258!
2430

2431
  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
1,258✔
2432
  pMsg->srcId = pSyncNode->myRaftId;
1,258✔
2433
  pMsg->destId = pSyncNode->myRaftId;
1,258✔
2434
  pMsg->term = currentTerm;
1,258✔
2435
  pMsg->voteGranted = true;
1,258✔
2436

2437
  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
1,258✔
2438
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
1,258✔
2439
  rpcFreeCont(rpcMsg.pCont);
1,258✔
2440
}
2441

2442
// return if has a snapshot
2443
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
18,088✔
2444
  bool      ret = false;
18,088✔
2445
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
18,088✔
2446
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
18,088!
2447
    // TODO check return value
2448
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
18,088✔
2449
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
18,088✔
2450
      ret = true;
2,252✔
2451
    }
2452
  }
2453
  return ret;
18,088✔
2454
}
2455

2456
// return max(logLastIndex, snapshotLastIndex)
2457
// if no snapshot and log, return -1
2458
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
18,094✔
2459
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
18,094✔
2460
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
18,094!
2461
    // TODO check return value
2462
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
18,094✔
2463
  }
2464
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
18,094✔
2465

2466
  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
18,094✔
2467
  return lastIndex;
18,094✔
2468
}
2469

2470
// return the last term of snapshot and log
2471
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
2472
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
18,088✔
2473
  SyncTerm lastTerm = 0;
18,088✔
2474
  if (syncNodeHasSnapshot(pSyncNode)) {
18,088✔
2475
    // has snapshot
2476
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
2,252✔
2477
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
2,252!
2478
      // TODO check return value
2479
      (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
2,252✔
2480
    }
2481

2482
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
2,252✔
2483
    if (logLastIndex > snapshot.lastApplyIndex) {
2,252✔
2484
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1,373✔
2485
    } else {
2486
      lastTerm = snapshot.lastApplyTerm;
879✔
2487
    }
2488

2489
  } else {
2490
    // no snapshot
2491
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
15,836✔
2492
  }
2493

2494
  return lastTerm;
18,088✔
2495
}
2496

2497
// get last index and term along with snapshot
2498
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
15,982✔
2499
  *pLastIndex = syncNodeGetLastIndex(pSyncNode);
15,982✔
2500
  *pLastTerm = syncNodeGetLastTerm(pSyncNode);
15,982✔
2501
  return 0;
15,982✔
2502
}
2503

2504
#ifdef BUILD_NO_CALL
// return append-entries first try index
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
  return syncStartIndex;
}

// if index > 0, return index - 1
// else, return SYNC_INDEX_INVALID
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex preIndex = index - 1;
  if (preIndex < SYNC_INDEX_INVALID) {
    preIndex = SYNC_INDEX_INVALID;
  }

  return preIndex;
}

// Return the term of the entry just before `index`:
//   index <  SYNC_INDEX_BEGIN -> SYNC_TERM_INVALID
//   index == SYNC_INDEX_BEGIN -> 0
//   otherwise the term of entry (index - 1), looked up in the LRU cache, then
//   the log store; if the log store lookup fails, the snapshot's lastApplyTerm
//   is used when the snapshot ends exactly at (index - 1).
// Any other failure returns SYNC_TERM_INVALID.
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }

  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

  SyncIndex preIndex = index - 1;

  SSyncRaftEntry* pPreEntry = NULL;
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
  int32_t         code = 0;
  if (h) {
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);

    pSyncNode->pLogStore->cacheHit++;
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);
  } else {
    pSyncNode->pLogStore->cacheMiss++;
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);

    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
  }

  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};

  if (code == 0) {
    if (pPreEntry == NULL) {
      // do not leak the cache handle on this early exit
      if (h) {
        taosLRUCacheRelease(pCache, h, false);
      }
      return SYNC_TERM_INVALID;
    }

    SyncTerm preTerm = pPreEntry->term;

    // cached entries are owned by the cache; only entries read from the log store are ours to free
    if (h) {
      taosLRUCacheRelease(pCache, h, false);
    } else {
      syncEntryDestroy(pPreEntry);
    }

    return preTerm;
  }

  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    // TODO check return value
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex == preIndex) {
      return snapshot.lastApplyTerm;
    }
  }

  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
  return SYNC_TERM_INVALID;
}

// get pre index and term of "index"
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  *pPreIndex = syncNodeGetPreIndex(pSyncNode, index);
  *pPreTerm = syncNodeGetPreTerm(pSyncNode, index);
  return 0;
}
#endif
2594

2595
static void syncNodeEqPingTimer(void* param, void* tmrId) {
212,483✔
2596
  if (!syncIsInit()) return;
212,483!
2597

2598
  int64_t    rid = (int64_t)param;
212,483✔
2599
  SSyncNode* pNode = syncNodeAcquire(rid);
212,483✔
2600

2601
  if (pNode == NULL) return;
212,483!
2602

2603
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
212,483!
2604
    SRpcMsg rpcMsg = {0};
212,483✔
2605
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
212,483✔
2606
                                    pNode->pingTimerMS, pNode);
2607
    if (code != 0) {
212,483!
2608
      sError("failed to build ping msg");
×
2609
      rpcFreeCont(rpcMsg.pCont);
×
2610
      goto _out;
×
2611
    }
2612

2613
    // sTrace("enqueue ping msg");
2614
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
212,483✔
2615
    if (code != 0) {
212,483!
2616
      sError("failed to sync enqueue ping msg since %s", terrstr());
×
2617
      rpcFreeCont(rpcMsg.pCont);
×
2618
      goto _out;
×
2619
    }
2620

2621
  _out:
212,483✔
2622
    if (taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
212,483!
2623
                     &pNode->pPingTimer))
2624
      sError("failed to reset ping timer");
×
2625
  }
2626
  syncNodeRelease(pNode);
212,483✔
2627
}
2628

2629
static void syncNodeEqElectTimer(void* param, void* tmrId) {
1,261✔
2630
  if (!syncIsInit()) return;
1,264!
2631

2632
  int64_t    rid = (int64_t)param;
1,261✔
2633
  SSyncNode* pNode = syncNodeAcquire(rid);
1,261✔
2634

2635
  if (pNode == NULL) return;
1,261✔
2636

2637
  if (pNode->syncEqMsg == NULL) {
1,259!
2638
    syncNodeRelease(pNode);
×
2639
    return;
×
2640
  }
2641

2642
  int64_t tsNow = taosGetTimestampMs();
1,259✔
2643
  if (tsNow < pNode->electTimerParam.executeTime) {
1,259✔
2644
    syncNodeRelease(pNode);
1✔
2645
    return;
1✔
2646
  }
2647

2648
  SRpcMsg rpcMsg = {0};
1,258✔
2649
  int32_t code =
2650
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
1,258✔
2651

2652
  if (code != 0) {
1,258!
2653
    sError("failed to build elect msg");
×
2654
    syncNodeRelease(pNode);
×
2655
    return;
×
2656
  }
2657

2658
  SyncTimeout* pTimeout = rpcMsg.pCont;
1,258✔
2659
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
1,258!
2660

2661
  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
1,258✔
2662
  if (code != 0) {
1,258!
2663
    sError("failed to sync enqueue elect msg since %s", terrstr());
×
2664
    rpcFreeCont(rpcMsg.pCont);
×
2665
    syncNodeRelease(pNode);
×
2666
    return;
×
2667
  }
2668

2669
  syncNodeRelease(pNode);
1,258✔
2670
}
2671

2672
#ifdef BUILD_NO_CALL
// Heartbeat timer callback (compiled only with BUILD_NO_CALL). For multi-replica
// nodes whose user-side logic clock has not run ahead, enqueue a
// SYNC_TIMEOUT_HEARTBEAT message and re-arm the timer. Every path releases the
// reference taken by syncNodeAcquire.
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
  if (!syncIsInit()) return;

  int64_t    rid = (int64_t)param;
  SSyncNode* pNode = syncNodeAcquire(rid);

  if (pNode == NULL) return;

  if (pNode->totalReplicaNum > 1) {
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
        sError("failed to build heartbeat msg");
        rpcFreeCont(rpcMsg.pCont);
      } else {
        sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
        code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
        if (code != 0) {
          sError("failed to enqueue heartbeat msg since %s", tstrerror(code));
          rpcFreeCont(rpcMsg.pCont);
        }
      }

      if (taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
                       &pNode->pHeartbeatTimer) != 0) {
        sError("failed to reset heartbeat timer");
      }
    } else {
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
    }
  }

  syncNodeRelease(pNode);
}
#endif
2712

2713
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
40,318✔
2714
  int32_t code = 0;
40,318✔
2715
  int64_t hbDataRid = (int64_t)param;
40,318✔
2716
  int64_t tsNow = taosGetTimestampMs();
40,318✔
2717

2718
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
40,318✔
2719
  if (pData == NULL) {
40,318!
2720
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
×
2721
    return;
×
2722
  }
2723

2724
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
40,318✔
2725
  if (pSyncNode == NULL) {
40,318✔
2726
    syncHbTimerDataRelease(pData);
4✔
2727
    sError("hb timer get pSyncNode NULL");
4!
2728
    return;
4✔
2729
  }
2730

2731
  SSyncTimer* pSyncTimer = pData->pTimer;
40,314✔
2732

2733
  if (!pSyncNode->isStart) {
40,314!
2734
    syncNodeRelease(pSyncNode);
×
2735
    syncHbTimerDataRelease(pData);
×
2736
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
×
2737
    return;
×
2738
  }
2739

2740
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
40,314!
2741
    syncNodeRelease(pSyncNode);
×
2742
    syncHbTimerDataRelease(pData);
×
2743
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
×
2744
    return;
×
2745
  }
2746

2747
  sTrace("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, hbDataRid,
40,314!
2748
         pData->destId.addr);
2749

2750
  if (pSyncNode->totalReplicaNum > 1) {
40,314✔
2751
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
40,312✔
2752
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);
40,312✔
2753

2754
    if (timerLogicClock == msgLogicClock) {
40,312✔
2755
      if (tsNow > pData->execTime) {
40,308✔
2756
        pData->execTime += pSyncTimer->timerMS;
40,237✔
2757

2758
        SRpcMsg rpcMsg = {0};
40,237✔
2759
        if ((code = syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId)) != 0) {
40,237!
2760
          sError("vgId:%d, failed to build heartbeat msg since %s", pSyncNode->vgId, tstrerror(code));
×
2761
          syncNodeRelease(pSyncNode);
×
2762
          syncHbTimerDataRelease(pData);
×
2763
          return;
×
2764
        }
2765

2766
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
40,237✔
2767

2768
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
40,237✔
2769
        pSyncMsg->srcId = pSyncNode->myRaftId;
40,237✔
2770
        pSyncMsg->destId = pData->destId;
40,237✔
2771
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
40,237✔
2772
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
40,237✔
2773
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
40,237✔
2774
        pSyncMsg->privateTerm = 0;
40,237✔
2775
        pSyncMsg->timeStamp = tsNow;
40,237✔
2776

2777
        // update reset time
2778
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
40,237✔
2779
        pSyncTimer->timeStamp = tsNow;
40,237✔
2780

2781
        // send msg
2782
        TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
40,237✔
2783
        STraceId* trace = &(rpcMsg.info.traceId);
40,237✔
2784
        sGTrace("vgId:%d, send sync-heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId)));
40,237!
2785
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
40,237✔
2786
        int ret = syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
40,237✔
2787
        if (ret != 0) {
40,237✔
2788
          sError("vgId:%d, failed to send heartbeat since %s", pSyncNode->vgId, tstrerror(ret));
76!
2789
        }
2790
      }
2791

2792
      if (syncIsInit()) {
40,308!
2793
        sTrace("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
40,308!
2794
        bool stopped = taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid,
40,308✔
2795
                                    syncEnv()->pTimerManager, &pSyncTimer->pTimer);
40,308✔
2796
        if (stopped) sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
40,308!
2797

2798
      } else {
2799
        sError("sync env is stop, reset peer hb timer error");
×
2800
      }
2801

2802
    } else {
2803
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
4!
2804
             timerLogicClock, msgLogicClock);
2805
    }
2806
  }
2807

2808
  syncHbTimerDataRelease(pData);
40,314✔
2809
  syncNodeRelease(pSyncNode);
40,314✔
2810
}
2811

2812
#ifdef BUILD_NO_CALL
// LRU deleter for cached raft entries: frees the stored value.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) {
  (void)ud;
  taosMemoryFree(value);
}

// Insert pEntry into the log store's LRU cache, keyed by its index and charged
// sizeof(SSyncRaftEntry) + dataLen. Returns 0 on TAOS_LRU_STATUS_OK, -1 otherwise.
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  sNTrace(pStoreData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus insertStatus = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry,
                                              charge, deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW, NULL);

  return (insertStatus == TAOS_LRU_STATUS_OK) ? 0 : -1;
}
#endif
2833

2834
// Fill cfg from an alter-replica request.
// Layout: voters occupy cfg->nodeInfo[0, replica), learners follow at
// [replica, replica + learnerReplica). Every node's dnode info is passed
// through tmsgUpdateDnodeInfo and logged.
// myIndex is set from selfIndex and/or learnerSelfIndex when they are not -1
// (the learner index wins if both are set); otherwise cfg->myIndex is unchanged.
// replicaNum counts voters, totalReplicaNum counts voters + learners.
void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) {
  cfg->replicaNum = 0;
  cfg->totalReplicaNum = 0;

  // voters
  for (int32_t i = 0; i < pReq->replica; ++i) {
    SNodeInfo* pNode = &cfg->nodeInfo[i];
    pNode->nodeId = pReq->replicas[i].id;
    pNode->nodePort = pReq->replicas[i].port;
    tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
    cfg->replicaNum++;
  }
  if (pReq->selfIndex != -1) {
    cfg->myIndex = pReq->selfIndex;
  }

  // learners, appended after the voters; learnerNum indexes pReq->learnerReplicas
  int32_t learnerNum = 0;
  for (int32_t i = cfg->replicaNum; i < pReq->replica + pReq->learnerReplica; ++i, ++learnerNum) {
    SNodeInfo* pNode = &cfg->nodeInfo[i];
    pNode->nodeId = pReq->learnerReplicas[learnerNum].id;
    pNode->nodePort = pReq->learnerReplicas[learnerNum].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[learnerNum].fqdn, sizeof(pNode->nodeFqdn));
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
  }
  cfg->totalReplicaNum = pReq->replica + learnerNum;

  if (pReq->learnerSelfIndex != -1) {
    cfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
  }
  cfg->changeVersion = pReq->changeVersion;
}
×
2870

2871
// Inspect a TDMT_SYNC_CONFIG_CHANGE entry. If this node is (assigned) leader
// and its fqdn:port is absent from the non-empty new configuration, step down
// in the current term and return 1.
// Returns 0 when no step-down is needed, TSDB_CODE_SYN_INTERNAL_ERROR for a
// non config-change entry, and TSDB_CODE_INVALID_MSG if the payload cannot be
// deserialized.
int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // payload = SMsgHead followed by the serialized alter-replica request
  SMsgHead*          pHead = (SMsgHead*)pEntry->data;
  void*              pBody = POINTER_SHIFT(pHead, sizeof(SMsgHead));
  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pBody, pHead->contLen, &req) != 0) {
    int32_t code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg newCfg = {0};
  syncBuildConfigFromReq(&req, &newCfg);

  bool isLeader = (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
  if (newCfg.totalReplicaNum < 1 || !isLeader) {
    return 0;
  }

  for (int32_t k = 0; k < newCfg.totalReplicaNum; ++k) {
    bool sameFqdn = (strcmp(ths->myNodeInfo.nodeFqdn, newCfg.nodeInfo[k].nodeFqdn) == 0);
    if (sameFqdn && ths->myNodeInfo.nodePort == newCfg.nodeInfo[k].nodePort) {
      return 0;  // still a member of the new configuration
    }
  }

  // removed from the configuration while leading: step down in the current term
  SyncTerm currentTerm = raftStoreGetTerm(ths);
  SRaftId  id = EMPTY_RAFT_ID;
  syncNodeStepDown(ths, currentTerm, id);
  return 1;
}
2909

2910
// Dump the node's current membership view at info level: counters, own node
// info, and for each peer its node info, epset and raft id, followed by every
// raftCfg node info entry and replica id. `str` is a caller tag included in
// every line; `cfg` is not read.
void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg* cfg, char* str) {
  (void)cfg;

  sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64
        ", changeVersion:%d, "
        "restoreFinish:%d",
        ths->vgId, str, ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion,
        ths->restoreFinish);

  sInfo("vgId:%d, %s, myNodeInfo, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
        ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn, ths->myNodeInfo.nodePort,
        ths->myNodeInfo.nodeRole);

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
          i, ths->peersNodeInfo[i].clusterId, ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn,
          ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    char    buf[256];
    int32_t len = (int32_t)sizeof(buf);
    int32_t n = 0;
    n += tsnprintf(buf + n, len - n, "%s", "{");
    // each peer has its own epset: index with i (previously always entry 0)
    for (int32_t k = 0; k < ths->peersEpset[i].numOfEps && n < len; k++) {
      n += tsnprintf(buf + n, len - n, "%s:%d%s", ths->peersEpset[i].eps[k].fqdn, ths->peersEpset[i].eps[k].port,
                     (k + 1 < ths->peersEpset[i].numOfEps ? ", " : ""));
    }
    if (n < len) {
      n += tsnprintf(buf + n, len - n, "%s", "}");
    }

    sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d", ths->vgId, str, i, buf, ths->peersEpset[i].inUse);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersId%d, addr:%" PRId64, ths->vgId, str, i, ths->peersId[i].addr);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, nodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str, i,
          ths->raftCfg.cfg.nodeInfo[i].clusterId, ths->raftCfg.cfg.nodeInfo[i].nodeId,
          ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, ths->raftCfg.cfg.nodeInfo[i].nodePort,
          ths->raftCfg.cfg.nodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, replicasId%d, addr:%" PRId64, ths->vgId, str, i, ths->replicasId[i].addr);
  }
}
×
2956

2957
// Rebuild this node's peer tables and raftCfg membership from cfg.
// 1. Every cfg node whose fqdn:port differs from our own becomes a peer
//    (node info, epset, raft id); returns terrno if a raft id cannot be derived.
// 2. All cfg nodes are copied into raftCfg.cfg.nodeInfo, voters are recounted
//    into replicaNum, myIndex is set to our own slot when found, and
//    totalReplicaNum is updated. Returns 0 on success.
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg* cfg) {
  int32_t peerCount = 0;
  for (int32_t src = 0; src < cfg->totalReplicaNum; ++src) {
    SNodeInfo* pSrc = &cfg->nodeInfo[src];
    bool       isSelf = strcmp(ths->myNodeInfo.nodeFqdn, pSrc->nodeFqdn) == 0 &&
                  ths->myNodeInfo.nodePort == pSrc->nodePort;
    if (isSelf) {
      continue;
    }

    SNodeInfo* pPeer = &ths->peersNodeInfo[peerCount];
    pPeer->nodeRole = pSrc->nodeRole;
    pPeer->clusterId = pSrc->clusterId;
    tstrncpy(pPeer->nodeFqdn, pSrc->nodeFqdn, TSDB_FQDN_LEN);
    pPeer->nodeId = pSrc->nodeId;
    pPeer->nodePort = pSrc->nodePort;

    syncUtilNodeInfo2EpSet(pPeer, &ths->peersEpset[peerCount]);

    if (!syncUtilNodeInfo2RaftId(pPeer, ths->vgId, &ths->peersId[peerCount])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", ths->vgId, peerCount);
      return terrno;
    }

    peerCount++;
  }
  ths->peersNum = peerCount;

  ths->raftCfg.cfg.replicaNum = 0;
  int32_t slot = 0;
  while (slot < cfg->totalReplicaNum) {
    SNodeInfo* pSrc = &cfg->nodeInfo[slot];
    SNodeInfo* pDst = &ths->raftCfg.cfg.nodeInfo[slot];

    if (pSrc->nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->raftCfg.cfg.replicaNum++;
    }
    pDst->nodeRole = pSrc->nodeRole;
    pDst->clusterId = pSrc->clusterId;
    tstrncpy(pDst->nodeFqdn, pSrc->nodeFqdn, TSDB_FQDN_LEN);
    pDst->nodeId = pSrc->nodeId;
    pDst->nodePort = pSrc->nodePort;

    bool isSelf = strcmp(ths->myNodeInfo.nodeFqdn, pSrc->nodeFqdn) == 0 &&
                  ths->myNodeInfo.nodePort == pSrc->nodePort;
    if (isSelf) {
      ths->raftCfg.cfg.myIndex = slot;
    }
    slot++;
  }
  ths->raftCfg.cfg.totalReplicaNum = slot;

  return 0;
}
3005

3006
// Promote to VOTER every local peer / cfg entry that `cfg` lists as a voter.
//
// Entries are matched by (nodeFqdn, nodePort). Nothing is ever demoted here; entries that are
// not voters in `cfg` keep their current role. raftCfg.cfg.replicaNum is recomputed as the
// number of (local cfg entry, voter entry in `cfg`) matches.
void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg* cfg) {
  // pass 1: peersNodeInfo
  for (int32_t p = 0; p < ths->peersNum; ++p) {
    SNodeInfo* pPeer = &ths->peersNodeInfo[p];
    for (int32_t n = 0; n < cfg->totalReplicaNum; ++n) {
      const SNodeInfo* pNew = &cfg->nodeInfo[n];
      bool             sameNode = (pPeer->nodePort == pNew->nodePort) && strcmp(pPeer->nodeFqdn, pNew->nodeFqdn) == 0;
      if (sameNode && pNew->nodeRole == TAOS_SYNC_ROLE_VOTER) {
        pPeer->nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
    }
  }

  // pass 2: raftCfg nodeInfo, recounting voters along the way
  ths->raftCfg.cfg.replicaNum = 0;
  for (int32_t c = 0; c < ths->raftCfg.cfg.totalReplicaNum; ++c) {
    SNodeInfo* pLocal = &ths->raftCfg.cfg.nodeInfo[c];
    for (int32_t n = 0; n < cfg->totalReplicaNum; ++n) {
      const SNodeInfo* pNew = &cfg->nodeInfo[n];
      if (pLocal->nodePort != pNew->nodePort || strcmp(pLocal->nodeFqdn, pNew->nodeFqdn) != 0) {
        continue;
      }
      if (pNew->nodeRole != TAOS_SYNC_ROLE_VOTER) {
        continue;
      }
      pLocal->nodeRole = TAOS_SYNC_ROLE_VOTER;
      ths->raftCfg.cfg.replicaNum++;
    }
  }
}
×
3033

3034
// Rebuild every per-replica structure of `ths` after raftCfg.cfg has been replaced.
//
// Reads ths->raftCfg.cfg (replicaNum, totalReplicaNum, nodeInfo) and rebuilds, in order:
//   1. replicasId                   - recomputed from cfg nodeInfo
//   2/3. pMatchIndex / pNextIndex   - recreated; values copied for replicas present before and after
//   4. pVotesGranted / pVotesRespond - refreshed; old vote state is not kept
//   5. logReplMgrs                  - recreated; state copied for surviving replicas, peerId re-pointed
//   6. senders                      - every snapshot sender stopped/destroyed and recreated
//   7. heartbeat timers             - stopped, re-initialised for the new replicasId, restarted
//   8. peerStates                   - copied for surviving replicas
//
// @param ths                 sync node whose raftCfg.cfg already holds the new configuration
// @param oldtotalReplicaNum  totalReplicaNum before the change; bounds the "old" arrays
// @return 0 on success, otherwise an error code
int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum) {
  int32_t code = 0;

  // 1.rebuild replicasId, remove deleted one
  SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
  memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId));

  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]) == false) return terrno;
  }

  // 2.rebuild MatchIndex, remove deleted one
  SSyncIndexMgr* oldIndex = ths->pMatchIndex;

  ths->pMatchIndex = syncIndexMgrCreate(ths);
  if (ths->pMatchIndex == NULL) {
    // keep the previous manager: otherwise it leaks and the node is left with a NULL pointer
    ths->pMatchIndex = oldIndex;
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  syncIndexMgrCopyIfExist(ths->pMatchIndex, oldIndex, oldReplicasId);
  syncIndexMgrDestroy(oldIndex);

  // 3.rebuild NextIndex, remove deleted one
  SSyncIndexMgr* oldNextIndex = ths->pNextIndex;

  ths->pNextIndex = syncIndexMgrCreate(ths);
  if (ths->pNextIndex == NULL) {
    // same as above: do not leak the old manager nor leave NULL behind
    ths->pNextIndex = oldNextIndex;
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  syncIndexMgrCopyIfExist(ths->pNextIndex, oldNextIndex, oldReplicasId);
  syncIndexMgrDestroy(oldNextIndex);

  // 4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // 5.rebuild logReplMgr
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  SSyncLogReplMgr* oldLogReplMgrs = NULL;
  int64_t          length = sizeof(SSyncLogReplMgr) * (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA);
  oldLogReplMgrs = taosMemoryMalloc(length);
  if (NULL == oldLogReplMgrs) return terrno;
  memset(oldLogReplMgrs, 0, length);

  // snapshot of the old managers by value; the originals are destroyed just below
  for (int i = 0; i < oldtotalReplicaNum; i++) {
    oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
  }

  syncNodeLogReplDestroy(ths);
  if ((code = syncNodeLogReplInit(ths)) != 0) {
    taosMemoryFree(oldLogReplMgrs);
    TAOS_RETURN(code);
  }

  // carry state over for replicas that survive the change; peerId must follow the new slot
  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        *(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
        ths->logReplMgrs[i]->peerId = i;
      }
    }
  }

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // 6.rebuild sender
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if (ths->senders[i] != NULL) {
      sDebug("vgId:%d, snapshot sender destroy while close, data:%p", ths->vgId, ths->senders[i]);

      if (snapshotSenderIsStart(ths->senders[i])) {
        snapshotSenderStop(ths->senders[i], false);
      }

      snapshotSenderDestroy(ths->senders[i]);
      ths->senders[i] = NULL;
    }
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(ths, i, &pSender);
    if (pSender == NULL) {
      // previously returned here without releasing the saved log-repl managers
      taosMemoryFree(oldLogReplMgrs);
      if (code == 0) code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
      TAOS_RETURN(code);
    }

    ths->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  // 7.rebuild synctimer
  if ((code = syncNodeStopHeartbeatTimer(ths)) != 0) {
    taosMemoryFree(oldLogReplMgrs);
    TAOS_RETURN(code);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(ths, &ths->peerHeartbeatTimerArr[i], ths->replicasId[i])) != 0) {
      taosMemoryFree(oldLogReplMgrs);
      TAOS_RETURN(code);
    }
  }

  if ((code = syncNodeStartHeartbeatTimer(ths)) != 0) {
    taosMemoryFree(oldLogReplMgrs);
    TAOS_RETURN(code);
  }

  // 8.rebuild peerStates
  SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
  for (int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) {
    oldState[i] = ths->peerStates[i];
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        ths->peerStates[i] = oldState[j];
      }
    }
  }

  taosMemoryFree(oldLogReplMgrs);

  return 0;
}
3185

3186
// Apply a learner -> voter change in place, without rebuilding per-replica structures.
//
// Only the voter count moves: ths->replicaNum and the replicaNum of both index managers are
// taken from raftCfg.cfg.replicaNum, and the vote managers are refreshed. replicasId and
// logReplMgrs are left untouched (the member set itself is unchanged, e.g. for 1->3).
void syncNodeChangeToVoter(SSyncNode* ths) {
  ths->replicaNum = ths->raftCfg.cfg.replicaNum;

  sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum);
  for (int32_t k = 0; k < ths->totalReplicaNum; ++k) {
    sDebug("vgId:%d, i:%d, replicaId.addr:%" PRIx64, ths->vgId, k, ths->replicasId[k].addr);
  }

  // index managers keep their slots; only the voter count is updated
  SSyncIndexMgr* pMatch = ths->pMatchIndex;
  pMatch->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->pNextIndex->replicaNum = ths->raftCfg.cfg.replicaNum;

  sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, pMatch->totalReplicaNum);
  for (int32_t k = 0; k < pMatch->totalReplicaNum; ++k) {
    sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, k, pMatch->index[k]);
  }

  // vote bookkeeping depends on the voter count, so refresh it
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // logReplMgrs need no change for a pure role change
}
×
3210

3211
// Zero the in-use entries of peersNodeInfo (first peersNum) and of raftCfg.cfg.nodeInfo
// (first totalReplicaNum). The counts themselves are left unchanged.
void syncNodeResetPeerAndCfg(SSyncNode* ths) {
  for (int32_t p = 0; p < ths->peersNum; ++p) {
    memset(&ths->peersNodeInfo[p], 0, sizeof(SNodeInfo));
  }

  for (int32_t c = 0; c < ths->raftCfg.cfg.totalReplicaNum; ++c) {
    memset(&ths->raftCfg.cfg.nodeInfo[c], 0, sizeof(SNodeInfo));
  }
}
×
3221

3222
// Apply a TDMT_SYNC_CONFIG_CHANGE raft entry to this node.
//
// The entry payload (after SMsgHead) is deserialized as SAlterVnodeTypeReq and converted into an
// SSyncCfg. An entry whose changeVersion is not newer than the current one is skipped (returns 0).
// Otherwise, depending on the new config:
//   - totalReplicaNum 1 or 2 ("remove replica"): if this node is still listed, peers/cfg are rebuilt
//     from the new config; if not, the node shrinks its cfg to itself and becomes a learner;
//   - else, when the node currently has 3 replicas ("change replica type"): roles are promoted to
//     voter in place;
//   - else ("add replica"): peers/cfg and all per-replica structures are rebuilt.
// Finally quorum, lastConfigIndex, cfg.lastIndex and cfg.changeVersion are updated and the cfg file
// is written.
//
// @param ths    sync node
// @param pEntry raft entry; must have originalRpcType == TDMT_SYNC_CONFIG_CHANGE
// @param str    caller tag used in logging; "Commit" selects the commit-path log format
// @return 0 on success (including skipped stale entries), otherwise an error code
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) {
  int32_t code = 0;
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SMsgHead* head = (SMsgHead*)pEntry->data;
  void*     pReq = POINTER_SHIFT(head, sizeof(SMsgHead));

  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg cfg = {0};
  syncBuildConfigFromReq(&req, &cfg);

  if (cfg.changeVersion <= ths->raftCfg.cfg.changeVersion) {
    sInfo(
        "vgId:%d, skip conf change entry since lower version. "
        "this entry, index:%" PRId64 ", term:%" PRId64
        ", totalReplicaNum:%d, changeVersion:%d; "
        "current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d",
        ths->vgId, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
    return 0;
  }

  if (strcmp(str, "Commit") == 0) {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
        ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex,
        ths->pLogBuf->endIndex, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
  } else {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64 ", t:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
        ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum,
        ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
        ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index - 1, ths->commitIndex,
        ths->pLogBuf->commitIndex);
  }

  syncNodeLogConfigInfo(ths, &cfg, "before config change");

  int32_t oldTotalReplicaNum = ths->totalReplicaNum;

  if (cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2) {  // remove replica

    // is this node still part of the new configuration?
    bool incfg = false;
    for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
      if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
          ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
        incfg = true;
        break;
      }
    }

    if (incfg) {  // remove other
      syncNodeResetPeerAndCfg(ths);

      // no need to change myNodeInfo

      if ((code = syncNodeRebuildPeerAndCfg(ths, &cfg)) != 0) {
        TAOS_RETURN(code);
      }

      if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) {
        TAOS_RETURN(code);
      }
    } else {  // remove myself
      // no need to do anything actually, to change the following to reduce distruptive server chance

      syncNodeResetPeerAndCfg(ths);

      // change myNodeInfo
      ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER;

      // change peer and cfg: the cfg now holds only this node, at slot 0
      ths->peersNum = 0;
      memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo));
      ths->raftCfg.cfg.replicaNum = 0;
      ths->raftCfg.cfg.totalReplicaNum = 1;
      // keep myIndex consistent with the shrunk cfg; the previous value may point at an entry
      // that syncNodeResetPeerAndCfg just zeroed
      ths->raftCfg.cfg.myIndex = 0;

      // change other
      if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) {
        TAOS_RETURN(code);
      }

      // change state
      ths->state = TAOS_SYNC_STATE_LEARNER;
    }

    ths->restoreFinish = false;
  } else {                            // add replica, or change replica type
    if (ths->totalReplicaNum == 3) {  // change replica type
      sInfo("vgId:%d, begin change replica type", ths->vgId);

      // change myNodeInfo
      for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
        if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
            ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
          if (cfg.nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
            ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
          }
        }
      }

      // change peer and cfg
      syncNodeChangePeerAndCfgToVoter(ths, &cfg);

      // change other
      syncNodeChangeToVoter(ths);

      // change state
      if (ths->state == TAOS_SYNC_STATE_LEARNER) {
        if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER) {
          ths->state = TAOS_SYNC_STATE_FOLLOWER;
        }
      }

      ths->restoreFinish = false;
    } else {  // add replica
      sInfo("vgId:%d, begin add replica", ths->vgId);

      // no need to change myNodeInfo

      // change peer and cfg
      if ((code = syncNodeRebuildPeerAndCfg(ths, &cfg)) != 0) {
        TAOS_RETURN(code);
      }

      // change other
      if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) {
        TAOS_RETURN(code);
      }

      // no need to change state

      if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
        ths->restoreFinish = false;
      }
    }
  }

  ths->quorum = syncUtilQuorum(ths->replicaNum);

  ths->raftCfg.lastConfigIndex = pEntry->index;
  ths->raftCfg.cfg.lastIndex = pEntry->index;
  ths->raftCfg.cfg.changeVersion = cfg.changeVersion;

  syncNodeLogConfigInfo(ths, &cfg, "after config change");

  if ((code = syncWriteCfgFile(ths)) != 0) {
    sError("vgId:%d, failed to create sync cfg file", ths->vgId);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
3395

3396
// Append a raft entry to the log buffer and advance match/commit state.
//
// On the failure paths below pEntry is destroyed here. On success it is not freed here; it is
// presumably owned by the log buffer afterwards (verify against syncLogBufferAppend).
// After the append attempt the log buffer is always proceeded. As assigned leader, commits up to
// assignedCommitIndex. With replicaNum > 1, returns right after that. Otherwise (single replica)
// commitIndex is advanced to the match index and the buffer is committed.
//
// @return 0 on success, negative/TSDB error code on failure
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  int32_t code = -1;
  if (pEntry->dataLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId,
           TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    goto _out;
  }

  // append to log buffer
  if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) {
    sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
    int32_t ret = 0;
    // let the state machine see the failed entry (with terrno) before dropping it
    if ((ret = syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false)) != 0) {
      sError("vgId:%d, failed to execute fsm, since %s", ths->vgId, tstrerror(ret));
    }
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    goto _out;
  }

  code = 0;
_out:;
  // proceed match index, with replicating on needed
  SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append");

  if (pEntry != NULL) {
    sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
           ", %" PRId64 ")",
           ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
           ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
  }

  if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    int64_t index = syncNodeUpdateAssignedCommitIndex(ths, matchIndex);
    sTrace("vgId:%d, update assigned commit index %" PRId64 "", ths->vgId, index);

    if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
        syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex) < 0) {
      // report the index we actually tried to commit up to
      sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->assignedCommitIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // multi replica
  if (ths->replicaNum > 1) {
    TAOS_RETURN(code);
  }

  // single replica
  SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, matchIndex);
  sTrace("vgId:%d, update commit return index %" PRId64 "", ths->vgId, returnIndex);

  // NOTE(review): the commit result overwrites `code`, so an earlier append failure is not
  // reported to the caller on the single-replica path (unlike the multi-replica path above).
  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
      (code = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex)) < 0) {
    sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
  }

  TAOS_RETURN(code);
}
3457

3458
// Decide whether enough voter peers have stopped answering heartbeats.
//
// A single-replica node never times out. Otherwise, each non-learner peer whose last receive
// time (from pMatchIndex) is known, i.e. not 0 or -1, and is older than tsHeartbeatTimeout
// (compared against taosGetTimestampMs()) counts as timed out. Returns true when that count
// reaches pSyncNode->quorum.
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->totalReplicaNum == 1) {
    return false;
  }

  int64_t now = taosGetTimestampMs();
  int32_t timedOut = 0;

  for (int32_t idx = 0; idx < pSyncNode->peersNum; ++idx) {
    // learners do not take part in quorum
    if (pSyncNode->peersNodeInfo[idx].nodeRole != TAOS_SYNC_ROLE_LEARNER) {
      int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[idx]));
      bool    known = (lastRecv != 0) && (lastRecv != -1);
      if (known && now - lastRecv > tsHeartbeatTimeout) {
        ++timedOut;
      }
    }
  }

  return timedOut >= pSyncNode->quorum;
}
3483

3484
// Return true if any snapshot sender among the first totalReplicaNum slots exists and has
// its `start` flag set; false for a NULL node or when none is active.
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return false;
  }

  for (int32_t r = 0; r < pSyncNode->totalReplicaNum; ++r) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[r];
    if (pSender != NULL && pSender->start) {
      return true;
    }
  }

  return false;
}
3495

3496
// Return true when the node's pNewNodeReceiver exists and has its `start` flag set.
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL || pSyncNode->pNewNodeReceiver == NULL) {
    return false;
  }
  return pSyncNode->pNewNodeReceiver->start ? true : false;
}
3502

3503
// Build a no-op entry at the log buffer's end index with the current term and append it via
// syncNodeAppend. Returns TSDB_CODE_OUT_OF_MEMORY if the entry cannot be built.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex nextIndex = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  curTerm = raftStoreGetTerm(ths);

  SSyncRaftEntry* pNoop = syncEntryBuildNoop(curTerm, nextIndex, ths->vgId);
  int32_t         code = (pNoop == NULL) ? TSDB_CODE_OUT_OF_MEMORY : syncNodeAppend(ths, pNoop);
  TAOS_RETURN(code);
}
3517

3518
#ifdef BUILD_NO_CALL
// Legacy no-op append that writes directly to the log store (only built with BUILD_NO_CALL).
// As leader the entry is appended and cached. If a cache handle was obtained it is released;
// otherwise the entry is destroyed here. Returns -1 if the entry cannot be built or appended.
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  SyncIndex       nextIndex = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        curTerm = raftStoreGetTerm(ths);
  SSyncRaftEntry* pNoop = syncEntryBuildNoop(curTerm, nextIndex, ths->vgId);
  if (pNoop == NULL) {
    return -1;
  }

  LRUHandle* pHandle = NULL;
  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    if (ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pNoop, false) != 0) {
      sError("append noop error");
      return -1;
    }
    syncCacheEntry(ths->pLogStore, pNoop, &pHandle);
  }

  if (pHandle != NULL) {
    taosLRUCacheRelease(ths->pLogStore->pCache, pHandle, false);
  } else {
    syncEntryDestroy(pNoop);
  }

  return 0;
}
#endif
3548

3549
// Handle a heartbeat from another member of the raft group.
//
// Heartbeats from nodes outside the group are dropped. Otherwise a heartbeat reply is built.
// A learner adopts a higher term. A non-leader with a matching term records the receive time
// in pNextIndex, stores minMatchIndex and, as follower/learner, enqueues a local commit command.
// A leader that sees an equal or higher term enqueues a step-down command. The reply is then
// sent, and the election timer is reset when the heartbeat was accepted.
//
// @return 0 on success, otherwise an error code from building/sending messages
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  int32_t        code = 0;
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
  bool           resetElect = false;

  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  int64_t tsMs = taosGetTimestampMs();
  int64_t timeDiff = tsMs - pMsg->timeStamp;
  syncLogRecvHeartbeat(ths, pMsg, timeDiff, tbuf);

  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
    sWarn(
        "vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
        "cluster:%d",
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
    return 0;
  }

  SRpcMsg rpcMsg = {0};
  TAOS_CHECK_RETURN(syncBuildHeartbeatReply(&rpcMsg, ths->vgId));
  SyncTerm currentTerm = raftStoreGetTerm(ths);

  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;

  sGTrace("vgId:%d, process sync-heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64,
          ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm);

  if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
    raftStoreSetTerm(ths, pMsg->term);
    currentTerm = pMsg->term;
  }

  if (pMsg->term == currentTerm &&
      (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
    resetElect = true;

    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      if ((code = syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId)) != 0) {
        // the heartbeat reply is already allocated; release it before bailing out
        rpcFreeCont(rpcMsg.pCont);
        TAOS_RETURN(code);
      }

      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd =
          (ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT;
      pSyncMsg->commitIndex = pMsg->commitIndex;
      pSyncMsg->currentTerm = pMsg->term;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, failed to enqueue commit msg from heartbeat since %s, code:%d", ths->vgId, terrstr(), code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sTrace("vgId:%d, enqueue commit msg from heartbeat, commit-index:%" PRId64 ", term:%" PRId64, ths->vgId,
                 pMsg->commitIndex, pMsg->term);
        }
      } else {
        // nobody will consume the command, so do not leak it
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  if (pMsg->term >= currentTerm &&
      (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    SRpcMsg rpcMsgLocalCmd = {0};
    if ((code = syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId)) != 0) {
      // the heartbeat reply is already allocated; release it before bailing out
      rpcFreeCont(rpcMsg.pCont);
      TAOS_RETURN(code);
    }

    SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->currentTerm = pMsg->term;
    pSyncMsg->commitIndex = pMsg->commitIndex;

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, pMsg->term);
      }
    } else {
      // nobody will consume the command, so do not leak it
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    }
  }

  // reply
  TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg));

  if (resetElect) syncNodeResetElectTimer(ths);
  return 0;
}
3646

3647
// Handle a heartbeat reply from a peer.
//
// Looks up the peer's log replication manager, logs the reply with its round-trip delay,
// records the receive time in pMatchIndex and delegates to syncLogReplProcessHeartbeatReply.
//
// @return result of syncLogReplProcessHeartbeatReply, or an error if the peer has no manager
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  int32_t         code = 0;
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // "0x%016" prints a zero-padded 16-digit hex address (was "0x016%", which printed a literal "016")
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64 "", ths->vgId, pMsg->srcId.addr);
    TAOS_RETURN(code);
  }

  int64_t tsMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, tbuf);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);

  return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
}
3669

3670
#ifdef BUILD_NO_CALL
// Legacy heartbeat-reply handler (only built with BUILD_NO_CALL): logs the reply and records
// its arrival time in pMatchIndex.
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;

  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  int64_t nowMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pReply, nowMs - pReply->timeStamp, tbuf);

  // remember when this peer last answered
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, nowMs);
  return 0;
}
#endif
3687

3688
// Execute a locally enqueued sync command.
//
// STEP_DOWN steps the node down to the carried term. FOLLOWER_CMT / LEARNER_CMT advance
// commitIndex when the carried term equals the log buffer's last match term, then commit the
// log buffer (unless the FSM state is incomplete). Unknown commands are logged.
// Returns 0 except when the last match term is negative (TSDB_CODE_SYN_INTERNAL_ERROR).
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, "");

  if (pCmd->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    SRaftId emptyId = EMPTY_RAFT_ID;
    syncNodeStepDown(ths, pCmd->currentTerm, emptyId);
    return 0;
  }

  bool isCommitCmd = (pCmd->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) || (pCmd->cmd == SYNC_LOCAL_CMD_LEARNER_CMT);
  if (!isCommitCmd) {
    sError("error local cmd");
    return 0;
  }

  if (syncLogBufferIsEmpty(ths->pLogBuf)) {
    sError("vgId:%d, sync log buffer is empty.", ths->vgId);
    return 0;
  }

  SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
  if (lastMatchTerm < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // only trust the leader's commit index when our log agrees on the term
  if (lastMatchTerm == pCmd->currentTerm) {
    SyncIndex updated = syncNodeUpdateCommitIndex(ths, pCmd->commitIndex);
    sTrace("vgId:%d, update commit return index %" PRId64 "", ths->vgId, updated);
  }

  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
    sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(),
           ths->commitIndex);
  }

  return 0;
}
3719

3720
// TLA+ Spec
3721
// ClientRequest(i, v) ==
3722
//     /\ state[i] = Leader
3723
//     /\ LET entry == [term  |-> currentTerm[i],
3724
//                      value |-> v]
3725
//            newLog == Append(log[i], entry)
3726
//        IN  log' = [log EXCEPT ![i] = newLog]
3727
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
3728
//                    leaderVars, commitIndex>>
3729
//
3730

3731
// Turn a client request into a raft entry and append it when this node leads.
//
// The entry is placed at the log buffer's end index with the current term. For
// TDMT_SYNC_CONFIG_CHANGE a response stub is registered so the reply can be sent later.
// On a (assigned) leader, *pRetIndex (if given) receives the entry index. Config changes are
// checked first: a negative result is an error, and a positive result means the change is
// answered immediately and not appended. Otherwise the entry goes through syncNodeAppend.
// Non-leaders destroy the entry and return TSDB_CODE_SYN_INTERNAL_ERROR.
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  int32_t code = 0;

  SyncIndex       index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = NULL;
  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }

  if (pEntry == NULL) {
    sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr());
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // 1->2, config change is add in write thread, and will continue in sync thread
  // need save message for it
  if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) {
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
    uint64_t  seqNum = syncRespMgrAdd(ths->pSyncRespMgr, &stub);
    pEntry->seqNum = seqNum;
  }

  if (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (pRetIndex) {
    (*pRetIndex) = index;
  }

  if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
    // reuse the function-level `code` instead of shadowing it
    code = syncNodeCheckChangeConfig(ths, pEntry);
    if (code < 0) {
      sError("vgId:%d, failed to check change config since %s.", ths->vgId, terrstr());
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      TAOS_RETURN(code);
    }

    if (code > 0) {
      // answer the stored request directly; the entry is not appended
      SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
      int32_t num = syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
      sDebug("vgId:%d, get response stub for config change, seqNum:%" PRIu64 ", num:%d", ths->vgId, pEntry->seqNum,
             num);
      if (rsp.info.handle != NULL) {
        tmsgSendRsp(&rsp);
      }
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      TAOS_RETURN(code);
    }
  }

  code = syncNodeAppend(ths, pEntry);
  return code;
}
3794

3795
/*
 * Translate a sync role/state value into a static, human-readable label.
 *
 * @param state  the ESyncState value to name
 * @return       a string literal ("follower", "candidate", "leader", "error",
 *               "offline", "learner", "assigned leader"); any value not listed
 *               yields "unknown". The caller must not free the result.
 */
const char* syncStr(ESyncState state) {
  const char* label = "unknown";

  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      label = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      label = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      label = "leader";
      break;
    case TAOS_SYNC_STATE_ERROR:
      label = "error";
      break;
    case TAOS_SYNC_STATE_OFFLINE:
      label = "offline";
      break;
    case TAOS_SYNC_STATE_LEARNER:
      label = "learner";
      break;
    case TAOS_SYNC_STATE_ASSIGNED_LEADER:
      label = "assigned leader";
      break;
    default:
      // Unrecognized values keep the "unknown" fallback set above.
      break;
  }

  return label;
}
3815

3816
/*
 * Find this node's own slot inside a new configuration and store it in
 * pNewCfg->myIndex.
 *
 * Each slot's address (via SYNC_ADDR) is paired with this node's vgId. The
 * pair is compared against ths->myRaftId, and the first matching slot wins.
 *
 * @return 0 when a slot matched (pNewCfg->myIndex updated);
 *         TSDB_CODE_SYN_INTERNAL_ERROR when no slot matched (pNewCfg untouched).
 */
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t slot = 0;

  while (slot < pNewCfg->totalReplicaNum) {
    SRaftId candidate = {.addr = SYNC_ADDR(&pNewCfg->nodeInfo[slot]), .vgId = ths->vgId};

    if (syncUtilSameId(&ths->myRaftId, &candidate)) {
      pNewCfg->myIndex = slot;
      return 0;
    }

    ++slot;
  }

  return TSDB_CODE_SYN_INTERNAL_ERROR;
}
3831

3832
/*
 * Report whether a message qualifies for the single-replica handling that this
 * function's name refers to. All three conditions must hold, checked in this order:
 *   1. the node has exactly one replica (ths->replicaNum == 1);
 *   2. syncUtilUserCommit() accepts the message type;
 *   3. the vgroup id is not 1 (vgId 1 is always excluded).
 */
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) {
    return false;
  }

  if (!syncUtilUserCommit(pMsg->msgType)) {
    return false;
  }

  return ths->vgId != 1;
}
3835

3836
/*
 * Check whether pRaftId matches one of the node's replica ids.
 *
 * The scan covers ths->replicasId[0 .. ths->totalReplicaNum) and stops at the
 * first match.
 *
 * @return true if a matching replica id exists, false otherwise.
 */
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool found = false;

  for (int32_t idx = 0; !found && idx < ths->totalReplicaNum; ++idx) {
    found = syncUtilSameId(&ths->replicasId[idx], pRaftId);
  }

  return found;
}
3844

3845
/*
 * Look up the snapshot sender paired with the replica identified by pDestId.
 *
 * The sender for replica slot i is ths->senders[i]. Every slot in
 * [0, ths->totalReplicaNum) is examined, so if several slots matched, the
 * last matching slot would be used.
 *
 * @return the matching sender, or NULL if no replica id equals pDestId.
 */
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  int32_t hit = -1;

  for (int32_t slot = 0; slot < ths->totalReplicaNum; ++slot) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      hit = slot;
    }
  }

  return (hit < 0) ? NULL : ths->senders[hit];
}
3854

3855
/*
 * Return a pointer to the per-peer heartbeat timer slot for pDestId.
 *
 * The timer for replica slot i is &ths->peerHeartbeatTimerArr[i]. All slots in
 * [0, ths->totalReplicaNum) are checked, so the last matching slot would win.
 *
 * @return pointer into ths->peerHeartbeatTimerArr, or NULL when pDestId
 *         matches no replica id.
 */
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  int32_t hit = -1;

  for (int32_t slot = 0; slot < ths->totalReplicaNum; ++slot) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      hit = slot;
    }
  }

  if (hit < 0) {
    return NULL;
  }
  return &ths->peerHeartbeatTimerArr[hit];
}
3864

3865
/*
 * Return a pointer to the bookkeeping entry (ths->peerStates[i]) for the
 * replica whose id equals pDestId.
 *
 * The whole range [0, ths->totalReplicaNum) is scanned, so the last matching
 * slot would win.
 *
 * @return pointer into ths->peerStates, or NULL when no replica id matches.
 */
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  int32_t hit = -1;

  for (int32_t slot = 0; slot < ths->totalReplicaNum; ++slot) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      hit = slot;
    }
  }

  return (hit >= 0) ? &ths->peerStates[hit] : NULL;
}
3874

3875
#ifdef BUILD_NO_CALL
3876
/*
 * Decide whether an AppendEntries message should be sent to pDestId.
 *
 * @return false if pDestId has no peer-state entry (an error is logged);
 *         false if the peer's lastSendIndex already equals prevLogIndex + 1 and
 *         that send happened less than SYNC_APPEND_ENTRIES_TIMEOUT_MS ago;
 *         true otherwise.
 */
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextIndex = pMsg->prevLogIndex + 1;
  int64_t   now = taosGetTimestampMs();

  // Suppress only a resend of the same index within the timeout window.
  bool sameIndex = (pPeer->lastSendIndex == nextIndex);
  bool recentlySent = (now - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);

  return !(sameIndex && recentlySent);
}
3892

3893
/*
 * Check whether a change may proceed. The `changing` flag suggests a
 * configuration change; confirm against callers.
 *
 * Refuses (logs an error and returns false) when any of these holds:
 *   - pSyncNode->changing is set;
 *   - commitIndex >= SYNC_INDEX_BEGIN and commitIndex differs from the last
 *     log index reported by syncNodeGetLastIndex();
 *   - some peer in peersId[0 .. peersNum) has a snapshot sender whose `start`
 *     flag is set.
 * Returns true otherwise.
 */
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  bool hasCommitted = (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN);
  if (hasCommitted) {
    SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
    bool      caughtUp = (pSyncNode->commitIndex == lastIndex);
    if (!caughtUp) {
      sError("sync cannot change2");
      return false;
    }
  }

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &pSyncNode->peersId[peer]);
    bool                 senderStarted = (pSender != NULL) && pSender->start;
    if (senderStarted) {
      sError("sync cannot change3");
      return false;
    }
  }

  return true;
}
3917
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc