• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4914

06 Jan 2026 01:30AM UTC coverage: 64.876% (-0.008%) from 64.884%
#4914

push

travis-ci

web-flow
merge: from main to 3.0 branch #34167

180 of 319 new or added lines in 14 files covered. (56.43%)

3475 existing lines in 124 files now uncovered.

194993 of 300563 relevant lines covered (64.88%)

116239151.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.19
/source/libs/sync/src/syncMain.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "syncAppendEntries.h"
19
#include "syncAppendEntriesReply.h"
20
#include "syncCommit.h"
21
#include "syncElection.h"
22
#include "syncEnv.h"
23
#include "syncIndexMgr.h"
24
#include "syncInt.h"
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
27
#include "syncRaftCfg.h"
28
#include "syncRaftLog.h"
29
#include "syncRaftStore.h"
30
#include "syncReplication.h"
31
#include "syncRequestVote.h"
32
#include "syncRequestVoteReply.h"
33
#include "syncRespMgr.h"
34
#include "syncSnapshot.h"
35
#include "syncTimeout.h"
36
#include "syncUtil.h"
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
39
#include "tmisce.h"
40
#include "tref.h"
41

42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
43
static void    syncNodeEqElectTimer(void* param, void* tmrId);
44
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
45
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
48
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
49
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
50
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
51
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
52
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
53
static int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
54
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
55

56
static bool    syncNodeCanChange(SSyncNode* pSyncNode);
57
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
58
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
59
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
60

61
static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
62

63
/*
 * Open a sync node described by pSyncInfo, register it via syncNodeAdd and copy the
 * ping / election / heartbeat intervals and the message callbacks onto it.
 *
 * Returns the reference id stored in the node, or -1 when the node cannot be opened or
 * registered (a node that fails registration is closed again).
 */
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  sInfo("vgId:%d, start to open sync", pSyncInfo->vgId);

  SSyncNode* pNode = syncNodeOpen(pSyncInfo, vnodeVersion, pSyncInfo->electMs, pSyncInfo->heartbeatMs);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    syncNodeClose(pNode);
    return -1;
  }

  // Ping and heartbeat timers start at their configured baselines.
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;
  pNode->electBaseLine = pSyncInfo->electMs;
  pNode->msgcb = pSyncInfo->msgcb;

  sInfo("vgId:%d, sync opened, electBaseLine:%d, hbBaseLine:%d", pSyncInfo->vgId, pNode->electBaseLine,
        pNode->hbBaseLine);
  return pNode->rid;
}
88

89
int32_t syncStart(int64_t rid) {
4,043,999✔
90
  int32_t    code = 0;
4,043,999✔
91
  int32_t    vgId = 0;
4,043,999✔
92
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,043,999✔
93
  if (pSyncNode == NULL) {
4,043,999✔
94
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
95
    if (terrno != 0) code = terrno;
×
96
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
97
    TAOS_RETURN(code);
×
98
  }
99
  vgId = pSyncNode->vgId;
4,043,999✔
100
  sInfo("vgId:%d, begin to start sync", pSyncNode->vgId);
4,043,999✔
101

102
  if ((code = syncNodeRestore(pSyncNode)) < 0) {
4,043,999✔
103
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
104
    goto _err;
×
105
  }
106
  sInfo("vgId:%d, sync node restore is executed", pSyncNode->vgId);
4,043,999✔
107

108
  if ((code = syncNodeStart(pSyncNode)) < 0) {
4,043,999✔
109
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, tstrerror(code));
×
110
    goto _err;
×
111
  }
112
  sInfo("vgId:%d, sync node start is executed", pSyncNode->vgId);
4,043,999✔
113

114
  syncNodeRelease(pSyncNode);
4,043,999✔
115

116
  sInfo("vgId:%d, sync started", vgId);
4,043,999✔
117

118
  TAOS_RETURN(code);
4,043,999✔
119

120
_err:
×
121
  syncNodeRelease(pSyncNode);
×
122
  TAOS_RETURN(code);
×
123
}
124

125
/*
 * Copy the current raft configuration of the node identified by rid into *cfg.
 * Returns 0 on success, or an acquire error (terrno, else TSDB_CODE_SYN_RETURN_VALUE_NULL).
 */
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    *cfg = pNode->raftCfg.cfg;
    syncNodeRelease(pNode);
    return 0;
  }

  int32_t code = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
  sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
  TAOS_RETURN(code);
}
142

143
void syncStop(int64_t rid) {
4,043,999✔
144
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,043,999✔
145
  if (pSyncNode != NULL) {
4,043,999✔
146
    pSyncNode->isStart = false;
4,043,999✔
147
    syncNodeRelease(pSyncNode);
4,043,999✔
148
    syncNodeRemove(rid);
4,043,999✔
149
  }
150
}
4,043,999✔
151

152
void syncPreStop(int64_t rid) {
4,043,999✔
153
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,043,999✔
154
  if (pSyncNode != NULL) {
4,043,999✔
155
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
4,043,999✔
156
      sInfo("vgId:%d, stop snapshot receiver", pSyncNode->vgId);
96✔
157
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
96✔
158
    }
159
    syncNodePreClose(pSyncNode);
4,043,999✔
160
    syncNodeRelease(pSyncNode);
4,043,696✔
161
  }
162
}
4,043,999✔
163

164
void syncPostStop(int64_t rid) {
3,654,230✔
165
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
3,654,230✔
166
  if (pSyncNode != NULL) {
3,654,230✔
167
    syncNodePostClose(pSyncNode);
3,654,230✔
168
    syncNodeRelease(pSyncNode);
3,654,151✔
169
  }
170
}
3,654,052✔
171

172
/*
 * Accept pCfg only if this node is a member of it and the replica count differs from the
 * current one by at most one.
 */
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (!syncNodeInConfig(pSyncNode, pCfg)) {
    return false;
  }

  int32_t delta = pCfg->replicaNum - pSyncNode->replicaNum;
  return delta >= -1 && delta <= 1;
}
176

177
/*
 * Apply pNewCfg to the node identified by rid.
 *
 * Nothing happens (returns 0) when the current lastConfigIndex is already >= pNewCfg->lastIndex.
 * The new config must pass syncNodeCheckNewConfig; then the config index is updated and the
 * change applied. If the node is a leader or assigned leader, its heartbeat timers are stopped,
 * every per-peer heartbeat timer is re-initialised from replicasId, and heartbeats are restarted.
 *
 * Every exit path after a successful acquire releases the node reference exactly once, and the
 * node is never dereferenced after release.
 *
 * Returns 0 on success, TSDB_CODE_SYN_NEW_CONFIG_ERROR for an invalid/failed change, the code of
 * a failing helper, or an acquire error.
 */
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
    // Log while the reference is still held; pSyncNode must not be touched after release.
    sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
          pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
    syncNodeRelease(pSyncNode);
    return 0;
  }

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    goto _OVER;
  }

  code = syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  if (code != 0) goto _OVER;

  if (syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex) != 0) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since do change error", pSyncNode->vgId);
    goto _OVER;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    // Rebuild the per-peer heartbeat timers against the (possibly changed) replica ids.
    code = syncNodeStopHeartbeatTimer(pSyncNode);
    if (code != 0) goto _OVER;

    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      code = syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
      if (code != 0) goto _OVER;
    }

    code = syncNodeStartHeartbeatTimer(pSyncNode);
    if (code != 0) goto _OVER;
  }

_OVER:
  syncNodeRelease(pSyncNode);
  TAOS_RETURN(code);
}
223

224
/*
 * Dispatch one sync-layer RPC message to the matching handler of the node identified by rid.
 *
 * Returns the handler's code, TSDB_CODE_MSG_NOT_PROCESSED for unknown message types, or an
 * error when sync is not initialised or the node cannot be acquired. Non-zero results are
 * logged at debug level.
 */
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  // Captured while the reference is held: the failure log below runs after syncNodeRelease,
  // when pSyncNode may no longer be valid.
  int32_t vgId = pSyncNode->vgId;

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
    case TDMT_SYNC_TIMEOUT_ELECTION:
      // Both timeout kinds share one handler.
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_FORCE_FOLLOWER:
      code = syncForceBecomeFollower(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SET_ASSIGNED_LEADER:
      code = syncBecomeAssignedLeader(pSyncNode, pMsg);
      break;
    default:
      code = TSDB_CODE_MSG_NOT_PROCESSED;
      break;
  }

  syncNodeRelease(pSyncNode);
  if (code != 0) {
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since %s", vgId, pMsg, TMSG_INFO(pMsg->msgType),
           tstrerror(code));
  }
  TAOS_RETURN(code);
}
293

294
int32_t syncLeaderTransfer(int64_t rid) {
4,043,999✔
295
  int32_t    code = 0;
4,043,999✔
296
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,043,999✔
297
  if (pSyncNode == NULL) {
4,043,999✔
298
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
299
    if (terrno != 0) code = terrno;
×
300
    TAOS_RETURN(code);
×
301
  }
302

303
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
4,043,999✔
304
  syncNodeRelease(pSyncNode);
4,043,999✔
305
  return ret;
4,043,999✔
306
}
307

308
/*
 * Force ths into follower state with an all-zero leader id, then reply to the request using the
 * response buffer already attached to pRpcMsg->info. Always returns 0.
 */
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SRaftId emptyLeaderId = {0};
  syncNodeBecomeFollower(ths, emptyLeaderId, "force election");

  SRpcMsg rsp = {0};
  rsp.code = 0;
  rsp.pCont = pRpcMsg->info.rsp;
  rsp.contLen = pRpcMsg->info.rspLen;
  rsp.info = pRpcMsg->info;
  tmsgSendRsp(&rsp);

  return 0;
}
322

323
/*
 * Handle a set-assigned-leader request from the arbitrator.
 *
 * The request is rejected with TSDB_CODE_MND_ARB_TOKEN_MISMATCH when its arbTerm is lower than
 * the local one, or (unless req.force is set) when its member token differs from ths->arbToken.
 * Otherwise the local arbTerm is raised to the request's. A node that is not yet an assigned
 * leader then advances its raft term, becomes assigned leader and appends a noop entry.
 *
 * A response is always sent. Its pCont is a serialized SVArbSetAssignedLeaderRsp on success,
 * or NULL with contLen 0 on failure. A failed noop append is only logged; it does not change
 * the reported result.
 *
 * Returns the same code that is placed in the response.
 */
int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
  int32_t code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  void*   pHead = NULL;
  int32_t contLen = 0;

  SVArbSetAssignedLeaderReq req = {0};
  // The request body follows an SMsgHead, so only the bytes after it belong to the request.
  if (pRpcMsg->contLen < (int32_t)sizeof(SMsgHead) ||
      tDeserializeSVArbSetAssignedLeaderReq((char*)pRpcMsg->pCont + sizeof(SMsgHead),
                                            pRpcMsg->contLen - (int32_t)sizeof(SMsgHead), &req) != 0) {
    sError("vgId:%d, failed to deserialize SVArbSetAssignedLeaderReq", ths->vgId);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  if (ths->arbTerm > req.arbTerm) {
    sInfo("vgId:%d, skip to set assigned leader, msg with lower term, local:%" PRId64 ", msg:%" PRId64, ths->vgId,
          ths->arbTerm, req.arbTerm);
    goto _OVER;
  }

  ths->arbTerm = TMAX(req.arbTerm, ths->arbTerm);

  if (!req.force && strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) {
    sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken,
          req.memberToken);
    goto _OVER;
  }

  if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SUCCESS;
    raftStoreNextTerm(ths);
    if (terrno != TSDB_CODE_SUCCESS) {
      code = terrno;
      sError("vgId:%d, failed to set next term since:%s", ths->vgId, tstrerror(code));
      goto _OVER;
    }
    syncNodeBecomeAssignedLeader(ths);

    if ((code = syncNodeAppendNoop(ths)) < 0) {
      sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, tstrerror(code));
    }
  }

  SVArbSetAssignedLeaderRsp rsp = {0};
  rsp.arbToken = req.arbToken;
  rsp.memberToken = req.memberToken;
  rsp.vgId = ths->vgId;

  contLen = tSerializeSVArbSetAssignedLeaderRsp(NULL, 0, &rsp);
  if (contLen <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    contLen = 0;
    goto _OVER;
  }
  pHead = rpcMallocCont(contLen);
  if (!pHead) {
    code = terrno;
    sError("vgId:%d, failed to malloc memory for SVArbSetAssignedLeaderRsp", ths->vgId);
    contLen = 0;
    goto _OVER;
  }
  if (tSerializeSVArbSetAssignedLeaderRsp(pHead, contLen, &rsp) <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    rpcFreeCont(pHead);
    // The buffer is gone; tmsgSendRsp below must not receive (and free) it again.
    pHead = NULL;
    contLen = 0;
    goto _OVER;
  }

  code = TSDB_CODE_SUCCESS;

_OVER:;
  SRpcMsg rspMsg = {
      .code = code,
      .pCont = pHead,
      .contLen = contLen,
      .info = pRpcMsg->info,
  };

  tmsgSendRsp(&rspMsg);

  tFreeSVArbSetAssignedLeaderReq(&req);
  TAOS_RETURN(code);
}
405

406
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
×
407
  int32_t    code = 0;
×
408
  SSyncNode* pNode = syncNodeAcquire(rid);
×
409
  if (pNode == NULL) {
×
410
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
411
    if (terrno != 0) code = terrno;
×
412
    TAOS_RETURN(code);
×
413
  }
414

415
  SRpcMsg rpcMsg = {0, .info.notFreeAhandle = 1};
×
416
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
×
417
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;
×
418

419
  syncNodeRelease(pNode);
×
420
  if (ret == 1) {
×
421
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
×
422
    code = rpcSendResponse(&rpcMsg);
×
423
    return code;
×
424
  } else {
425
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
×
426
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
427
  }
428
}
429

430
/*
 * Compute a minimum match index over the node's peers.
 *
 * The result is seeded by the first peer whose match index is not SYNC_INDEX_INVALID (taken as-is).
 * After that it is lowered only by strictly smaller positive match indexes. It stays
 * SYNC_INDEX_INVALID when there are no peers.
 */
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex result = SYNC_INDEX_INVALID;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex idx = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[peer]);
    bool      unseeded = (result == SYNC_INDEX_INVALID);
    if (unseeded || (idx > 0 && idx < result)) {
      result = idx;
    }
  }

  return result;
}
443

444
/* Ask the node's log store which index must be retained for the given byte budget. */
static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) {
  return (*pSyncNode->pLogStore->syncLogIndexRetention)(pSyncNode->pLogStore, bytes);
}
447

448
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
5,580,421✔
449
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
5,580,421✔
450
  int32_t    code = 0;
5,580,421✔
451
  if (pSyncNode == NULL) {
5,580,421✔
452
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
1,552✔
453
    if (terrno != 0) code = terrno;
1,552✔
454
    sError("sync begin snapshot error");
1,552✔
455
    TAOS_RETURN(code);
1,552✔
456
  }
457

458
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
5,578,869✔
459
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
5,578,869✔
460
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
5,578,869✔
461

462
  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
5,578,869✔
463
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
27,666✔
464
    syncNodeRelease(pSyncNode);
27,666✔
465
    return 0;
27,666✔
466
  }
467

468
  int64_t logRetention = 0;
5,551,203✔
469

470
  if (syncNodeIsMnode(pSyncNode)) {
5,551,203✔
471
    // mnode
472
    logRetention = tsMndLogRetention;
745,627✔
473
  } else {
474
    // vnode
475
    if (pSyncNode->replicaNum > 1) {
4,805,576✔
476
      logRetention = SYNC_VNODE_LOG_RETENTION;
594,049✔
477
    }
478
  }
479

480
  if (pSyncNode->totalReplicaNum > 1) {
5,551,203✔
481
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER &&
664,339✔
482
        pSyncNode->state != TAOS_SYNC_STATE_LEARNER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
44,645✔
483
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
310✔
484
              lastApplyIndex);
485
      syncNodeRelease(pSyncNode);
310✔
486
      return 0;
310✔
487
    }
488
    SyncIndex retentionIndex =
664,029✔
489
        TMAX(pSyncNode->minMatchIndex, syncLogRetentionIndex(pSyncNode, SYNC_WAL_LOG_RETENTION_SIZE));
664,029✔
490
    logRetention += TMAX(0, lastApplyIndex - retentionIndex);
664,029✔
491
  }
492

493
_DEL_WAL:
4,886,864✔
494

495
  do {
496
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
5,550,893✔
497
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
5,550,893✔
498
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
5,550,893✔
499
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
5,550,893✔
500
    if (lastApplyIndex <= walCommitVer) {
5,550,893✔
501
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);
5,550,893✔
502

503
      if (snapshottingIndex == SYNC_INDEX_INVALID) {
5,550,893✔
504
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
5,550,893✔
505
        pSyncNode->snapshottingTime = taosGetTimestampMs();
5,550,893✔
506

507
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
5,550,893✔
508
        if (code == 0) {
5,550,384✔
509
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
5,549,681✔
510
                  pSyncNode->snapshottingIndex, lastApplyIndex);
511
        } else {
512
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
703✔
513
                  terrstr(), pSyncNode->snapshottingIndex, lastApplyIndex);
514
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
703✔
515
        }
516

517
      } else {
518
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
×
519
                snapshottingIndex, lastApplyIndex);
520
      }
521
    }
522
  } while (0);
523

524
  syncNodeRelease(pSyncNode);
5,550,384✔
525
  TAOS_RETURN(code);
5,550,384✔
526
}
527

528
int32_t syncEndSnapshot(int64_t rid, bool forceTrim) {
5,578,166✔
529
  int32_t    code = 0;
5,578,166✔
530
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
5,578,166✔
531
  if (pSyncNode == NULL) {
5,578,166✔
532
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
533
    if (terrno != 0) code = terrno;
×
534
    sError("sync end snapshot error");
×
535
    TAOS_RETURN(code);
×
536
  }
537

538
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
5,578,166✔
539
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
5,550,190✔
540
    code = walEndSnapshot(pData->pWal, forceTrim);
5,550,190✔
541
    if (code != 0) {
5,550,190✔
542
      sNError(pSyncNode, "wal snapshot end error since:%s", tstrerror(code));
×
543
      syncNodeRelease(pSyncNode);
×
544
      TAOS_RETURN(code);
×
545
    } else {
546
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
5,550,190✔
547
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
5,550,190✔
548
    }
549
  }
550

551
  syncNodeRelease(pSyncNode);
5,578,166✔
552
  TAOS_RETURN(code);
5,578,166✔
553
}
554

555
/*
 * A node can serve reads only when it is a leader (or assigned leader) whose restore has finished.
 * When it cannot, terrno is set to the reason:
 *   - TSDB_CODE_SYN_INTERNAL_ERROR for a NULL node;
 *   - TSDB_CODE_SYN_NOT_LEADER when it is not a leader;
 *   - TSDB_CODE_SYN_RESTORING while restoring.
 */
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  bool isLeader =
      (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
  if (!isLeader) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pSyncNode->restoreFinish) {
    return true;
  }

  terrno = TSDB_CODE_SYN_RESTORING;
  return false;
}
574

575
bool syncIsReadyForRead(int64_t rid) {
575,979,928✔
576
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
575,979,928✔
577
  if (pSyncNode == NULL) {
576,048,747✔
578
    sError("sync ready for read error");
×
579
    return false;
×
580
  }
581

582
  bool ready = syncNodeIsReadyForRead(pSyncNode);
576,048,747✔
583

584
  syncNodeRelease(pSyncNode);
576,024,075✔
585
  return ready;
576,065,264✔
586
}
587

588
#ifdef BUILD_NO_CALL
/* rid-based wrapper around syncNodeSnapshotSending; false when rid is unknown. */
bool syncSnapshotSending(int64_t rid) {
  bool       sending = false;
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    sending = syncNodeSnapshotSending(pNode);
    syncNodeRelease(pNode);
  }
  return sending;
}

/* rid-based wrapper around syncNodeSnapshotRecving; false when rid is unknown. */
bool syncSnapshotRecving(int64_t rid) {
  bool       receiving = false;
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    receiving = syncNodeSnapshotRecving(pNode);
    syncNodeRelease(pNode);
  }
  return receiving;
}
#endif
611

612
/*
 * Hand leadership to a peer, but only when this node is in TAOS_SYNC_STATE_LEADER with
 * replicaNum > 1.
 *
 * The target is peersNodeInfo[0]. When there are exactly two peers, it is peersNodeInfo[1]
 * instead if that peer's match index is strictly higher. Returns 0 when no transfer is
 * attempted, otherwise the syncNodeLeaderTransferTo result.
 */
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return 0;
  }

  bool canTransfer = (pSyncNode->state == TAOS_SYNC_STATE_LEADER && pSyncNode->replicaNum > 1);
  if (!canTransfer) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex first = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[0]);
    SyncIndex second = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[1]);
    if (second > first) {
      target = 1;
    }
  }

  return syncNodeLeaderTransferTo(pSyncNode, pSyncNode->peersNodeInfo[target]);
}
633

634
/*
 * Build a leader-transfer message naming newLeader and propose it through syncNodePropose.
 * The message buffer is freed here after proposing. Returns TSDB_CODE_SYN_INTERNAL_ERROR for a
 * single-replica group, a message-build error, or the propose result.
 */
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg rpcMsg = {0};
  TAOS_CHECK_RETURN(syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId));

  SyncLeaderTransfer* pTransfer = rpcMsg.pCont;
  pTransfer->newNodeInfo = newLeader;
  pTransfer->newLeaderId.vgId = pSyncNode->vgId;
  pTransfer->newLeaderId.addr = SYNC_ADDR(&newLeader);

  int32_t proposeCode = syncNodePropose(pSyncNode, &rpcMsg, false, NULL);
  rpcFreeCont(rpcMsg.pCont);
  return proposeCode;
}
654

655
int32_t syncResetTimer(int64_t rid, int32_t electInterval, int32_t heartbeatInterval) {
×
656
  int32_t code = 0;
×
657
  sInfo("sync Reset Timer, rid:%" PRId64, rid);
×
658
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
659
  if (pSyncNode == NULL) {
×
660
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
661
    if (terrno != 0) code = terrno;
×
662
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
663
    TAOS_RETURN(code);
×
664
  }
665
  pSyncNode->electBaseLine = electInterval;
×
666
  syncNodeResetElectTimer(pSyncNode);
×
667

668
  sInfo("vgId:%d, sync Reset Timer, rid:%" PRId64, pSyncNode->vgId, rid);
×
669
  code = syncNodeRestartHeartbeatTimer(pSyncNode, heartbeatInterval);
×
670

671
  syncNodeRelease(pSyncNode);
×
672
  return code;
×
673
}
674

675
SSyncState syncGetState(int64_t rid) {
715,627,284✔
676
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
715,627,284✔
677

678
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
715,627,284✔
679
  if (pSyncNode != NULL) {
715,636,139✔
680
    state.state = pSyncNode->state;
715,636,206✔
681
    state.roleTimeMs = pSyncNode->roleTimeMs;
715,635,912✔
682
    state.startTimeMs = pSyncNode->startTime;
715,631,245✔
683
    state.restored = pSyncNode->restoreFinish;
715,631,230✔
684
    if (pSyncNode->vgId != 1) {
715,629,835✔
685
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
113,707,816✔
686
    } else {
687
      state.canRead = state.restored;
601,917,691✔
688
    }
689
    /*
690
    double progress = 0;
691
    if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){
692
      progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex;
693
      state.progress = (int32_t)(progress * 100);
694
    }
695
    else{
696
      state.progress = -1;
697
    }
698
    sDebug("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 ", "
699
            "progress:%lf, progress:%d",
700
          pSyncNode->vgId,
701
         pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
702
    */
703
    state.term = raftStoreGetTerm(pSyncNode);
715,625,188✔
704
    syncNodeRelease(pSyncNode);
715,636,188✔
705
  }
706

707
  return state;
715,635,400✔
708
}
709

710
SSyncMetrics syncGetMetrics(int64_t rid) {
×
711
  SSyncMetrics metrics = {0};
×
712

713
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
714
  if (pSyncNode != NULL) {
×
715
    sDebug("vgId:%d, sync get metrics, wal_write_bytes:%" PRId64 ", wal_write_time:%" PRId64, pSyncNode->vgId,
×
716
           pSyncNode->wal_write_bytes, pSyncNode->wal_write_time);
717
    metrics.wal_write_bytes = atomic_load_64(&pSyncNode->wal_write_bytes);
×
718
    metrics.wal_write_time = atomic_load_64(&pSyncNode->wal_write_time);
×
719
    syncNodeRelease(pSyncNode);
×
720
  }
721
  return metrics;
×
722
}
723

724
void syncResetMetrics(int64_t rid, const SSyncMetrics* pOldMetrics) {
×
725
  if (pOldMetrics == NULL) return;
×
726

727
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
728
  if (pSyncNode != NULL) {
×
729
    // Atomically subtract the old metrics values from current metrics
730
    (void)atomic_sub_fetch_64(&pSyncNode->wal_write_bytes, pOldMetrics->wal_write_bytes);
×
731
    (void)atomic_sub_fetch_64(&pSyncNode->wal_write_time, pOldMetrics->wal_write_time);
×
732
    syncNodeRelease(pSyncNode);
×
733
  }
734
}
735

736
void syncGetCommitIndex(int64_t rid, int64_t* syncCommitIndex) {
109,978,978✔
737
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
109,978,978✔
738
  if (pSyncNode != NULL) {
109,978,978✔
739
    *syncCommitIndex = pSyncNode->commitIndex;
109,978,978✔
740
    syncNodeRelease(pSyncNode);
109,978,978✔
741
  }
742
}
109,978,978✔
743

744
int32_t syncGetArbToken(int64_t rid, char* outToken) {
19,279,960✔
745
  int32_t    code = 0;
19,279,960✔
746
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
19,279,960✔
747
  if (pSyncNode == NULL) {
19,279,960✔
748
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
749
    if (terrno != 0) code = terrno;
×
750
    TAOS_RETURN(code);
×
751
  }
752

753
  memset(outToken, 0, TSDB_ARB_TOKEN_SIZE);
19,279,960✔
754
  (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
19,279,960✔
755
  tstrncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE);
19,279,960✔
756
  (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
19,279,960✔
757

758
  syncNodeRelease(pSyncNode);
19,279,960✔
759
  TAOS_RETURN(code);
19,279,960✔
760
}
761

762
int32_t syncCheckSynced(int64_t rid) {
2,615✔
763
  int32_t    code = 0;
2,615✔
764
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
2,615✔
765
  if (pSyncNode == NULL) {
2,615✔
766
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
767
    if (terrno != 0) code = terrno;
×
768
    TAOS_RETURN(code);
×
769
  }
770

771
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
2,615✔
772
    code = TSDB_CODE_SYN_NOT_LEADER;
×
773
    syncNodeRelease(pSyncNode);
×
774
    TAOS_RETURN(code);
×
775
  }
776

777
  bool isSync = pSyncNode->commitIndex >= pSyncNode->assignedCommitIndex;
2,615✔
778
  code = (isSync ? TSDB_CODE_SUCCESS : TSDB_CODE_VND_ARB_NOT_SYNCED);
2,615✔
779
  if (!isSync) {
2,615✔
780
    sInfo("vgId:%d, not synced, assignedCommitIndex:%" PRId64 ", commitIndex:%" PRId64, pSyncNode->vgId,
×
781
          pSyncNode->assignedCommitIndex, pSyncNode->commitIndex);
782
  }
783

784
  syncNodeRelease(pSyncNode);
2,615✔
785
  TAOS_RETURN(code);
2,615✔
786
}
787

788
/*
 * Raise the arbitrator term of the sync node identified by rid to arbTerm.
 * The stored term never decreases: a smaller arbTerm leaves it unchanged.
 *
 * Returns 0 on success. When the node cannot be acquired, returns terrno if it is
 * set, otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL.
 */
int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  if (arbTerm > pNode->arbTerm) {
    pNode->arbTerm = arbTerm;
  }

  syncNodeRelease(pNode);
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
801

802
/*
 * Pick the config index that applies to a snapshot whose last applied index is
 * snapshotLastApplyIndex.
 *
 * The scan starts from configIndexArr[0] as the baseline and moves to any larger
 * recorded index that is still <= snapshotLastApplyIndex. The baseline itself is
 * returned even when it exceeds snapshotLastApplyIndex.
 *
 * Returns -2 and sets terrno to TSDB_CODE_SYN_INTERNAL_ERROR when no config index
 * has been recorded (configIndexCount < 1).
 */
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  if (pSyncNode->raftCfg.configIndexCount < 1) {
    sError("vgId:%d, failed get snapshot config index, configIndexCount:%d", pSyncNode->vgId,
           pSyncNode->raftCfg.configIndexCount);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -2;
  }

  SyncIndex best = pSyncNode->raftCfg.configIndexArr[0];

  // element 0 is already the baseline, so comparing it against itself is skipped
  for (int32_t k = 1; k < pSyncNode->raftCfg.configIndexCount; ++k) {
    SyncIndex candidate = pSyncNode->raftCfg.configIndexArr[k];
    if (candidate > best && candidate <= snapshotLastApplyIndex) {
      best = candidate;
    }
  }

  sTrace("vgId:%d, index:%" PRId64 ", get last snapshot config index:%" PRId64, pSyncNode->vgId, snapshotLastApplyIndex,
         best);
  return best;
}
822

823
/*
 * Find the raft id of the peer whose fqdn and port both match pEp.
 * Only the peer table (peersNodeInfo / peersId) is searched. EMPTY_RAFT_ID is
 * returned when no peer matches.
 */
static SRaftId syncGetRaftIdByEp(SSyncNode* pSyncNode, const SEp* pEp) {
  int32_t idx = 0;
  while (idx < pSyncNode->peersNum) {
    const SNodeInfo* pPeer = &pSyncNode->peersNodeInfo[idx];
    if (pEp->port == pPeer->nodePort && strcmp(pEp->fqdn, pPeer->nodeFqdn) == 0) {
      return pSyncNode->peersId[idx];
    }
    ++idx;
  }
  return EMPTY_RAFT_ID;
}
832

833
static void epsetToString(const SEpSet* pEpSet, char* buffer, size_t bufferSize) {
40,560,388✔
834
  if (pEpSet == NULL || buffer == NULL) {
40,560,388✔
835
    snprintf(buffer, bufferSize, "EpSet is NULL");
×
836
    return;
×
837
  }
838

839
  size_t offset = 0;
40,561,315✔
840
  offset += snprintf(buffer + offset, bufferSize - offset, "EpSet: [");
40,561,315✔
841

842
  for (int i = 0; i < pEpSet->numOfEps; ++i) {
111,703,558✔
843
    if (offset >= bufferSize) break;
71,135,522✔
844
    offset += snprintf(buffer + offset, bufferSize - offset, "%s:%d%s", pEpSet->eps[i].fqdn, pEpSet->eps[i].port,
71,133,114✔
845
                       (i + 1 < pEpSet->numOfEps) ? ", " : "");
71,135,522✔
846
  }
847

848
  if (offset < bufferSize) {
40,572,147✔
849
    snprintf(buffer + offset, bufferSize - offset, "]");
40,568,355✔
850
  }
851
}
852

853
/*
 * Fill pEpSet with the endpoints of all non-learner replicas of the sync node
 * identified by rid, and choose pEpSet->inUse as the preferred retry target:
 *   1. the endpoint equal to the cached leader endpoint (leaderCacheEp), if any;
 *   2. otherwise this node's own endpoint, when this node is the cached leader;
 *   3. otherwise the entry after raftCfg.cfg.myIndex, modulo numOfEps.
 * epsetSort() is applied before returning.
 *
 * pEpSet->numOfEps is reset to 0 first, so callers get an empty set when the node
 * cannot be acquired.
 *
 * Learners are skipped, so positions in pEpSet->eps[] differ from positions in
 * raftCfg.cfg.nodeInfo[] when a learner precedes this node. Case 2 therefore uses
 * this node's position inside eps[] (selfPos) rather than raftCfg.cfg.myIndex,
 * which could index past numOfEps.
 */
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return;

  int index = -1;    // position of the cached leader inside pEpSet->eps
  int selfPos = -1;  // position of this node inside pEpSet->eps (-1: not listed)

  sDebug("vgId:%d, sync get retry epset, leaderCache:%" PRIx64 ", leaderCacheEp.fqdn:%s, leaderCacheEp.port:%d",
         pSyncNode->vgId, pSyncNode->leaderCache.addr, pSyncNode->leaderCacheEp.fqdn, pSyncNode->leaderCacheEp.port);
  int j = 0;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (pSyncNode->raftCfg.cfg.nodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER) continue;
    SEp* pEp = &pEpSet->eps[j];
    tstrncpy(pEp->fqdn, pSyncNode->raftCfg.cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = (pSyncNode->raftCfg.cfg.nodeInfo)[i].nodePort;
    pEpSet->numOfEps++;
    SRaftId id = syncGetRaftIdByEp(pSyncNode, pEp);
    sDebug("vgId:%d, sync get retry epset, index:%d id:%" PRIx64 " %s:%d", pSyncNode->vgId, i, id.addr, pEp->fqdn,
           pEp->port);
    if (pEp->port == pSyncNode->leaderCacheEp.port &&
        strncmp(pEp->fqdn, pSyncNode->leaderCacheEp.fqdn, TSDB_FQDN_LEN) == 0) {
      index = j;
    }
    if (i == pSyncNode->raftCfg.cfg.myIndex) {
      selfPos = j;
    }
    j++;
  }

  if (pEpSet->numOfEps > 0) {
    if (index != -1) {
      pEpSet->inUse = index;
    } else if (selfPos != -1 && pSyncNode->myRaftId.addr == pSyncNode->leaderCache.addr &&
               pSyncNode->myRaftId.vgId == pSyncNode->leaderCache.vgId) {
      pEpSet->inUse = selfPos;
    } else {
      pEpSet->inUse = (pSyncNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps;
    }
  }
  epsetSort(pEpSet);

  char buffer[1024];
  epsetToString(pEpSet, buffer, sizeof(buffer));
  sDebug("vgId:%d, sync get retry epset numOfEps:%d %s inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, buffer,
         pEpSet->inUse);
  syncNodeRelease(pSyncNode);
}
901

902
/*
 * Propose pMsg through the sync node identified by rid.
 *
 * Acquires the node, forwards to syncNodePropose() and releases the node again.
 * Returns whatever syncNodePropose() returns. When the node cannot be acquired,
 * returns terrno if it is set, otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL.
 */
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    sError("sync propose error");
    TAOS_RETURN(err);
  }

  int32_t result = syncNodePropose(pNode, pMsg, isWeak, seq);
  syncNodeRelease(pNode);
  return result;
}
916

917
int32_t syncCheckMember(int64_t rid) {
×
918
  int32_t    code = 0;
×
919
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
920
  if (pSyncNode == NULL) {
×
921
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
922
    if (terrno != 0) code = terrno;
×
923
    sError("sync propose error");
×
924
    TAOS_RETURN(code);
×
925
  }
926

927
  if (pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
×
928
    syncNodeRelease(pSyncNode);
×
929
    return TSDB_CODE_SYN_WRONG_ROLE;
×
930
  }
931

932
  syncNodeRelease(pSyncNode);
×
933
  return 0;
×
934
}
935

936
int32_t syncIsCatchUp(int64_t rid) {
2,036,833✔
937
  int32_t    code = 0;
2,036,833✔
938
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
2,036,833✔
939
  if (pSyncNode == NULL) {
2,036,833✔
940
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
941
    if (terrno != 0) code = terrno;
×
942
    sError("sync Node Acquire error since %d", ERRNO);
×
943
    TAOS_RETURN(code);
×
944
  }
945

946
  int32_t isCatchUp = 0;
2,036,833✔
947
  if (pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
2,036,833✔
948
      pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
342,816✔
949
      pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP) {
342,609✔
950
    sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
1,934,504✔
951
          pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
952
          pSyncNode->pLogBuf->matchIndex);
953
    isCatchUp = 0;
1,934,504✔
954
  } else {
955
    sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, pSyncNode->vgId,
102,329✔
956
          pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->matchIndex);
957
    isCatchUp = 1;
102,329✔
958
  }
959

960
  syncNodeRelease(pSyncNode);
2,036,833✔
961
  return isCatchUp;
2,036,833✔
962
}
963

964
ESyncRole syncGetRole(int64_t rid) {
2,036,833✔
965
  int32_t    code = 0;
2,036,833✔
966
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
2,036,833✔
967
  if (pSyncNode == NULL) {
2,036,833✔
968
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
969
    if (terrno != 0) code = terrno;
×
970
    sError("sync Node Acquire error since %d", ERRNO);
×
971
    TAOS_RETURN(code);
×
972
  }
973

974
  ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole;
2,036,833✔
975

976
  syncNodeRelease(pSyncNode);
2,036,833✔
977
  return role;
2,036,833✔
978
}
979

980
int64_t syncGetTerm(int64_t rid) {
7,685,680✔
981
  int32_t    code = 0;
7,685,680✔
982
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
7,685,680✔
983
  if (pSyncNode == NULL) {
7,685,680✔
984
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
985
    if (terrno != 0) code = terrno;
×
986
    sError("sync Node Acquire error since %d", ERRNO);
×
987
    TAOS_RETURN(code);
×
988
  }
989

990
  int64_t term = raftStoreGetTerm(pSyncNode);
7,685,680✔
991

992
  syncNodeRelease(pSyncNode);
7,685,680✔
993
  return term;
7,685,680✔
994
}
995

996
/*
 * Propose pMsg on pSyncNode.
 *
 * Preconditions, each checked in order and rejected with an error code:
 *   - the node is leader or assigned leader (TSDB_CODE_SYN_NOT_LEADER);
 *   - restore has finished (TSDB_CODE_SYN_RESTORING);
 *   - for a non-assigned leader, heartbeat replies have not timed out
 *     (TSDB_CODE_SYN_PROPOSE_NOT_READY).
 *
 * Optimized single-replica path (syncNodeIsOptimizedOneReplica):
 *   - The request is handled directly by syncNodeOnClientRequest(), and
 *     pMsg->info.conn.applyIndex / applyTerm are filled in.
 *   - Returns 1 when replicaNum is 1.
 *   - Returns 0 when replicaNum changed (member change), so the caller falls back
 *     to normal handling.
 *   - Returns TSDB_CODE_SYN_INTERNAL_ERROR on failure.
 *
 * Normal path:
 *   - A response stub is registered in pSyncRespMgr, a client request is built
 *     and handed to syncEqMsg().
 *   - *seq (when non-NULL) receives the stub's sequence number.
 *   - The result of the build or enqueue step is returned.
 *
 * Fixes versus the previous version:
 *   - retIndex was uninitialized yet read by the failure log.
 *   - A serialization failure was masked by the result of syncRespMgrDel(),
 *     which made the failed proposal return success.
 *   - Inner redeclarations of `code` shadowed the outer variable.
 */
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  int32_t code = 0;
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SYN_NOT_LEADER;
    sNWarn(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    TAOS_RETURN(code);
  }

  if (!pSyncNode->restoreFinish) {
    code = TSDB_CODE_SYN_RESTORING;
    sNWarn(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
           TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // heartbeat timeout
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER && syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since heartbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // initialized so the failure log below never reads an indeterminate value
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code < 0) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      TAOS_RETURN(code);
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);

    // after raft member change, need to handle 1->2 switching point
    // at this point, need to switch entry handling thread
    if (pSyncNode->replicaNum == 1) {
      sGDebug(&pMsg->info.traceId, "vgId:%d, index:%" PRId64 ", propose optimized msg type:%s", pSyncNode->vgId,
              retIndex, TMSG_INFO(pMsg->msgType));
      return 1;
    }

    sGDebug(&pMsg->info.traceId,
            "vgId:%d, index:%" PRId64 ", propose optimized msg, return to normal, type:%s, handle:%p",
            pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle);
    return 0;
  }

  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {.info.traceId = pMsg->info.traceId};
  code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    // keep the serialization error: returning the cleanup result (normally 0)
    // would report this failed proposal as successful
    int32_t delCode = syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    if (delCode != 0) {
      sError("vgId:%d, failed to remove resp stub after serialize failure, seq:%" PRIu64 " since %s",
             pSyncNode->vgId, seqNum, tstrerror(delCode));
    }
    TAOS_RETURN(code);
  }

  sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, propose msg, type:%s", pSyncNode->vgId, pMsg,
          TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    TAOS_CHECK_RETURN(syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum));
  }

  if (seq != NULL) *seq = seqNum;
  TAOS_RETURN(code);
}
1068

1069
/*
 * Initialize a peer heartbeat timer descriptor for destId.
 *
 * No timer is armed. Counter and logic clock are cleared, the period is taken
 * from pSyncNode->hbBaseLine, and the callback is syncNodeEqPeerHeartbeatTimer.
 * Always returns 0.
 */
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  // target peer and the callback used when the timer fires
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;

  // nothing armed yet; period comes from the node's heartbeat baseline
  pSyncTimer->pTimer = NULL;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
  pSyncTimer->counter = 0;
  pSyncTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pSyncTimer->logicClock, 0);

  sInfo("vgId:%d, HbTimer init, timerMs:%d for addr:0x%" PRIx64, pSyncNode->vgId, pSyncTimer->timerMS, destId.addr);
  return 0;
}
1080

1081
/* Set the heartbeat period, in milliseconds, that syncHbTimerStart() uses when it arms this timer. */
static void syncHBSetTimerMS(SSyncTimer* pSyncTimer, int32_t ms) {
  pSyncTimer->timerMS = ms;
}
×
1082

1083
/*
 * Arm (or re-arm) the peer heartbeat timer pSyncTimer of pSyncNode.
 *
 * The SSyncHbTimerData registered under pSyncTimer->hbDataRid is reused when it
 * can be acquired; otherwise a new one is allocated and registered.
 *
 * That record is refreshed with the node rid, the destination, the current logic
 * clock and the next execution time. The timer is then reset with
 * pSyncTimer->timerMS and the data rid as callback parameter.
 *
 * Returns:
 *   0 on success;
 *   TSDB_CODE_OUT_OF_MEMORY when the timer data cannot be allocated (the original
 *     dereferenced the unchecked malloc result);
 *   TSDB_CODE_SYN_INTERNAL_ERROR when the sync environment is not initialized.
 */
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int32_t code = 0;
  int64_t tsNow = taosGetTimestampMs();
  if (!syncIsInit()) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return code;
  }

  SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
  if (pData == NULL) {
    pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
    if (pData == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      sError("vgId:%d, failed to alloc hb timer data since %s", pSyncNode->vgId, tstrerror(code));
      return code;
    }
    pData->rid = syncHbTimerDataAdd(pData);
  }
  pSyncTimer->hbDataRid = pData->rid;
  pSyncTimer->timeStamp = tsNow;

  pData->syncNodeRid = pSyncNode->rid;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;
  pData->execTime = tsNow + pSyncTimer->timerMS;

  sInfo("vgId:%d, start hb timer, rid:%" PRId64 " addr:0x%" PRIx64 " at %d", pSyncNode->vgId, pData->rid,
        pData->destId.addr, pSyncTimer->timerMS);

  bool stopped = taosTmrResetPriority(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid),
                                      syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
  if (stopped) {
    sWarn("vgId:%d, reset hb timer stopped:%d", pSyncNode->vgId, stopped);
  }
  return code;
}
1115

1116
/*
 * Stop the peer heartbeat timer pSyncTimer of pSyncNode.
 *
 * Steps:
 *   - Advance the timer's logic clock.
 *   - Stop the underlying timer and forget it.
 *   - Remove the associated heartbeat data record and mark hbDataRid as -1.
 *
 * Always returns 0.
 */
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  // NOTE(review): presumably lets an in-flight callback notice it is stale; the
  // resulting clock value itself is not needed here
  (void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  bool stopped = taosTmrStop(pSyncTimer->pTimer);
  sDebug("vgId:%d, stop hb timer stop:%d", pSyncNode->vgId, stopped);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}
1126

1127
/*
 * Rebuild the node's log store from the FSM snapshot when the log can no longer
 * continue from it.
 *
 * A restore is performed when:
 *   - the log ends before the snapshot's lastApplyIndex;
 *   - the log begins after lastApplyIndex + 1, leaving a gap; or
 *   - the FSM state is SYNC_FSM_STATE_INCOMPLETE.
 *
 * Returns 0 when nothing needs doing or the restore succeeds.
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR when the log store, FSM or
 * FpGetSnapshotInfo is missing.
 * Otherwise returns the error from syncLogRestoreFromSnapshot().
 */
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  if (pNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm == NULL) {
    sError("vgId:%d, pFsm not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
    sError("vgId:%d, FpGetSnapshotInfo not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SSnapshot snapshot = {0};
  // TODO check return value
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);

  const SyncIndex snapIndex = snapshot.lastApplyIndex;
  const SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  const SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  bool logEndsBeforeSnapshot = lastVer < snapIndex;
  bool logStartsAfterSnapshot = firstVer > snapIndex + 1;
  bool fsmIncomplete = pNode->fsmState == SYNC_FSM_STATE_INCOMPLETE;

  int32_t code = 0;
  if (logEndsBeforeSnapshot || logStartsAfterSnapshot || fsmIncomplete) {
    sInfo("vgId:%d, restore log store from snapshot, firstVer:%" PRId64 ", lastVer:%" PRId64 ", commitIndex:%" PRId64,
          pNode->vgId, firstVer, lastVer, snapIndex);
    code = pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, snapIndex);
    if (code != 0) {
      sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
             pNode->vgId, terrstr(), lastVer, snapIndex);
    }
  }
  TAOS_RETURN(code);
}
1159

1160
// open/close --------------
1161
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion, int32_t electInterval, int32_t heartbeatInterval) {
4,043,894✔
1162
  int32_t    code = 0;
4,043,894✔
1163
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
4,043,894✔
1164
  if (pSyncNode == NULL) {
4,044,058✔
1165
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1166
    goto _error;
×
1167
  }
1168

1169
  if (!taosDirExist((char*)(pSyncInfo->path))) {
4,044,058✔
1170
    if (taosMkDir(pSyncInfo->path) != 0) {
2,911,266✔
1171
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1172
      sError("vgId:%d, failed to create dir:%s since %s", pSyncInfo->vgId, pSyncInfo->path, terrstr());
×
1173
      goto _error;
×
1174
    }
1175
  }
1176

1177
  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
4,043,349✔
1178
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
4,044,058✔
1179
           TD_DIRSEP);
1180
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
4,043,349✔
1181

1182
  if (!taosCheckExistFile(pSyncNode->configPath)) {
4,044,058✔
1183
    // create a new raft config file
1184
    sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId);
2,911,975✔
1185
    pSyncNode->vgId = pSyncInfo->vgId;
2,911,975✔
1186
    pSyncNode->mountVgId = pSyncInfo->mountVgId;
2,911,975✔
1187
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
2,911,975✔
1188
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
2,911,975✔
1189
    pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;
2,911,975✔
1190
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
2,911,975✔
1191
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
2,911,975✔
1192
    pSyncNode->raftCfg.configIndexCount = 1;
2,911,975✔
1193
    pSyncNode->raftCfg.configIndexArr[0] = -1;
2,911,975✔
1194

1195
    if ((code = syncWriteCfgFile(pSyncNode, "new")) != 0) {
2,911,975✔
1196
      terrno = code;
×
1197
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
×
1198
      goto _error;
×
1199
    }
1200
  } else {
1201
    // update syncCfg by raft_config.json
1202
    if ((code = syncReadCfgFile(pSyncNode)) != 0) {
1,132,001✔
1203
      terrno = code;
×
1204
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
×
1205
      goto _error;
×
1206
    }
1207

1208
    if (vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion) {
1,132,083✔
1209
      sInfo("vgId:%d, is going to judge update, in SyncInfo, totalReplicaNum:%d", pSyncNode->vgId,
839,546✔
1210
            pSyncInfo->syncCfg.totalReplicaNum);
1211
      if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
839,546✔
1212
        sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
756,414✔
1213
        pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
756,414✔
1214
        if ((code = syncWriteCfgFile(pSyncNode, "changed")) != 0) {
756,414✔
1215
          terrno = code;
×
1216
          sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
×
1217
          goto _error;
×
1218
        }
1219
      } else {
1220
        sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
83,132✔
1221
        pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
83,132✔
1222
      }
1223
    } else {
1224
      sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d", pSyncNode->vgId, vnodeVersion,
292,537✔
1225
            pSyncInfo->syncCfg.changeVersion);
1226
    }
1227
  }
1228

1229
  // init by SSyncInfo
1230
  pSyncNode->vgId = pSyncInfo->vgId;
4,043,382✔
1231
  pSyncNode->mountVgId = pSyncInfo->mountVgId;
4,044,058✔
1232
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
4,043,382✔
1233
  bool      updated = false;
4,043,382✔
1234
  sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
4,043,382✔
1235
        pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
1236
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
10,851,541✔
1237
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
6,807,562✔
1238
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
6,807,562✔
1239
      updated = true;
×
1240
    }
1241
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
6,807,562✔
1242
          pNode->nodeId, pNode->clusterId);
1243
  }
1244

1245
  if (vnodeVersion > pSyncInfo->syncCfg.changeVersion) {
4,044,058✔
1246
    if (updated) {
389,828✔
1247
      sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
×
1248
      if ((code = syncWriteCfgFile(pSyncNode, "updated")) != 0) {
×
1249
        terrno = code;
×
1250
        sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
×
1251
        goto _error;
×
1252
      }
1253
    }
1254
  }
1255

1256
  pSyncNode->pWal = pSyncInfo->pWal;
4,044,058✔
1257
  pSyncNode->msgcb = pSyncInfo->msgcb;
4,044,058✔
1258
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
4,044,058✔
1259
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
4,044,058✔
1260
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;
4,044,058✔
1261

1262
  // create raft log ring buffer
1263
  code = syncLogBufferCreate(&pSyncNode->pLogBuf);
4,044,058✔
1264
  if (pSyncNode->pLogBuf == NULL) {
4,044,058✔
1265
    sError("failed to init sync log buffer since %s. vgId:%d", tstrerror(code), pSyncNode->vgId);
×
1266
    goto _error;
×
1267
  }
1268

1269
  // init replicaNum, replicasId
1270
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
4,044,058✔
1271
  pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
4,043,794✔
1272
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
10,851,620✔
1273
    if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
6,807,562✔
1274
        false) {
1275
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
×
1276
      goto _error;
×
1277
    }
1278
  }
1279

1280
  // init internal
1281
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
4,044,058✔
1282
  pSyncNode->myRaftId = pSyncNode->replicasId[pSyncNode->raftCfg.cfg.myIndex];
4,044,058✔
1283

1284
  // init peersNum, peers, peersId
1285
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
4,044,058✔
1286
  int32_t j = 0;
4,043,507✔
1287
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
10,851,069✔
1288
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
6,807,011✔
1289
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
2,763,519✔
1290
      pSyncNode->peersId[j] = pSyncNode->replicasId[i];
2,763,519✔
1291
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
2,763,519✔
1292
      j++;
2,764,070✔
1293
    }
1294
  }
1295

1296
  pSyncNode->arbTerm = -1;
4,044,058✔
1297
  (void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
4,044,058✔
1298
  syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
4,044,058✔
1299
  sInfo("vgId:%d, generate arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);
4,043,671✔
1300

1301
  // init raft algorithm
1302
  pSyncNode->pFsm = pSyncInfo->pFsm;
4,043,671✔
1303
  pSyncInfo->pFsm = NULL;
4,044,058✔
1304
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
4,044,058✔
1305
  pSyncNode->leaderCache = EMPTY_RAFT_ID;
4,044,058✔
1306
  pSyncNode->leaderCacheEp.port = 0;
4,044,058✔
1307
  pSyncNode->leaderCacheEp.fqdn[0] = '\0';
4,044,058✔
1308

1309
  // init life cycle outside
1310

1311
  // TLA+ Spec
1312
  // InitHistoryVars == /\ elections = {}
1313
  //                    /\ allLogs   = {}
1314
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
1315
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
1316
  //                   /\ state       = [i \in Server |-> Follower]
1317
  //                   /\ votedFor    = [i \in Server |-> Nil]
1318
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
1319
  //                      /\ votesGranted   = [i \in Server |-> {}]
1320
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
1321
  // \* leader does not send itself messages. It's still easier to include these
1322
  // \* in the functions.
1323
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
1324
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
1325
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
1326
  //                /\ commitIndex  = [i \in Server |-> 0]
1327
  // Init == /\ messages = [m \in {} |-> 0]
1328
  //         /\ InitHistoryVars
1329
  //         /\ InitServerVars
1330
  //         /\ InitCandidateVars
1331
  //         /\ InitLeaderVars
1332
  //         /\ InitLogVars
1333
  //
1334

1335
  // init TLA+ server vars
1336
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
4,044,058✔
1337
  pSyncNode->roleTimeMs = taosGetTimestampMs();
4,044,058✔
1338
  if ((code = raftStoreOpen(pSyncNode)) != 0) {
4,044,058✔
1339
    terrno = code;
×
1340
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
×
1341
    goto _error;
×
1342
  }
1343

1344
  // init TLA+ candidate vars
1345
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
4,044,058✔
1346
  if (pSyncNode->pVotesGranted == NULL) {
4,044,058✔
1347
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
×
1348
    goto _error;
×
1349
  }
1350
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
4,043,382✔
1351
  if (pSyncNode->pVotesRespond == NULL) {
4,043,382✔
1352
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
×
1353
    goto _error;
×
1354
  }
1355

1356
  // init TLA+ leader vars
1357
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
4,042,706✔
1358
  if (pSyncNode->pNextIndex == NULL) {
4,043,382✔
1359
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
×
1360
    goto _error;
×
1361
  }
1362
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
4,042,995✔
1363
  if (pSyncNode->pMatchIndex == NULL) {
4,042,673✔
1364
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
×
1365
    goto _error;
×
1366
  }
1367

1368
  // init TLA+ log vars
1369
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
4,041,610✔
1370
  if (pSyncNode->pLogStore == NULL) {
4,044,058✔
1371
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
×
1372
    goto _error;
×
1373
  }
1374

1375
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
4,042,983✔
1376
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
4,042,983✔
1377
    SSnapshot snapshot = {0};
4,043,676✔
1378
    // TODO check return value
1379
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
4,043,499✔
1380
    if (snapshot.lastApplyIndex > commitIndex) {
4,043,676✔
1381
      commitIndex = snapshot.lastApplyIndex;
644,103✔
1382
      sNTrace(pSyncNode, "reset commit index by snapshot");
644,103✔
1383
    }
1384
    pSyncNode->fsmState = snapshot.state;
4,043,676✔
1385
    if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
4,043,112✔
1386
      sError("vgId:%d, fsm state is incomplete.", pSyncNode->vgId);
×
1387
      if (pSyncNode->replicaNum == 1) {
×
1388
        terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1389
        goto _error;
×
1390
      }
1391
    }
1392
  }
1393
  pSyncNode->commitIndex = commitIndex;
4,040,311✔
1394
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);
4,041,359✔
1395

1396
  // restore log store on need
1397
  if ((code = syncNodeLogStoreRestoreOnNeed(pSyncNode)) < 0) {
4,043,117✔
1398
    terrno = code;
×
1399
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
×
1400
    goto _error;
×
1401
  }
1402

1403
  // timer ms init
1404
  pSyncNode->pingBaseLine = PING_TIMER_MS;
4,044,058✔
1405
  pSyncNode->electBaseLine = electInterval;
4,044,058✔
1406
  pSyncNode->hbBaseLine = heartbeatInterval;
4,044,058✔
1407

1408
  // init ping timer
1409
  pSyncNode->pPingTimer = NULL;
4,044,058✔
1410
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
4,044,058✔
1411
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
4,044,058✔
1412
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
4,044,058✔
1413
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
4,044,058✔
1414
  pSyncNode->pingTimerCounter = 0;
4,043,735✔
1415

1416
  // init elect timer
1417
  pSyncNode->pElectTimer = NULL;
4,044,058✔
1418
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
4,044,058✔
1419
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
4,044,058✔
1420
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
4,044,058✔
1421
  pSyncNode->electTimerCounter = 0;
4,044,058✔
1422

1423
  // init heartbeat timer
1424
  pSyncNode->pHeartbeatTimer = NULL;
4,044,058✔
1425
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
4,044,058✔
1426
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
4,044,058✔
1427
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
4,044,058✔
1428
#ifdef BUILD_NO_CALL
1429
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
1430
#endif
1431
  pSyncNode->heartbeatTimerCounter = 0;
4,044,058✔
1432

1433
  // init peer heartbeat timer
1434
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
64,704,928✔
1435
    if ((code = syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i])) != 0) {
60,660,870✔
1436
      terrno = code;
×
1437
      goto _error;
×
1438
    }
1439
  }
1440

1441
  // tools
1442
  if ((code = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr)) != 0) {
4,044,058✔
1443
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
×
1444
    goto _error;
×
1445
  }
1446
  if (pSyncNode->pSyncRespMgr == NULL) {
4,044,058✔
1447
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
×
1448
    goto _error;
×
1449
  }
1450

1451
  // restore state
1452
  pSyncNode->restoreFinish = false;
4,044,058✔
1453

1454
  // snapshot senders
1455
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
64,702,484✔
1456
    SSyncSnapshotSender* pSender = NULL;
60,658,426✔
1457
    code = snapshotSenderCreate(pSyncNode, i, &pSender);
60,657,481✔
1458
    if (pSender == NULL) return NULL;
60,658,426✔
1459

1460
    pSyncNode->senders[i] = pSender;
60,658,426✔
1461
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
60,658,092✔
1462
  }
1463

1464
  // snapshot receivers
1465
  code = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID, &pSyncNode->pNewNodeReceiver);
4,044,058✔
1466
  if (pSyncNode->pNewNodeReceiver == NULL) return NULL;
4,043,671✔
1467
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
4,043,024✔
1468
          pSyncNode->pNewNodeReceiver);
1469

1470
  // is config changing
1471
  pSyncNode->changing = false;
4,043,024✔
1472

1473
  // replication mgr
1474
  if ((code = syncNodeLogReplInit(pSyncNode)) < 0) {
4,043,024✔
1475
    terrno = code;
×
1476
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
×
1477
    goto _error;
×
1478
  }
1479

1480
  // peer state
1481
  if ((code = syncNodePeerStateInit(pSyncNode)) < 0) {
4,044,058✔
1482
    terrno = code;
×
1483
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
×
1484
    goto _error;
×
1485
  }
1486

1487
  //
1488
  // min match index
1489
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
4,044,058✔
1490

1491
  // start in syncNodeStart
1492
  // start raft
1493

1494
  int64_t timeNow = taosGetTimestampMs();
4,044,058✔
1495
  pSyncNode->startTime = timeNow;
4,044,058✔
1496
  pSyncNode->lastReplicateTime = timeNow;
4,044,058✔
1497

1498
  // snapshotting
1499
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
4,044,058✔
1500

1501
  // init log buffer
1502
  if ((code = syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode)) < 0) {
4,044,058✔
1503
    terrno = code;
×
1504
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
×
1505
    goto _error;
×
1506
  }
1507

1508
  pSyncNode->isStart = true;
4,044,058✔
1509
  pSyncNode->electNum = 0;
4,044,058✔
1510
  pSyncNode->becomeLeaderNum = 0;
4,043,799✔
1511
  pSyncNode->becomeAssignedLeaderNum = 0;
4,043,881✔
1512
  pSyncNode->configChangeNum = 0;
4,043,759✔
1513
  pSyncNode->hbSlowNum = 0;
4,043,759✔
1514
  pSyncNode->hbrSlowNum = 0;
4,043,759✔
1515
  pSyncNode->tmrRoutineNum = 0;
4,043,976✔
1516

1517
  sNInfo(pSyncNode, "sync node opened, node:%p electBaseLine:%d hbBaseLine:%d heartbeatTimeout:%d", pSyncNode,
4,043,799✔
1518
         pSyncNode->electBaseLine, pSyncNode->hbBaseLine, tsHeartbeatTimeout);
1519
  return pSyncNode;
4,044,058✔
1520

UNCOV
1521
_error:
×
UNCOV
1522
  if (pSyncInfo->pFsm) {
×
1523
    taosMemoryFree(pSyncInfo->pFsm);
×
1524
    pSyncInfo->pFsm = NULL;
×
1525
  }
1526
  syncNodeClose(pSyncNode);
×
1527
  pSyncNode = NULL;
×
1528
  return NULL;
×
1529
}
1530

1531
#ifdef BUILD_NO_CALL
/*
 * Advance pSyncNode->commitIndex to the FSM snapshot's lastApplyIndex when the
 * snapshot is ahead of it. No-op when there is no FSM or no snapshot-info getter.
 */
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snapshot = {0};
  (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);

  // commitIndex only ever moves forward here
  if (pSyncNode->commitIndex < snapshot.lastApplyIndex) {
    pSyncNode->commitIndex = snapshot.lastApplyIndex;
  }
}
#endif
1542

1543
/*
 * Restore a sync node from its persisted log.
 *
 * Reads the log store's last and commit indices together with the log buffer's
 * endIndex (under the log-buffer mutex), raises pSyncNode->commitIndex to the
 * stored commit index, and, unless the FSM state is SYNC_FSM_STATE_INCOMPLETE,
 * commits the log buffer up to that index.
 *
 * Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR if the log store or log
 * buffer is missing, or the error from syncLogBufferCommit.
 */
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pSyncNode->pLogBuf == NULL) {
    sError("vgId:%d, ring log buffer not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // read store indices and the buffer end under one lock so they are observed together
  (void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex);
  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;
  (void)taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex);

  if (lastVer != -1 && endIndex != lastVer + 1) {
    code = TSDB_CODE_WAL_LOG_INCOMPLETE;
    // terrno is not set on this path, so report the local code rather than terrstr()
    sWarn("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64,
          pSyncNode->vgId, tstrerror(code), endIndex - 1, lastVer);
    // Deliberately not fatal: restore continues after the warning.
    // NOTE(review): when fsmState == SYNC_FSM_STATE_INCOMPLETE the commit below is skipped and this
    // code is what gets returned; confirm that is intended.
  }

  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
  sInfo("vgId:%d, restore began, and keep syncing until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
      (code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex, NULL, "restore")) < 0) {
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
1578

1579
/*
 * Start raft on this node and arm its ping timer.
 *
 * A node whose configured role is learner becomes a learner. A single-replica
 * group bumps its term, becomes leader, and appends a no-op entry. Any other
 * node becomes a follower with an empty leader id.
 *
 * Returns the no-op append error if that fails, otherwise the ping-timer result.
 */
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  sInfo("vgId:%d, begin to start sync node", pSyncNode->vgId);

  bool isLearner =
      pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER;

  if (isLearner) {
    syncNodeBecomeLearner(pSyncNode, "first start");
  } else if (pSyncNode->replicaNum == 1) {
    // single replica: no peers to vote, so lead immediately in a new term
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    TAOS_CHECK_RETURN(syncNodeAppendNoop(pSyncNode));
  } else {
    SRaftId noLeader = {0};
    syncNodeBecomeFollower(pSyncNode, noLeader, "first start");
  }

  int32_t pingCode = syncNodeStartPingTimer(pSyncNode);
  if (pingCode != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(pingCode));
  }
  sInfo("vgId:%d, sync node started", pSyncNode->vgId);
  return pingCode;
}
1605

1606
#ifdef BUILD_NO_CALL
/*
 * Put the node into stand-by: follower state, heartbeat timer stopped, elect
 * timer pushed out to TIMER_MAX_MS, and ping timer started.
 * Returns the heartbeat-stop error, -1 if the elect or ping timer fails, else 0.
 */
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));

  // a very long elect timeout keeps a stand-by node from starting elections
  int32_t electMS = TIMER_MAX_MS;
  int32_t code = syncNodeRestartElectTimer(pSyncNode, electMS);
  if (code < 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  code = syncNodeStartPingTimer(pSyncNode);
  if (code < 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }
  return code;
}
#endif
1631

1632
/*
 * First shutdown phase: stop the elect, heartbeat and ping timers (in that order)
 * and clean pending responses in the response manager.
 *
 * Logs and returns early if pSyncNode, its FSM, or the FSM's FpApplyQueueItems
 * callback is NULL, or if any timer fails to stop.
 */
void syncNodePreClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    sError("failed to pre close sync node since sync node is null");
    return;
  }
  if (pSyncNode->pFsm == NULL) {
    sError("failed to pre close sync node since fsm is null");
    return;
  }
  if (pSyncNode->pFsm->FpApplyQueueItems == NULL) {
    sError("failed to pre close sync node since FpApplyQueueItems is null");
    return;
  }

  int32_t code = syncNodeStopElectTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  code = syncNodeStopHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  code = syncNodeStopPingTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
1668

1669
/*
 * Second shutdown phase: stop (if running) and destroy the new-node snapshot
 * receiver, then clear the pointer. No-op when there is no receiver.
 */
void syncNodePostClose(SSyncNode* pSyncNode) {
  if (pSyncNode->pNewNodeReceiver == NULL) {
    return;
  }

  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
         pSyncNode->pNewNodeReceiver);
  snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
  pSyncNode->pNewNodeReceiver = NULL;
}
3,654,151✔
1681

1682
/* Free a heartbeat-timer data object by handing it to taosMemoryFree. */
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
922,084✔
1683

1684
/*
 * Fully tear down a sync node and free it. NULL is accepted and ignored.
 *
 * The ping, elect and heartbeat timers are stopped first. If any stop fails the
 * error is logged and the function returns immediately, leaving the node and
 * its resources allocated. Otherwise every owned component is destroyed in the
 * same fixed order and its pointer cleared. The node memory is freed last.
 */
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);

  int32_t code = syncNodeStopPingTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  code = syncNodeStopElectTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  code = syncNodeStopHeartbeatTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  syncNodeLogReplDestroy(pSyncNode);

  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;
  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;
  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;
  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;
  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  (void)taosThreadMutexDestroy(&pSyncNode->arbTokenMutex);

  // stop (if running) and destroy every snapshot sender slot that is populated
  for (int32_t slot = 0; slot < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; slot++) {
    if (pSyncNode->senders[slot] == NULL) {
      continue;
    }
    sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[slot]);
    if (snapshotSenderIsStart(pSyncNode->senders[slot])) {
      snapshotSenderStop(pSyncNode->senders[slot], false);
    }
    snapshotSenderDestroy(pSyncNode->senders[slot]);
    pSyncNode->senders[slot] = NULL;
  }

  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }
    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  raftStoreClose(pSyncNode);

  taosMemoryFree(pSyncNode);
}
1753

1754
/* Return the snapshot strategy stored in the node's raft configuration. */
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  ESyncStrategy strategy = pSyncNode->raftCfg.snapshotStrategy;
  return strategy;
}
×
1755

1756
// timer control --------------
1757
/*
 * (Re)arm the ping timer with priority 2 and pingTimerMS, then sync
 * pingTimerLogicClock to pingTimerLogicClockUser. Only logs if the sync env is
 * not initialised or the reset reports the timer stopped. Always returns 0.
 */
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  bool stopped = taosTmrResetPriority(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
                                      syncEnv()->pTimerManager, &pSyncNode->pPingTimer, 2);
  if (stopped) {
    sError("vgId:%d, failed to reset ping timer, ms:%d, stopped:%d", pSyncNode->vgId, pSyncNode->pingTimerMS,
           stopped);
  }
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}
1772

1773
/*
 * Stop the ping timer and forget its handle. pingTimerLogicClockUser is bumped
 * first, presumably so that already-queued ping callbacks can detect they are
 * stale (TODO confirm in syncNodeEqPingTimer). Always returns 0.
 */
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  bool wasStopped = taosTmrStop(pSyncNode->pPingTimer);
  sDebug("vgId:%d, stop ping timer, stop:%d", pSyncNode->vgId, wasStopped);

  pSyncNode->pPingTimer = NULL;
  return 0;
}
1781

1782
/*
 * Arm the election timer to fire after `ms` milliseconds.
 *
 * Records ms in electTimerMS and fills electTimerParam with the absolute
 * execute time, the current electTimerLogicClock and the node pointer before
 * resetting the timer. Logs and does nothing if the sync env is not
 * initialised. Always returns 0.
 */
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  int64_t deadline = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), deadline);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  bool stopped = taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid),
                              syncEnv()->pTimerManager, &pSyncNode->pElectTimer);
  if (stopped) {
    sWarn("vgId:%d, failed to reset elect timer, ms:%d", pSyncNode->vgId, ms);
  }
  return 0;
}
1801

1802
/*
 * Stop the election timer and forget its handle. electTimerLogicClock is
 * incremented first, so it no longer equals the value syncNodeStartElectTimer
 * captured into electTimerParam.logicClock. Always returns 0.
 */
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  bool wasStopped = taosTmrStop(pSyncNode->pElectTimer);
  sTrace("vgId:%d, stop elect timer, stop:%d", pSyncNode->vgId, wasStopped);

  pSyncNode->pElectTimer = NULL;
  return 0;
}
1811

1812
/*
 * Stop and then start the election timer with a new timeout of `ms`.
 * Returns the first failing step's code (also stored in terrno), else 0.
 */
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  TAOS_CHECK_RETURN(syncNodeStopElectTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartElectTimer(pSyncNode, ms));
  return 0;
}
1818

1819
/*
 * Restart the election timer with a fresh timeout: TIMER_MAX_MS for a stand-by
 * node, otherwise a random value in [electBaseLine, 2 * electBaseLine] as
 * produced by syncUtilElectRandomMS. Logs a warning if the restart fails.
 */
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = pSyncNode->raftCfg.isStandBy
                        ? TIMER_MAX_MS
                        : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  int32_t code = syncNodeRestartElectTimer(pSyncNode, electMS);
  if (code != 0) {
    sWarn("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
}
1837

1838
#ifdef BUILD_NO_CALL
/*
 * Arm the node-wide heartbeat timer with heartbeatTimerMS and sync
 * heartbeatTimerLogicClock to heartbeatTimerLogicClockUser.
 * Only logs if the sync env is not initialised. Always returns 0.
 */
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (syncIsInit()) {
    // taosTmrReset returns a bool ("timer was stopped"), not an error code; feeding it to
    // TAOS_CHECK_RETURN would turn `true` into a bogus error return of 1.
    bool stopped = taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
                                syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer);
    if (stopped) {
      sWarn("vgId:%d, failed to reset heartbeat timer, ms:%d", pSyncNode->vgId, pSyncNode->heartbeatTimerMS);
    }
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  } else {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return code;
}
#endif
1853

1854
/*
 * Start the per-peer heartbeat timers. Peers without a timer are skipped.
 * (The former single node-wide heartbeat timer start was disabled with `#if 0`
 * and has been removed.) Returns the first syncHbTimerStart error, else 0.
 */
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; peer++) {
    SSyncTimer* pTimer = syncNodeGetHbTimer(pSyncNode, &pSyncNode->peersId[peer]);
    if (pTimer == NULL) {
      continue;
    }
    TAOS_CHECK_RETURN(syncHbTimerStart(pSyncNode, pTimer));
  }
  return 0;
}
1871

1872
/*
 * Set the heartbeat interval of every peer's heartbeat timer to `ms`.
 * Peers without a timer are skipped. Always returns 0.
 */
int32_t syncNodeSetHeartbeatTimerMs(SSyncNode* pSyncNode, int32_t ms) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; peer++) {
    SSyncTimer* pTimer = syncNodeGetHbTimer(pSyncNode, &pSyncNode->peersId[peer]);
    if (pTimer != NULL) {
      syncHBSetTimerMS(pTimer, ms);
    }
  }
  return 0;
}
1884

1885
/*
 * Stop the per-peer heartbeat timers. Peers without a timer are skipped.
 * (The disabled `#if 0` node-wide timer stop has been removed.)
 * Returns the first syncHbTimerStop error, else 0.
 */
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; peer++) {
    SSyncTimer* pTimer = syncNodeGetHbTimer(pSyncNode, &pSyncNode->peersId[peer]);
    if (pTimer == NULL) {
      continue;
    }
    TAOS_CHECK_RETURN(syncHbTimerStop(pSyncNode, pTimer));
  }
  return 0;
}
1904

1905
/*
 * Apply a new heartbeat interval to all peer timers. When this node is leader,
 * the peer timers are also stopped and started again.
 * Returns the first failing step's code, else 0.
 */
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode, int32_t heartbeatInterval) {
  sInfo("vgId:%d, sync Node Restart HeartbeatTimer, state=%d", pSyncNode->vgId, pSyncNode->state);
  TAOS_CHECK_RETURN(syncNodeSetHeartbeatTimerMs(pSyncNode, heartbeatInterval));

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    return 0;
  }

  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
  return 0;
}
1916

1917
/*
 * Send pMsg to the peer whose raft address matches destRaftId->addr.
 *
 * The first matching peer's epset is used. The message body is converted with
 * syncUtilMsgHtoN and marked noResp before sending. If no peer matches, or no
 * send callback is installed, the result is -1. On any negative result the
 * error is logged and pMsg->pCont is freed and cleared.
 * The result is returned through TAOS_RETURN.
 */
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  SEpSet* pEpSet = NULL;
  for (int32_t i = 0; i < pNode->peersNum && pEpSet == NULL; ++i) {
    if (pNode->peersId[i].addr == destRaftId->addr) {
      pEpSet = &pNode->peersEpset[i];
    }
  }

  int32_t code = -1;
  if (pEpSet != NULL && pNode->syncSendMSg != NULL) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    code = pNode->syncSendMSg(pEpSet, pMsg);
  }

  if (code < 0) {
    sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:0x%" PRIx64, pNode->vgId, tstrerror(code),
           pEpSet, DID(destRaftId), destRaftId->addr);
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }

  TAOS_RETURN(code);
}
1942

1943
/*
 * Report whether this node is a member of pCfg.
 *
 * Membership is checked two ways: by matching fqdn+port against myNodeInfo,
 * and by matching the derived raft id against myRaftId. If the two checks
 * disagree, terrno is set to TSDB_CODE_SYN_INTERNAL_ERROR and false is returned.
 */
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  bool matchByEp = false;
  for (int32_t i = 0; i < pCfg->totalReplicaNum && !matchByEp; ++i) {
    matchByEp = strcmp(pCfg->nodeInfo[i].nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0 &&
                pCfg->nodeInfo[i].nodePort == pNode->myNodeInfo.nodePort;
  }

  bool matchById = false;
  for (int32_t i = 0; i < pCfg->totalReplicaNum && !matchById; ++i) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[i]),
        .vgId = pNode->vgId,
    };
    matchById = syncUtilSameId(&candidate, &pNode->myRaftId);
  }

  if (matchByEp != matchById) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return false;
  }
  return matchByEp;
}
1973

1974
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
1,088,618✔
1975
  if (pOldCfg->totalReplicaNum != pNewCfg->totalReplicaNum) return true;
1,088,618✔
1976
  if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
591,738✔
1977
  for (int32_t i = 0; i < pOldCfg->totalReplicaNum; ++i) {
1,569,150✔
1978
    const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
1,274,945✔
1979
    const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
1,274,945✔
1980
    if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
1,274,945✔
1981
    if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
1,274,945✔
1982
    if (pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
1,274,945✔
1983
  }
1984

1985
  return false;
294,205✔
1986
}
1987

1988
/*
 * Apply a new membership configuration to a running sync node.
 *
 * pNewConfig            : configuration copied into pSyncNode->raftCfg.cfg.
 * lastConfigChangeIndex : recorded as raftCfg.lastConfigIndex and appended to
 *                         raftCfg.configIndexArr.
 *
 * Does nothing (returns 0) when syncIsConfigChanged reports no difference.
 * Otherwise installs the new config, updates the stand-by flag, and records the
 * config index. When this node is in the new config, it then rebuilds its own
 * id, the peer and replica tables, quorum, index/vote managers and snapshot
 * senders. Senders of replicas present in both configs are reused and the rest
 * are recreated or destroyed. The config file is persisted in both cases.
 *
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_OUT_OF_RANGE when
 * configIndexArr is full. Returns terrno when a raft-id conversion fails, or the
 * failing call's code from sender creation or config persisting.
 */
int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    // was hard-coded as "vgId:1"
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  // this node has been removed from the membership
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change. The previous call passed an extra vgId argument, which shifted every
  // argument against its conversion (int32 read as PRId64, int64 read as %d: undefined behavior).
  sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
         oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, oldConfig.lastIndex, pNewConfig->lastIndex,
         pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  SRaftCfg* pCfg = &pSyncNode->raftCfg;
  if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
    sNError(pSyncNode, "failed to add cfg index:%d since out of range", pCfg->configIndexCount);
    terrno = TSDB_CODE_OUT_OF_RANGE;
    return -1;
  }

  pCfg->configIndexArr[pCfg->configIndexCount] = lastConfigChangeIndex;
  pCfg->configIndexCount++;

  if (IamInNew) {
    // save old replica ids and snapshot senders so senders of surviving replicas can be reused
    SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    if (syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId) == false) return terrno;

    // init peersNum, peers, peersId (every replica except this node)
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
    int32_t peerIdx = 0;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerIdx] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[peerIdx], &pSyncNode->peersEpset[peerIdx]);
        peerIdx++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]) == false)
        return terrno;
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
          false)
        return terrno;
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders: clear all slots, then move over senders of replicas present in both configs
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
      bool reset = false;
      for (int32_t j = 0; j < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++j) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j]) && oldSenders[j] != NULL) {
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[j]);

          pSyncNode->senders[i] = oldSenders[j];
          oldSenders[j] = NULL;
          reset = true;

          // the sender now serves replica slot i
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
          pSyncNode->senders[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], reset);

          break;
        }
      }
    }

    // create new senders for the remaining empty slots
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (pSyncNode->senders[i] == NULL) {
        TAOS_CHECK_RETURN(snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]));
        if (pSyncNode->senders[i] == NULL) {
          // will be created later while send snapshot
          sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
        } else {
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
        }
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
      }
    }

    // free old senders that were not reused
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode, "config_change_with_new_members"));
  } else {
    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode, "config_change_no_new_members"));
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  return 0;
}
2160

2161
// raft state change --------------
2162
// Advance the persisted raft term to `term` when it is newer, without any role transition
// (unlike syncNodeStepDown, which additionally turns the node into a follower).
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  SyncTerm storedTerm = raftStoreGetTerm(pSyncNode);
  if (term <= storedTerm) {
    return;  // never move the term backwards, and skip rewriting an equal term
  }
  raftStoreSetTerm(pSyncNode, term);
}
225,453✔
2167

2168
// React to observing `newTerm` (from peer `id`).
//  - newTerm older than our term: ignored (trace only).
//  - node is ASSIGNED_LEADER: a fresh arb token is generated under arbTokenMutex before stepping down.
//  - newTerm newer: persist it, become follower of `id`, then clear our vote.
//  - same term and we are not a follower: become follower of `id`.
// `strFrom` is a human-readable reason embedded into the follower-transition message.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm, SRaftId id, char* strFrom) {
  // The message holds ~40 fixed chars, two 64-bit terms and the caller's reason; the former 64-byte
  // buffer silently truncated the reason for most callers.
  enum { STEP_DOWN_MSG_LEN = 256 };

  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
  if (currentTerm > newTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);

  if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
    syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
    sInfo("vgId:%d, generate arb token, will step down from assigned leader, new arbToken:%s", pSyncNode->vgId,
          pSyncNode->arbToken);
    (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
  }

  char tmpBuf[STEP_DOWN_MSG_LEN];
  if (currentTerm < newTerm) {
    raftStoreSetTerm(pSyncNode, newTerm);
    snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64 " from %" PRId64 ", since %s", newTerm,
             currentTerm, strFrom);
    syncNodeBecomeFollower(pSyncNode, id, tmpBuf);
    raftStoreClearVote(pSyncNode);
  } else if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    snprintf(tmpBuf, sizeof(tmpBuf), "step down, with same term to %" PRId64 " from %" PRId64 ", since %s", newTerm,
             currentTerm, strFrom);
    syncNodeBecomeFollower(pSyncNode, id, tmpBuf);
  }
}
2203

2204
// Invoked when leadership changes (e.g. from syncNodeBecomeFollower): passes this node's response
// manager to syncRespCleanRsp. NOTE(review): presumably this answers/drops pending client requests
// that this node can no longer complete — confirm in syncRespMgr.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
1,992,025✔
2205

2206
// Transition this node to FOLLOWER with `leaderId` as the known leader.
//
// Refreshes the leader cache (id + endpoint), sets state/role time, stops the heartbeat timer,
// calls syncNodeLeaderChangeRsp, invokes the FSM's FpBecomeFollowerCb, resets minMatchIndex and the
// log buffer, and re-arms the election timer. `debugStr` is only used for logging.
// If stopping the heartbeat timer or resetting the log buffer fails, the error is logged and the
// remaining steps are skipped (state is already FOLLOWER at that point).
void syncNodeBecomeFollower(SSyncNode* pSyncNode, SRaftId leaderId, const char* debugStr) {
  int32_t code = 0;

  pSyncNode->hbSlowNum = 0;

  // Refresh the leader cache. The endpoint is cleared unconditionally first so it always describes
  // `leaderCache`: previously it was only cleared when we had been LEADER, so a follower whose new
  // leader is not found in replicasId[] kept the previous leader's endpoint.
  pSyncNode->leaderCache = leaderId;
  pSyncNode->leaderCacheEp.port = 0;
  pSyncNode->leaderCacheEp.fqdn[0] = '\0';
  for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
    if (syncUtilSameId(&pSyncNode->replicasId[i], &leaderId)) {
      pSyncNode->leaderCacheEp.port = pSyncNode->raftCfg.cfg.nodeInfo[i].nodePort;
      // tstrncpy always NUL-terminates; strncpy with the full buffer length may not.
      tstrncpy(pSyncNode->leaderCacheEp.fqdn, pSyncNode->raftCfg.cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
      break;
    }
  }

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNTrace(pSyncNode, "become follower %s", debugStr);

  // send rsp to client
  syncNodeLeaderChangeRsp(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // reset elect timer
  syncNodeResetElectTimer(pSyncNode);

  sInfo("vgId:%d, become follower. %s", pSyncNode->vgId, debugStr);
}
2257

2258
// Transition this node to LEARNER: clear the heartbeat-slow counter, record the role change time,
// notify the FSM (FpBecomeLearnerCb, if set), invalidate minMatchIndex and reset the log buffer.
// `debugStr` only appears in the trace log; a log-buffer reset failure is logged.
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->hbSlowNum = 0;
  pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  sNTrace(pSyncNode, "become learner %s", debugStr);

  bool hasLearnerCb = (pSyncNode->pFsm != NULL) && (pSyncNode->pFsm->FpBecomeLearnerCb != NULL);
  if (hasLearnerCb) {
    pSyncNode->pFsm->FpBecomeLearnerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  int32_t resetCode = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (resetCode != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(resetCode));
  }
}
2283

2284
// TLA+ Spec
2285
// \* Candidate i transitions to leader.
2286
// BecomeLeader(i) ==
2287
//     /\ state[i] = Candidate
2288
//     /\ votesGranted[i] \in Quorum
2289
//     /\ state'      = [state EXCEPT ![i] = Leader]
2290
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
2291
//                          [j \in Server |-> Len(log[i]) + 1]]
2292
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
2293
//                          [j \in Server |-> 0]]
2294
//     /\ elections'  = elections \cup
2295
//                          {[eterm     |-> currentTerm[i],
2296
//                            eleader   |-> i,
2297
//                            elog      |-> log[i],
2298
//                            evotes    |-> votesGranted[i],
2299
//                            evoterLog |-> voterLog[i]]}
2300
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
2301
//
2302
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
3,183,064✔
2303
  int32_t code = 0;
3,183,064✔
2304
  pSyncNode->becomeLeaderNum++;
3,183,064✔
2305
  pSyncNode->hbrSlowNum = 0;
3,183,064✔
2306

2307
  // reset restoreFinish
2308
  pSyncNode->restoreFinish = false;
3,183,064✔
2309

2310
  // state change
2311
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
3,181,594✔
2312
  pSyncNode->roleTimeMs = taosGetTimestampMs();
3,181,722✔
2313

2314
  // set leader cache
2315
  pSyncNode->leaderCache = pSyncNode->myRaftId;
3,181,886✔
2316
  strncpy(pSyncNode->leaderCacheEp.fqdn, pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeFqdn,
3,182,233✔
2317
          TSDB_FQDN_LEN);
2318
  pSyncNode->leaderCacheEp.port = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodePort;
3,182,078✔
2319

2320
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
7,157,364✔
2321
    SyncIndex lastIndex;
3,973,854✔
2322
    SyncTerm  lastTerm;
3,973,854✔
2323
    int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
3,974,348✔
2324
    if (code != 0) {
3,975,811✔
2325
      sError("vgId:%d, failed to become leader since %s", pSyncNode->vgId, tstrerror(code));
×
2326
      return;
×
2327
    }
2328
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
3,975,811✔
2329
  }
2330

2331
  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
7,156,571✔
2332
    // maybe overwrite myself, no harm
2333
    // just do it!
2334
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
3,976,448✔
2335
  }
2336

2337
  // init peer mgr
2338
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
3,182,360✔
2339
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
×
2340
    return;
×
2341
  }
2342

2343
#if 0
2344
  // update sender private term
2345
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
2346
  if (pMySender != NULL) {
2347
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
2348
      if (pSyncNode->senders[i]->privateTerm > pMySender->privateTerm) {
2349
        pMySender->privateTerm = pSyncNode->senders[i]->privateTerm;
2350
      }
2351
    }
2352
    (pMySender->privateTerm) += 100;
2353
  }
2354
#endif
2355

2356
  // close receiver
2357
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
3,183,064✔
2358
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
×
2359
  }
2360

2361
  // stop elect timer
2362
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
3,182,717✔
2363
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
×
2364
    return;
×
2365
  }
2366

2367
  // start heartbeat timer
2368
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
3,180,508✔
2369
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
×
2370
    return;
×
2371
  }
2372

2373
  // send heartbeat right now
2374
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
3,180,886✔
2375
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
×
2376
    return;
×
2377
  }
2378

2379
  // call back
2380
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
3,181,785✔
2381
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
3,181,516✔
2382
  }
2383

2384
  // min match index
2385
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
3,182,813✔
2386

2387
  // reset log buffer
2388
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
3,183,064✔
2389
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
2390
    return;
×
2391
  }
2392

2393
  // trace log
2394
  sNInfo(pSyncNode, "node become leader, %s", debugStr);
3,183,064✔
2395
}
2396

2397
// Transition this node to ASSIGNED_LEADER. This mirrors syncNodeBecomeLeader, but restoreFinish is
// left untouched (its reset is commented out in the original).
//
// Points the leader cache (id and endpoint) at ourselves and resets nextIndex/matchIndex and peer
// send state. It also stops a running new-node snapshot receiver, swaps the election timer for the
// heartbeat timer, and sends heartbeats immediately. Finally it invokes FpBecomeAssignedLeaderCb
// and resets the log buffer. A failing step is logged and the remaining steps are skipped.
void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
  int32_t code = 0;
  pSyncNode->becomeAssignedLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_ASSIGNED_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // Set leader cache to ourselves. The endpoint is updated too, as syncNodeBecomeLeader does;
  // previously only the id was set, leaving leaderCacheEp describing some other (or no) node.
  const SNodeInfo* pMyInfo = &pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  pSyncNode->leaderCache = pSyncNode->myRaftId;
  tstrncpy(pSyncNode->leaderCacheEp.fqdn, pMyInfo->nodeFqdn, TSDB_FQDN_LEN);
  pSyncNode->leaderCacheEp.port = pMyInfo->nodePort;

  // The last index/term are loop-invariant; fetch them once.
  SyncIndex lastIndex = SYNC_INDEX_INVALID;
  SyncTerm  lastTerm = SYNC_TERM_INVALID;
  if ((code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm)) != 0) {
    sError("vgId:%d, failed to become assigned leader since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  // Overwriting our own slot as well is harmless.
  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // start heartbeat timer
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // send heartbeat right now
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeAssignedLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNInfo(pSyncNode, "become assigned leader");
}
2475

2476
// Promote a CANDIDATE holding a majority of granted votes to LEADER, then append a no-op entry.
// Logs and returns without changes if the node is not a candidate or lacks a majority.
// A failed no-op append is logged but does not undo the promotion.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
    sError("vgId:%d, failed leader from candidate since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }
  bool granted = voteGrantedMajority(pSyncNode->pVotesGranted);
  if (!granted) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }
  syncNodeBecomeLeader(pSyncNode, "from candidate to leader");

  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  int32_t ret = syncNodeAppendNoop(pSyncNode);
  if (ret < 0) {
    // Report the returned code itself (as syncNodeAssignedLeader2Leader does); terrno may not hold it.
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, pSyncNode->vgId,
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
}
2500

2501
// True when this sync node serves vgroup 1, which this module treats as the mnode group.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  const int32_t mnodeVgId = 1;
  return pSyncNode->vgId == mnodeVgId;
}
57,391,938✔
2502

2503
int32_t syncSetElectBaseline(int64_t rid, int32_t ms){
21,057✔
2504
  int32_t code = 0;
21,057✔
2505
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
21,057✔
2506
  if (pSyncNode == NULL) {
21,057✔
2507
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
2508
    if (terrno != 0) code = terrno;
×
2509
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
2510
    TAOS_RETURN(code);
×
2511
  }
2512
  pSyncNode->electBaseLine = ms;
21,057✔
2513
  syncNodeResetElectTimer(pSyncNode);
21,057✔
2514
  return code;
21,057✔
2515
}
2516

2517
// Reset the per-peer send bookkeeping (last sent index and time) in every peer slot, voter and
// learner slots alike. Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  const int32_t slotCount = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;
  int32_t       slot = 0;
  while (slot < slotCount) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
    ++slot;
  }
  return 0;
}
2525

2526
// Move a FOLLOWER to CANDIDATE and record the role change time. Logs an error and does nothing
// if the node is in any other state.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  bool isFollower = (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  if (!isFollower) {
    sError("vgId:%d, failed candidate from follower since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }

  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  SyncIndex lastLogIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastLogIndex);

  sNTrace(pSyncNode, "follower to candidate");
}
2539

2540
// Promote an ASSIGNED_LEADER to a regular LEADER and append a no-op entry.
// Returns TSDB_CODE_SYN_INTERNAL_ERROR if the node is not an assigned leader, otherwise 0.
// A failed no-op append is logged, not returned.
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) return TSDB_CODE_SYN_INTERNAL_ERROR;
  syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");

  sNTrace(pSyncNode, "assigned leader to leader");

  int32_t ret = syncNodeAppendNoop(pSyncNode);
  if (ret < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  // The ", " separator was missing, fusing two fields ("commit index:5assigned commit index:...").
  sInfo("vgId:%d, become leader from assigned leader. term:%" PRId64 ", commit index:%" PRId64
        ", assigned commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, pSyncNode->assignedCommitIndex,
        lastIndex);
  return 0;
}
2558

2559
// just called by syncNodeVoteForSelf
2560
// Record a vote for `pRaftId` in `term`. Does nothing (besides logging an error) when `term` differs
// from the persisted term or when a vote has already been cast.
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
  SyncTerm storeTerm = raftStoreGetTerm(pSyncNode);
  if (term != storeTerm) {
    sError("vgId:%d, failed to vote for term, term:%" PRId64 ", storeTerm:%" PRId64, pSyncNode->vgId, term, storeTerm);
    return;
  }
  sTrace("vgId:%d, begin hasVoted", pSyncNode->vgId);
  bool voted = raftStoreHasVoted(pSyncNode);
  if (voted) {
    // This path means a vote was ALREADY cast; the message used to say "not voted".
    sError("vgId:%d, failed to vote for term since already voted", pSyncNode->vgId);
    return;
  }

  raftStoreVote(pSyncNode, pRaftId);
}
2575

2576
// simulate get vote from outside
2577
// Vote for ourselves in `currentTerm`, then build a granted RequestVoteReply from ourselves to
// ourselves and record it in pVotesGranted and pVotesRespond, as if it had arrived from a peer.
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);

  SRpcMsg rpcMsg = {0};
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
  if (ret != 0) {
    // Previously silent. Without the self vote the election presumably cannot count our own ballot,
    // so make the failure visible.
    sError("vgId:%d, failed to build self vote reply since %s", pSyncNode->vgId, tstrerror(ret));
    return;
  }

  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
  pMsg->srcId = pSyncNode->myRaftId;
  pMsg->destId = pSyncNode->myRaftId;
  pMsg->term = currentTerm;
  pMsg->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
  rpcFreeCont(rpcMsg.pCont);
}
2594

2595
// return if has a snapshot
2596
// True when the FSM reports a snapshot whose lastApplyIndex is at least SYNC_INDEX_BEGIN.
// Always false when the FSM provides no FpGetSnapshotInfo callback.
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return false;
  }

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  // TODO check return value
  (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  return snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN;
}
2608

2609
// return max(logLastIndex, snapshotLastIndex)
2610
// if no snapshot and log, return -1
2611
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
5,724,848✔
2612
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
5,724,848✔
2613
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
5,724,848✔
2614
    // TODO check return value
2615
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
5,724,848✔
2616
  }
2617
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
5,725,579✔
2618

2619
  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
5,725,579✔
2620
  return lastIndex;
5,725,579✔
2621
}
2622

2623
// return the last term of snapshot and log
2624
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
2625
// Return the term of the newest entry across snapshot and log:
//  - no snapshot (no callback, or lastApplyIndex < SYNC_INDEX_BEGIN): the log's last term;
//  - log extends past the snapshot: the log's last term;
//  - otherwise: the snapshot's lastApplyTerm.
// On error the log store's syncLogLastTerm result (SYNC_TERM_INVALID) propagates.
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  // Query the snapshot once. Previously syncNodeHasSnapshot() read it and then it was read again,
  // so the "has snapshot" decision and the term could come from two different snapshots.
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    // TODO check return value
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  }

  if (snapshot.lastApplyIndex < SYNC_INDEX_BEGIN) {
    // no snapshot
    return pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
  }

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  if (logLastIndex > snapshot.lastApplyIndex) {
    return pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
  }
  return snapshot.lastApplyTerm;
}
2649

2650
// get last index and term along with snapshot
2651
// Fill *pLastIndex / *pLastTerm with the newest index and term across snapshot and log
// (see syncNodeGetLastIndex / syncNodeGetLastTerm). Always returns 0.
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  SyncIndex newestIndex = syncNodeGetLastIndex(pSyncNode);
  SyncTerm  newestTerm = syncNodeGetLastTerm(pSyncNode);
  *pLastIndex = newestIndex;
  *pLastTerm = newestTerm;
  return 0;
}
2656

2657
#ifdef BUILD_NO_CALL
// Helpers compiled only when BUILD_NO_CALL is defined.

// return append-entries first try index
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
  return syncStartIndex;
}

// if index > 0, return index - 1
// else, return -1
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex preIndex = index - 1;
  if (preIndex < SYNC_INDEX_INVALID) {
    preIndex = SYNC_INDEX_INVALID;
  }

  return preIndex;
}

// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }

  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

  SyncIndex preIndex = index - 1;

  SSyncRaftEntry* pPreEntry = NULL;
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
  int32_t         code = 0;
  if (h) {
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);

    pSyncNode->pLogStore->cacheHit++;
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);

  } else {
    pSyncNode->pLogStore->cacheMiss++;
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);

    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
  }

  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};

  if (code == 0) {
    // Release the cache handle (or the entry we own) on every path; the former early
    // "return -1" for a NULL entry leaked the cache handle.
    SyncTerm preTerm = (pPreEntry != NULL) ? pPreEntry->term : -1;
    if (h) {
      taosLRUCacheRelease(pCache, h, false);
    } else if (pPreEntry != NULL) {
      syncEntryDestroy(pPreEntry);
    }
    return preTerm;
  }

  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    // TODO check return value
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex == preIndex) {
      return snapshot.lastApplyTerm;
    }
  }

  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
  return SYNC_TERM_INVALID;
}

// get pre index and term of "index"
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  *pPreIndex = syncNodeGetPreIndex(pSyncNode, index);
  *pPreTerm = syncNodeGetPreTerm(pSyncNode, index);
  return 0;
}
#endif
2747

2748
static void syncNodeEqPingTimer(void* param, void* tmrId) {
25,899,621✔
2749
  if (!syncIsInit()) return;
25,899,621✔
2750

2751
  int64_t    rid = (int64_t)param;
25,899,621✔
2752
  SSyncNode* pNode = syncNodeAcquire(rid);
25,899,621✔
2753

2754
  if (pNode == NULL) return;
25,899,621✔
2755

2756
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
25,899,621✔
2757
    SRpcMsg rpcMsg = {0};
25,899,621✔
2758
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
25,899,621✔
2759
                                    pNode->pingTimerMS, pNode);
2760
    if (code != 0) {
25,899,070✔
2761
      sError("failed to build ping msg");
×
2762
      rpcFreeCont(rpcMsg.pCont);
×
2763
      goto _out;
×
2764
    }
2765

2766
    // sTrace("enqueue ping msg");
2767
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
25,899,070✔
2768
    if (code != 0) {
25,899,070✔
2769
      sError("failed to sync enqueue ping msg since %s", terrstr());
1,040✔
2770
      rpcFreeCont(rpcMsg.pCont);
1,040✔
2771
      goto _out;
1,040✔
2772
    }
2773

2774
  _out:
25,899,070✔
2775
    if (taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
25,899,070✔
2776
                     &pNode->pPingTimer))
2777
      sError("failed to reset ping timer");
×
2778
  }
2779
  syncNodeRelease(pNode);
25,899,621✔
2780
}
2781

2782
static void syncNodeEqElectTimer(void* param, void* tmrId) {
480,034✔
2783
  if (!syncIsInit()) return;
480,034✔
2784

2785
  int64_t    rid = (int64_t)param;
480,034✔
2786
  SSyncNode* pNode = syncNodeAcquire(rid);
480,034✔
2787

2788
  if (pNode == NULL) return;
480,034✔
2789

2790
  if (pNode->syncEqMsg == NULL) {
479,403✔
2791
    syncNodeRelease(pNode);
×
2792
    return;
×
2793
  }
2794

2795
  int64_t tsNow = taosGetTimestampMs();
479,403✔
2796
  if (tsNow < pNode->electTimerParam.executeTime) {
479,403✔
2797
    syncNodeRelease(pNode);
2,959✔
2798
    return;
2,959✔
2799
  }
2800

2801
  SRpcMsg rpcMsg = {0};
476,444✔
2802
  int32_t code =
2803
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
476,444✔
2804

2805
  if (code != 0) {
476,444✔
2806
    sError("failed to build elect msg");
×
2807
    syncNodeRelease(pNode);
×
2808
    return;
×
2809
  }
2810

2811
  SyncTimeout* pTimeout = rpcMsg.pCont;
476,444✔
2812
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
476,444✔
2813

2814
  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
476,444✔
2815
  if (code != 0) {
476,444✔
2816
    sError("failed to sync enqueue elect msg since %s", terrstr());
×
2817
    rpcFreeCont(rpcMsg.pCont);
×
2818
    syncNodeRelease(pNode);
×
2819
    return;
×
2820
  }
2821

2822
  syncNodeRelease(pNode);
476,444✔
2823
}
2824

2825
#ifdef BUILD_NO_CALL
// Node-level heartbeat timer callback, compiled only with BUILD_NO_CALL. `param` carries the sync
// node's ref id. With more than one replica, and while heartbeatTimerLogicClockUser has not advanced
// past heartbeatTimerLogicClock, it enqueues a SYNC_TIMEOUT_HEARTBEAT message and re-arms itself.
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
  if (!syncIsInit()) return;

  int64_t    rid = (int64_t)param;
  SSyncNode* pNode = syncNodeAcquire(rid);

  if (pNode == NULL) return;

  if (pNode->totalReplicaNum > 1) {
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
        sError("failed to build heartbeat msg");
      } else {
        sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
        code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
        if (code != 0) {
          sError("failed to enqueue heartbeat msg since %s", terrstr());
          rpcFreeCont(rpcMsg.pCont);
        }
      }

      // The result only decided an early return before; fall through instead so the node
      // reference below is always released.
      (void)taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid,
                         syncEnv()->pTimerManager, &pNode->pHeartbeatTimer);
    } else {
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
    }
  }

  // Balance syncNodeAcquire; the original never released the node.
  syncNodeRelease(pNode);
}
#endif
2865

2866
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
23,719,529✔
2867
  if (tsSyncLogHeartbeat) {
23,719,529✔
2868
    sInfo("heartbeat timer start");
×
2869
  }
2870
  int32_t code = 0;
23,719,529✔
2871
  int64_t hbDataRid = (int64_t)param;
23,719,529✔
2872
  int64_t tsNow = taosGetTimestampMs();
23,719,529✔
2873

2874
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
23,719,529✔
2875
  if (pData == NULL) {
23,719,529✔
2876
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
×
2877
    return;
×
2878
  }
2879

2880
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
23,719,529✔
2881
  if (pSyncNode == NULL) {
23,719,529✔
UNCOV
2882
    syncHbTimerDataRelease(pData);
×
UNCOV
2883
    sError("hb timer get pSyncNode NULL");
×
UNCOV
2884
    return;
×
2885
  }
2886

2887
  SSyncTimer* pSyncTimer = pData->pTimer;
23,719,529✔
2888

2889
  if (!pSyncNode->isStart) {
23,719,529✔
2890
    syncNodeRelease(pSyncNode);
×
2891
    syncHbTimerDataRelease(pData);
×
2892
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
×
2893
    return;
×
2894
  }
2895

2896
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
23,719,529✔
2897
    syncNodeRelease(pSyncNode);
×
2898
    syncHbTimerDataRelease(pData);
×
2899
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
×
2900
    return;
×
2901
  }
2902

2903
  if (tsSyncLogHeartbeat) {
23,719,529✔
2904
    sInfo("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:0x%" PRIx64, pSyncNode->vgId, hbDataRid,
×
2905
          pData->destId.addr);
2906
  } else {
2907
    sTrace("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:0x%" PRIx64, pSyncNode->vgId, hbDataRid,
23,719,529✔
2908
           pData->destId.addr);
2909
  }
2910

2911
  if (pSyncNode->totalReplicaNum > 1) {
23,719,529✔
2912
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
23,719,257✔
2913
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);
23,719,257✔
2914

2915
    if (timerLogicClock == msgLogicClock) {
23,719,257✔
2916
      if (tsNow > pData->execTime) {
23,718,923✔
2917
        pData->execTime += pSyncTimer->timerMS;
23,707,003✔
2918

2919
        SRpcMsg rpcMsg = {0};
23,707,003✔
2920
        if ((code = syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId)) != 0) {
23,707,003✔
2921
          sError("vgId:%d, failed to build heartbeat msg since %s", pSyncNode->vgId, tstrerror(code));
×
2922
          syncNodeRelease(pSyncNode);
×
2923
          syncHbTimerDataRelease(pData);
×
2924
          return;
×
2925
        }
2926

2927
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
23,707,003✔
2928

2929
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
23,707,003✔
2930
        pSyncMsg->srcId = pSyncNode->myRaftId;
23,707,003✔
2931
        pSyncMsg->destId = pData->destId;
23,707,003✔
2932
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
23,707,003✔
2933
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
23,707,003✔
2934
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
23,707,003✔
2935
        pSyncMsg->privateTerm = 0;
23,707,003✔
2936
        pSyncMsg->timeStamp = tsNow;
23,707,003✔
2937

2938
        // update reset time
2939
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
23,707,003✔
2940
        pSyncTimer->timeStamp = tsNow;
23,707,003✔
2941

2942
        // send msg
2943
        TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
23,707,003✔
2944
        TRACE_SET_ROOTID(&(rpcMsg.info.traceId), tGenIdPI64());
23,707,003✔
2945
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime, &(rpcMsg.info.traceId));
23,707,003✔
2946
        int ret = syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
23,707,003✔
2947
        if (ret != 0) {
23,707,003✔
2948
          sError("vgId:%d, failed to send heartbeat since %s", pSyncNode->vgId, tstrerror(ret));
41,865✔
2949
        }
2950
      }
2951

2952
      if (syncIsInit()) {
23,718,923✔
2953
        if (tsSyncLogHeartbeat) {
23,718,923✔
2954
          sInfo("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
×
2955
        } else {
2956
          sTrace("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
23,718,923✔
2957
        }
2958
        bool stopped = taosTmrResetPriority(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid,
23,718,923✔
2959
                                            syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
23,718,923✔
2960
        if (stopped) sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
23,718,923✔
2961

2962
      } else {
2963
        sError("sync env is stop, reset peer hb timer error");
×
2964
      }
2965

2966
    } else {
2967
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64, pSyncNode->vgId,
334✔
2968
             timerLogicClock, msgLogicClock);
2969
    }
2970

2971
    if (tsSyncLogHeartbeat) {
23,719,257✔
2972
      sInfo("vgId:%d, finish send sync-heartbeat", pSyncNode->vgId);
×
2973
    }
2974
  }
2975

2976
  syncHbTimerDataRelease(pData);
23,719,529✔
2977
  syncNodeRelease(pSyncNode);
23,719,529✔
2978
  if (tsSyncLogHeartbeat) {
23,719,529✔
2979
    sInfo("heartbeat timer stop");
×
2980
  }
2981
}
2982

2983
#ifdef BUILD_NO_CALL
// LRU deleter callback: the cached value is the raft entry itself, so it is simply freed.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) {
  (void)key;
  (void)keyLen;
  (void)ud;
  taosMemoryFree(value);
}

// Insert a raft entry into the log store's LRU cache, keyed by the entry index and charged with
// sizeof(SSyncRaftEntry) + dataLen. Returns 0 when the cache accepts it, -1 otherwise.
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  sNTrace(pStoreData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus insertStatus = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry,
                                              charge, deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW, NULL);
  return (insertStatus == TAOS_LRU_STATUS_OK) ? 0 : -1;
}
#endif
3004

3005
/*
 * Translate an alter-vnode-replica request into a sync configuration.
 *
 * Voters from pReq->replicas fill cfg->nodeInfo[0, replica); learners from pReq->learnerReplicas
 * follow them in cfg->nodeInfo[replica, replica + learnerReplica). Every node goes through
 * tmsgUpdateDnodeInfo(), which may rewrite its id/clusterId/fqdn/port.
 *
 * cfg->myIndex is set from selfIndex (voter) or replica + learnerSelfIndex (learner) when those are
 * not -1; otherwise it is left untouched. cfg->changeVersion is copied from the request.
 *
 * The counts come from a deserialized message, so they are clamped to the capacity of the request
 * arrays (TSDB_MAX_REPLICA voters, TSDB_MAX_LEARNER_REPLICA learners) before any indexing.
 */
void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) {  // TODO SAlterVnodeReplicaReq name is proper?
  int32_t voterNum = pReq->replica;
  int32_t learnerNum = pReq->learnerReplica;

  // never index past pReq->replicas / pReq->learnerReplicas / cfg->nodeInfo on a malformed request
  if (voterNum < 0 || voterNum > TSDB_MAX_REPLICA) {
    sError("vgId:%d, invalid replica:%d in config request, clamped", pReq->vgId, voterNum);
    voterNum = (voterNum < 0) ? 0 : TSDB_MAX_REPLICA;
  }
  if (learnerNum < 0 || learnerNum > TSDB_MAX_LEARNER_REPLICA) {
    sError("vgId:%d, invalid learnerReplica:%d in config request, clamped", pReq->vgId, learnerNum);
    learnerNum = (learnerNum < 0) ? 0 : TSDB_MAX_LEARNER_REPLICA;
  }

  cfg->replicaNum = 0;
  cfg->totalReplicaNum = 0;

  // voters occupy slots [0, voterNum)
  for (int32_t i = 0; i < voterNum; ++i) {
    SNodeInfo* pNode = &cfg->nodeInfo[i];
    pNode->nodeId = pReq->replicas[i].id;
    pNode->nodePort = pReq->replicas[i].port;
    tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
    cfg->replicaNum++;
  }
  if (pReq->selfIndex != -1) {
    cfg->myIndex = pReq->selfIndex;
  }

  // learners follow the voters: slots [voterNum, voterNum + learnerNum)
  for (int32_t learnerIdx = 0; learnerIdx < learnerNum; ++learnerIdx) {
    int32_t    i = voterNum + learnerIdx;
    SNodeInfo* pNode = &cfg->nodeInfo[i];
    pNode->nodeId = pReq->learnerReplicas[learnerIdx].id;
    pNode->nodePort = pReq->learnerReplicas[learnerIdx].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[learnerIdx].fqdn, sizeof(pNode->nodeFqdn));
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
  }
  cfg->totalReplicaNum = voterNum + learnerNum;

  if (pReq->learnerSelfIndex != -1) {
    cfg->myIndex = voterNum + pReq->learnerSelfIndex;
  }
  cfg->changeVersion = pReq->changeVersion;
}
3041

3042
/*
 * Pre-check a config-change raft entry against this node.
 *
 * Returns:
 *   TSDB_CODE_SYN_INTERNAL_ERROR  the entry is not a TDMT_SYNC_CONFIG_CHANGE entry;
 *   TSDB_CODE_INVALID_MSG         the payload (after the SMsgHead) cannot be deserialized;
 *   1                             this node is leader / assigned leader, the new config is non-empty,
 *                                 and this node (matched by fqdn + port) is not in it; the node has
 *                                 already stepped down at the current term;
 *   0                             otherwise.
 */
int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SMsgHead*          pHead = (SMsgHead*)pEntry->data;
  void*              pBody = POINTER_SHIFT(pHead, sizeof(SMsgHead));
  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pBody, pHead->contLen, &req) != 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  SSyncCfg newCfg = {0};
  syncBuildConfigFromReq(&req, &newCfg);

  bool leading = (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
  if (newCfg.totalReplicaNum < 1 || !leading) {
    return 0;
  }

  for (int32_t k = 0; k < newCfg.totalReplicaNum; ++k) {
    const SNodeInfo* pMember = &newCfg.nodeInfo[k];
    if (strcmp(ths->myNodeInfo.nodeFqdn, pMember->nodeFqdn) == 0 && ths->myNodeInfo.nodePort == pMember->nodePort) {
      return 0;  // still a member of the new configuration
    }
  }

  // a leader that is not part of the new configuration gives up leadership
  SyncTerm termNow = raftStoreGetTerm(ths);
  SRaftId  noLeader = EMPTY_RAFT_ID;
  syncNodeStepDown(ths, termNow, noLeader, "changeConfig");
  return 1;
}
3080

3081
/*
 * Dump the node's membership view to the info log.
 *
 * Logs, in order: summary counters, myNodeInfo, each peer's node info, each peer's endpoint set,
 * each peer's raft id, then every entry of raftCfg.cfg and its replica id.
 *
 * @param ths the sync node
 * @param cfg currently unused
 * @param str tag included in every line (e.g. "before config change")
 */
void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg* cfg, char* str) {
  (void)cfg;

  sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64
        ", changeVersion:%d, "
        "restoreFinish:%d",
        ths->vgId, str, ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion,
        ths->restoreFinish);

  sInfo("vgId:%d, %s, myNodeInfo, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
        ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn, ths->myNodeInfo.nodePort,
        ths->myNodeInfo.nodeRole);

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
          i, ths->peersNodeInfo[i].clusterId, ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn,
          ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    // format peer i's own endpoint set as "{fqdn:port, fqdn:port}"
    const SEpSet* pEpSet = &ths->peersEpset[i];
    char          buf[256];
    int32_t       len = sizeof(buf);
    int32_t       n = 0;
    n += tsnprintf(buf + n, len - n, "%s", "{");
    for (int32_t k = 0; k < pEpSet->numOfEps; k++) {
      n += tsnprintf(buf + n, len - n, "%s:%d%s", pEpSet->eps[k].fqdn, pEpSet->eps[k].port,
                     (k + 1 < pEpSet->numOfEps ? ", " : ""));
    }
    n += tsnprintf(buf + n, len - n, "%s", "}");

    sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d", ths->vgId, str, i, buf, pEpSet->inUse);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->peersId[i].addr);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, nodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str, i,
          ths->raftCfg.cfg.nodeInfo[i].clusterId, ths->raftCfg.cfg.nodeInfo[i].nodeId,
          ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, ths->raftCfg.cfg.nodeInfo[i].nodePort,
          ths->raftCfg.cfg.nodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, replicasId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->replicasId[i].addr);
  }
}
3127

3128
/*
 * Rebuild the peer tables and raftCfg.cfg member list from a new configuration.
 *
 * Peers: every member of cfg except this node (matched by fqdn + port) is copied into
 * peersNodeInfo, in order, with peersEpset / peersId regenerated from it; peersNum is updated.
 * Cfg: every member of cfg is copied into raftCfg.cfg.nodeInfo at the same position; replicaNum
 * counts voters, myIndex is set to this node's slot when present, totalReplicaNum is updated.
 *
 * @return 0 on success; an error code if a peer's raft id cannot be determined. In that case
 *         peersNum and raftCfg.cfg are left unchanged.
 */
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg* cfg) {
  // change peersNodeInfo
  int32_t peerIdx = 0;
  for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
    const SNodeInfo* pSrc = &cfg->nodeInfo[j];
    bool isSelf = strcmp(ths->myNodeInfo.nodeFqdn, pSrc->nodeFqdn) == 0 && ths->myNodeInfo.nodePort == pSrc->nodePort;
    if (isSelf) {
      continue;
    }

    SNodeInfo* pPeer = &ths->peersNodeInfo[peerIdx];
    pPeer->nodeRole = pSrc->nodeRole;
    pPeer->clusterId = pSrc->clusterId;
    tstrncpy(pPeer->nodeFqdn, pSrc->nodeFqdn, TSDB_FQDN_LEN);
    pPeer->nodeId = pSrc->nodeId;
    pPeer->nodePort = pSrc->nodePort;

    syncUtilNodeInfo2EpSet(pPeer, &ths->peersEpset[peerIdx]);

    if (syncUtilNodeInfo2RaftId(pPeer, ths->vgId, &ths->peersId[peerIdx]) == false) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", ths->vgId, peerIdx);
      // never report success for a failed conversion, even if terrno was left unset
      return (terrno != 0) ? terrno : TSDB_CODE_SYN_INTERNAL_ERROR;
    }

    peerIdx++;
  }
  ths->peersNum = peerIdx;

  // change cfg nodeInfo
  ths->raftCfg.cfg.replicaNum = 0;
  int32_t slot = 0;
  for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
    const SNodeInfo* pSrc = &cfg->nodeInfo[j];
    SNodeInfo*       pDst = &ths->raftCfg.cfg.nodeInfo[slot];
    if (pSrc->nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->raftCfg.cfg.replicaNum++;
    }
    pDst->nodeRole = pSrc->nodeRole;
    pDst->clusterId = pSrc->clusterId;
    tstrncpy(pDst->nodeFqdn, pSrc->nodeFqdn, TSDB_FQDN_LEN);
    pDst->nodeId = pSrc->nodeId;
    pDst->nodePort = pSrc->nodePort;
    if (strcmp(ths->myNodeInfo.nodeFqdn, pSrc->nodeFqdn) == 0 && ths->myNodeInfo.nodePort == pSrc->nodePort) {
      ths->raftCfg.cfg.myIndex = slot;
    }
    slot++;
  }
  ths->raftCfg.cfg.totalReplicaNum = slot;

  return 0;
}
3176

3177
/*
 * Promote members to voter according to a new configuration.
 *
 * For every peer and every raftCfg.cfg entry that matches a cfg member by fqdn + port, the role
 * becomes TAOS_SYNC_ROLE_VOTER when that cfg member is a voter. Roles are never demoted here.
 * raftCfg.cfg.replicaNum is recomputed as the number of such voter matches.
 */
void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg* cfg) {
  // change peersNodeInfo
  for (int32_t p = 0; p < ths->peersNum; ++p) {
    SNodeInfo* pPeer = &ths->peersNodeInfo[p];
    for (int32_t k = 0; k < cfg->totalReplicaNum; ++k) {
      const SNodeInfo* pNew = &cfg->nodeInfo[k];
      bool sameNode = strcmp(pPeer->nodeFqdn, pNew->nodeFqdn) == 0 && pPeer->nodePort == pNew->nodePort;
      if (!sameNode || pNew->nodeRole != TAOS_SYNC_ROLE_VOTER) {
        continue;
      }
      pPeer->nodeRole = TAOS_SYNC_ROLE_VOTER;
    }
  }

  // change cfg nodeInfo, recounting voters on the way
  ths->raftCfg.cfg.replicaNum = 0;
  for (int32_t m = 0; m < ths->raftCfg.cfg.totalReplicaNum; ++m) {
    SNodeInfo* pMember = &ths->raftCfg.cfg.nodeInfo[m];
    for (int32_t k = 0; k < cfg->totalReplicaNum; ++k) {
      const SNodeInfo* pNew = &cfg->nodeInfo[k];
      bool sameNode = strcmp(pMember->nodeFqdn, pNew->nodeFqdn) == 0 && pMember->nodePort == pNew->nodePort;
      if (!sameNode || pNew->nodeRole != TAOS_SYNC_ROLE_VOTER) {
        continue;
      }
      pMember->nodeRole = TAOS_SYNC_ROLE_VOTER;
      ths->raftCfg.cfg.replicaNum++;
    }
  }
}
3204

3205
/*
 * Rebuild the node's per-replica runtime state after ths->raftCfg.cfg has been replaced.
 *
 * The new member list is read from ths->raftCfg.cfg. Match/next index managers, log replication
 * managers and peer states of members present both before and after the change are carried over
 * (matched by raft id via syncUtilSameId / syncIndexMgrCopyIfExist). Vote managers are only
 * refreshed. Snapshot senders and heartbeat timers are recreated for all slots.
 *
 * @param ths                the sync node being reconfigured
 * @param oldtotalReplicaNum total replica count (voters + learners) before the change
 * @return 0 on success, otherwise an error code (also stored in terrno)
 */
int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum) {
  int32_t          code = 0;
  SSyncLogReplMgr* oldLogReplMgrs = NULL;

  // 1.rebuild replicasId, remove deleted one
  SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
  memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId));

  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]) == false) {
      // never report success for a failed conversion, even if terrno was left unset
      code = (terrno != 0) ? terrno : TSDB_CODE_SYN_INTERNAL_ERROR;
      TAOS_RETURN(code);
    }
  }

  // 2.rebuild MatchIndex, remove deleted one
  SSyncIndexMgr* oldIndex = ths->pMatchIndex;
  ths->pMatchIndex = syncIndexMgrCreate(ths);
  if (ths->pMatchIndex == NULL) {
    // keep the previous manager rather than leaving a NULL pointer behind and leaking the old one
    ths->pMatchIndex = oldIndex;
    code = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }
  syncIndexMgrCopyIfExist(ths->pMatchIndex, oldIndex, oldReplicasId);
  syncIndexMgrDestroy(oldIndex);

  // 3.rebuild NextIndex, remove deleted one
  SSyncIndexMgr* oldNextIndex = ths->pNextIndex;
  ths->pNextIndex = syncIndexMgrCreate(ths);
  if (ths->pNextIndex == NULL) {
    ths->pNextIndex = oldNextIndex;
    code = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }
  syncIndexMgrCopyIfExist(ths->pNextIndex, oldNextIndex, oldReplicasId);
  syncIndexMgrDestroy(oldNextIndex);

  // 4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // 5.rebuild logReplMgr
  for (int32_t i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  oldLogReplMgrs = taosMemoryCalloc(TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA, sizeof(SSyncLogReplMgr));
  if (oldLogReplMgrs == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  for (int32_t i = 0; i < oldtotalReplicaNum; i++) {
    oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
  }

  syncNodeLogReplDestroy(ths);
  if ((code = syncNodeLogReplInit(ths)) != 0) {
    goto _exit;
  }

  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
    for (int32_t j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        *(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
        ths->logReplMgrs[i]->peerId = i;
      }
    }
  }

  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
    sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // 6.rebuild sender
  for (int32_t i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if (ths->senders[i] != NULL) {
      sDebug("vgId:%d, snapshot sender destroy while close, data:%p", ths->vgId, ths->senders[i]);

      if (snapshotSenderIsStart(ths->senders[i])) {
        snapshotSenderStop(ths->senders[i], false);
      }

      snapshotSenderDestroy(ths->senders[i]);
      ths->senders[i] = NULL;
    }
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(ths, i, &pSender);
    if (pSender == NULL) {
      if (code == 0) code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _exit;  // release oldLogReplMgrs on this path too
    }

    ths->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  for (int32_t i = 0; i < ths->totalReplicaNum; i++) {
    sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  // 7.rebuild synctimer
  if ((code = syncNodeStopHeartbeatTimer(ths)) != 0) {
    goto _exit;
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(ths, &ths->peerHeartbeatTimerArr[i], ths->replicasId[i])) != 0) {
      goto _exit;
    }
  }

  if ((code = syncNodeStartHeartbeatTimer(ths)) != 0) {
    goto _exit;
  }

  // 8.rebuild peerStates
  {
    SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) {
      oldState[i] = ths->peerStates[i];
    }

    for (int32_t i = 0; i < ths->totalReplicaNum; i++) {
      for (int32_t j = 0; j < oldtotalReplicaNum; j++) {
        if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
          ths->peerStates[i] = oldState[j];
        }
      }
    }
  }

  taosMemoryFree(oldLogReplMgrs);
  return 0;

_exit:
  taosMemoryFree(oldLogReplMgrs);
  TAOS_RETURN(code);
}
3356

3357
/*
 * Refresh runtime state after member roles changed to voter without changing membership.
 *
 * Only voter counts are updated: ths->replicaNum and the replicaNum of the match/next index
 * managers take raftCfg.cfg.replicaNum. The vote managers are refreshed. replicasId and the log
 * replication managers are left as they are (comment in the original: "only need to change
 * replicaNum when 1->3").
 */
void syncNodeChangeToVoter(SSyncNode* ths) {
  int32_t voterNum = ths->raftCfg.cfg.replicaNum;

  // replicasId: the member list itself is unchanged, only the voter count moves
  ths->replicaNum = voterNum;
  sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum);
  for (int32_t r = 0; r < ths->totalReplicaNum; ++r) {
    sDebug("vgId:%d, i:%d, replicaId.addr:0x%" PRIx64, ths->vgId, r, ths->replicasId[r].addr);
  }

  // pMatchIndex / pNextIndex keep their slots; only their voter count is adjusted
  SSyncIndexMgr* pMatch = ths->pMatchIndex;
  pMatch->replicaNum = voterNum;
  ths->pNextIndex->replicaNum = voterNum;

  sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, pMatch->totalReplicaNum);
  for (int32_t r = 0; r < pMatch->totalReplicaNum; ++r) {
    sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, r, pMatch->index[r]);
  }

  // pVotesGranted, pVotesRespond
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // logReplMgrs are intentionally left untouched
}
3381

3382
/*
 * Zero the first peersNum entries of peersNodeInfo and the first raftCfg.cfg.totalReplicaNum
 * entries of raftCfg.cfg.nodeInfo. The counters themselves are not modified.
 */
void syncNodeResetPeerAndCfg(SSyncNode* ths) {
  for (int32_t p = 0; p < ths->peersNum; ++p) {
    memset(&ths->peersNodeInfo[p], 0, sizeof(SNodeInfo));
  }

  for (int32_t m = 0; m < ths->raftCfg.cfg.totalReplicaNum; ++m) {
    memset(&ths->raftCfg.cfg.nodeInfo[m], 0, sizeof(SNodeInfo));
  }
}
3392

3393
/*
 * Apply a TDMT_SYNC_CONFIG_CHANGE raft entry to this node's membership.
 *
 * @param ths    the sync node
 * @param pEntry config-change entry; its data is an SMsgHead followed by a serialized
 *               SAlterVnodeReplicaReq
 * @param str    caller tag used in the logs; "Commit" selects the commit-path log layout
 * @return 0 on success, or when the entry's changeVersion is not newer than the applied one (skipped);
 *         TSDB_CODE_SYN_INTERNAL_ERROR for a non config-change entry; TSDB_CODE_INVALID_MSG for an
 *         undecodable payload; otherwise the error of a failed rebuild or cfg-file write.
 *
 * Branches, chosen by the new total replica count (cfg) and the current one (ths):
 *  - new total is 1 or 2 (remove replica):
 *      * this node is listed: peers and raftCfg.cfg are rebuilt from the entry;
 *      * this node is not listed: it keeps only itself with role learner, and its state becomes
 *        TAOS_SYNC_STATE_LEARNER.
 *    In both cases restoreFinish is reset.
 *  - otherwise, current total is 3 (change replica type): members listed as voters are promoted,
 *    a learner node that became a voter moves to follower, and restoreFinish is reset.
 *  - otherwise (add replica): peers and raftCfg.cfg are rebuilt from the entry; restoreFinish is
 *    reset only if this node is a learner.
 * Afterwards quorum, lastConfigIndex, lastIndex and changeVersion are updated and the cfg file is
 * rewritten.
 */
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) {
  int32_t code = 0;
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SMsgHead* head = (SMsgHead*)pEntry->data;
  void*     pReq = POINTER_SHIFT(head, sizeof(SMsgHead));

  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg cfg = {0};
  syncBuildConfigFromReq(&req, &cfg);

  // only strictly newer config versions are applied
  if (cfg.changeVersion <= ths->raftCfg.cfg.changeVersion) {
    sInfo(
        "vgId:%d, skip conf change entry since lower version. "
        "this entry, index:%" PRId64 ", term:%" PRId64
        ", totalReplicaNum:%d, changeVersion:%d; "
        "current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d",
        ths->vgId, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
    return 0;
  }

  if (strcmp(str, "Commit") == 0) {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
        ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex,
        ths->pLogBuf->endIndex, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
  } else {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64 ", t:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
        ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum,
        ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
        ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index - 1, ths->commitIndex,
        ths->pLogBuf->commitIndex);
  }

  syncNodeLogConfigInfo(ths, &cfg, "before config change");

  int32_t oldTotalReplicaNum = ths->totalReplicaNum;

  if (cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2) {  // remove replica

    bool incfg = false;
    for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
      if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
          ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
        incfg = true;
        break;
      }
    }

    if (incfg) {  // remove other
      syncNodeResetPeerAndCfg(ths);

      // no need to change myNodeInfo

      if ((code = syncNodeRebuildPeerAndCfg(ths, &cfg)) != 0) {
        TAOS_RETURN(code);
      }

      if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) {
        TAOS_RETURN(code);
      }
    } else {  // remove myself
      // no need to do anything actually, to change the following to reduce disruptive server chance

      syncNodeResetPeerAndCfg(ths);

      // change myNodeInfo
      ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER;

      // change peer and cfg: the node keeps only itself, as a learner
      ths->peersNum = 0;
      memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo));
      ths->raftCfg.cfg.replicaNum = 0;
      ths->raftCfg.cfg.totalReplicaNum = 1;

      // change other
      if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) {
        TAOS_RETURN(code);
      }

      // change state
      ths->state = TAOS_SYNC_STATE_LEARNER;
    }

    ths->restoreFinish = false;
  } else {                            // add replica, or change replica type
    if (ths->totalReplicaNum == 3) {  // change replica type
      sInfo("vgId:%d, begin change replica type", ths->vgId);

      // change myNodeInfo
      for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
        if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
            ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
          if (cfg.nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
            ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
          }
        }
      }

      // change peer and cfg
      syncNodeChangePeerAndCfgToVoter(ths, &cfg);

      // change other
      syncNodeChangeToVoter(ths);

      // change state: a learner that has become a voter starts as follower
      if (ths->state == TAOS_SYNC_STATE_LEARNER) {
        if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER) {
          ths->state = TAOS_SYNC_STATE_FOLLOWER;
        }
      }

      ths->restoreFinish = false;
    } else {  // add replica
      sInfo("vgId:%d, begin add replica", ths->vgId);

      // no need to change myNodeInfo

      // change peer and cfg
      if ((code = syncNodeRebuildPeerAndCfg(ths, &cfg)) != 0) {
        TAOS_RETURN(code);
      }

      // change other
      if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) {
        TAOS_RETURN(code);
      }

      // no need to change state

      if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
        ths->restoreFinish = false;
      }
    }
  }

  ths->quorum = syncUtilQuorum(ths->replicaNum);

  ths->raftCfg.lastConfigIndex = pEntry->index;
  ths->raftCfg.cfg.lastIndex = pEntry->index;
  ths->raftCfg.cfg.changeVersion = cfg.changeVersion;

  syncNodeLogConfigInfo(ths, &cfg, "after config change");

  if ((code = syncWriteCfgFile(ths, "apply_config_change_entry")) != 0) {
    sError("vgId:%d, failed to create sync cfg file", ths->vgId);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
3566

3567
/*
 * Append a raft entry to the node's log buffer and advance match/commit.
 *
 * Flow:
 *  - an entry shorter than an SMsgHead is rejected and destroyed;
 *  - otherwise it is appended to ths->pLogBuf; on failure the entry is passed to the FSM with the
 *    append error code and then destroyed;
 *  - in every case syncLogBufferProceed() is run to advance the match index;
 *  - for an assigned leader (after a successful append) the assigned commit index is advanced and
 *    committed;
 *  - with more than one voter the function returns here (commit is presumably driven by replication
 *    replies elsewhere);
 *  - with a single voter the commit index is advanced and the buffer committed.
 *
 * Ownership: on the failure paths pEntry is destroyed here. On success it is not freed here —
 * presumably owned by the log buffer from then on (confirm against syncLogBufferAppend).
 *
 * @return 0 on success, otherwise the first error encountered. A later successful commit does not
 *         mask an earlier append failure.
 */
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (pEntry->dataLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, msg:%p, cannot append an invalid client request with no msg head, type:%s dataLen:%d", ths->vgId,
           pMsg, TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    goto _out;
  }

  // append to log buffer
  if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) {
    sError("vgId:%d, index:%" PRId64 ", failed to enqueue sync log buffer", ths->vgId, pEntry->index);
    int32_t ret = 0;
    // hand the entry to the FSM with the actual append error (terrno may be stale at this point)
    if ((ret = syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, code, false)) != 0) {
      sError("vgId:%d, index:%" PRId64 ", failed to execute fsm since %s", ths->vgId, pEntry->index, tstrerror(ret));
    }
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    goto _out;
  }

  code = 0;
_out:;
  // proceed match index, with replicating on needed
  SyncIndex       matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append", pMsg);
  const STraceId* trace = pEntry ? &pEntry->originRpcTraceId : NULL;

  if (pEntry != NULL) {
    sGDebug(trace,
            "vgId:%d, index:%" PRId64 ", raft entry appended, msg:%p term:%" PRId64 " buf:[%" PRId64 " %" PRId64
            " %" PRId64 ", %" PRId64 ")",
            ths->vgId, pEntry->index, pMsg, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
            ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
  }

  if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    int64_t index = syncNodeUpdateAssignedCommitIndex(ths, matchIndex);
    sGTrace(trace, "vgId:%d, index:%" PRId64 ", update assigned commit, msg:%p", ths->vgId, index, pMsg);

    if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
        syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex, trace, "append-entry") < 0) {
      sGError(trace, "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64, ths->vgId, index,
              pMsg, ths->commitIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // multi replica
  if (ths->replicaNum > 1) {
    TAOS_RETURN(code);
  }

  // single replica
  SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, matchIndex);
  sGTrace(trace, "vgId:%d, index:%" PRId64 ", raft entry update commit, msg:%p return index:%" PRId64, ths->vgId,
          matchIndex, pMsg, returnIndex);

  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    int32_t commitCode = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, trace, "append-entry");
    if (commitCode < 0) {
      sGError(trace,
              "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64 " return index:%" PRId64,
              ths->vgId, matchIndex, pMsg, ths->commitIndex, returnIndex);
    }
    // the commit result only becomes the return value if nothing failed before it
    if (code == 0) {
      code = commitCode;
    }
  }

  TAOS_RETURN(code);
}
3635

3636
/*
 * Decide whether heartbeat replies from voting peers have timed out.
 *
 * Returns false for a single-replica group. Otherwise it counts non-learner peers whose last
 * receive time (from pMatchIndex) is known (not 0 / -1) and older than tsHeartbeatTimeout ms.
 * Returns true when that count reaches the node's quorum.
 */
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->totalReplicaNum == 1) {
    return false;
  }

  int64_t now = taosGetTimestampMs();
  int32_t expired = 0;
  for (int32_t p = 0; p < pSyncNode->peersNum; ++p) {
    if (pSyncNode->peersNodeInfo[p].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      continue;  // learners do not count toward the quorum
    }

    int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[p]));
    bool    known = (lastRecv != 0 && lastRecv != -1);
    if (known && now - lastRecv > tsHeartbeatTimeout) {
      expired++;
    }
  }

  return expired >= pSyncNode->quorum;
}
3661

3662
/*
 * Report whether any snapshot sender among the first totalReplicaNum slots has been started.
 * A NULL node yields false.
 */
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return false;
  }

  for (int32_t r = 0; r < pSyncNode->totalReplicaNum; ++r) {
    const SSyncSnapshotSender* pSender = pSyncNode->senders[r];
    if (pSender != NULL && pSender->start) {
      return true;
    }
  }
  return false;
}
3673

3674
/*
 * Report whether the new-node snapshot receiver exists and has been started.
 * A NULL node or a missing receiver yields false.
 */
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  return pSyncNode != NULL && pSyncNode->pNewNodeReceiver != NULL && pSyncNode->pNewNodeReceiver->start;
}
3680

3681
/*
 * Append a no-op entry, built at the log buffer's end index with the current term, via
 * syncNodeAppend(). Returns TSDB_CODE_OUT_OF_MEMORY if the entry cannot be built, otherwise the
 * result of syncNodeAppend().
 */
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex       endIndex = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm        curTerm = raftStoreGetTerm(ths);
  SSyncRaftEntry* pNoop = syncEntryBuildNoop(curTerm, endIndex, ths->vgId);

  int32_t code = (pNoop == NULL) ? TSDB_CODE_OUT_OF_MEMORY : syncNodeAppend(ths, pNoop, NULL);
  TAOS_RETURN(code);
}
3695

3696
#ifdef BUILD_NO_CALL
/*
 * Legacy no-op append through the log store (compiled out).
 *
 * Builds a no-op at the store's write index with the current term. Only a leader appends it to the
 * store and caches it. The cache handle is released if one was obtained; otherwise the entry is
 * destroyed. Returns -1 if the entry cannot be built or the append fails, 0 otherwise.
 */
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  SSyncLogStore*  pStore = ths->pLogStore;
  SyncIndex       writeIndex = pStore->syncLogWriteIndex(pStore);
  SyncTerm        curTerm = raftStoreGetTerm(ths);
  SSyncRaftEntry* pNoop = syncEntryBuildNoop(curTerm, writeIndex, ths->vgId);
  if (pNoop == NULL) {
    return -1;
  }

  LRUHandle* pHandle = NULL;
  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    if (pStore->syncLogAppendEntry(pStore, pNoop, false) != 0) {
      sError("append noop error");
      // NOTE(review): pNoop is not released on this path; confirm whether syncLogAppendEntry takes
      // ownership on failure before adding a destroy here.
      return -1;
    }
    syncCacheEntry(pStore, pNoop, &pHandle);
  }

  if (pHandle != NULL) {
    taosLRUCacheRelease(pStore->pCache, pHandle, false);
  } else {
    syncEntryDestroy(pNoop);
  }
  return 0;
}
#endif
3726

3727
/**
 * Handle a sync-heartbeat message received from a peer.
 *
 * Steps:
 *  1. Record the receive time and receive count for the sender in pNextIndex,
 *     then log the receipt.
 *  2. Drop the message (returning 0) when the sender is not in this raft group.
 *  3. A learner adopts a higher term carried by the heartbeat.
 *  4. If the heartbeat's term equals the current term and this node is neither
 *     leader nor assigned leader, the election timer is reset at the end.
 *     In that case a follower or learner also enqueues a local commit command
 *     carrying the heartbeat's commit index.
 *  5. A leader or assigned leader that sees a heartbeat with term >= its own
 *     enqueues a local step-down command.
 *  6. Send a heartbeat reply with the current term, start time and receive timestamp.
 *
 * Local commands are built with syncBuildLocalCmd. Their payload is released
 * with rpcFreeCont whenever it is not successfully handed to syncEqMsg.
 *
 * @param ths     sync node receiving the heartbeat
 * @param pRpcMsg rpc message whose pCont is a SyncHeartbeat
 * @return 0 on success (including dropped messages), or the error code from
 *         building a local command, building the reply, or sending the reply
 */
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
  bool           resetElect = false;

  int64_t tsMs = taosGetTimestampMs();

  int64_t lastRecvTime = syncIndexMgrGetRecvTime(ths->pNextIndex, &(pMsg->srcId));
  syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
  syncIndexMgrIncRecvCount(ths->pNextIndex, &(pMsg->srcId));

  int64_t netElapsed = tsMs - pMsg->timeStamp;
  int64_t timeDiff = tsMs - lastRecvTime;
  syncLogRecvHeartbeat(ths, pMsg, netElapsed, &pRpcMsg->info.traceId, timeDiff, pRpcMsg);

  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
    sWarn(
        "vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
        "cluster:%d",
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
    return 0;
  }

  SyncTerm currentTerm = raftStoreGetTerm(ths);

  // A learner does not take part in elections, so it simply adopts a newer term.
  if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
    raftStoreSetTerm(ths, pMsg->term);
    currentTerm = pMsg->term;
  }

  int64_t tsMs2 = taosGetTimestampMs();

  int64_t processTime = tsMs2 - tsMs;
  if (processTime > SYNC_HEARTBEAT_SLOW_MS) {
    sGError(&pRpcMsg->info.traceId,
            "vgId:%d, process sync-heartbeat msg from dnode:%d, commit-index:%" PRId64 ", cluster:%d msgTerm:%" PRId64
            " currentTerm:%" PRId64 ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsg->srcId)), pMsg->commitIndex, CID(&(pMsg->srcId)), pMsg->term, currentTerm,
            processTime);
  } else {
    sGDebug(&pRpcMsg->info.traceId,
            "vgId:%d, process sync-heartbeat msg from dnode:%d, commit-index:%" PRId64 ", cluster:%d msgTerm:%" PRId64
            " currentTerm:%" PRId64 ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsg->srcId)), pMsg->commitIndex, CID(&(pMsg->srcId)), pMsg->term, currentTerm,
            processTime);
  }

  if (pMsg->term == currentTerm &&
      (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    resetElect = true;

    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
      rpcMsgLocalCmd.info.traceId = pRpcMsg->info.traceId;

      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd =
          (ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT;
      pSyncMsg->commitIndex = pMsg->commitIndex;
      pSyncMsg->currentTerm = pMsg->term;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, failed to enqueue sync-local-cmd msg(cmd=commit) from heartbeat since %s",
                 ths->vgId, tstrerror(code));
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sGTrace(&pRpcMsg->info.traceId,
                  "vgId:%d, enqueue sync-local-cmd msg(cmd=commit) from heartbeat, commit-index:%" PRId64
                  ", term:%" PRId64,
                  ths->vgId, pMsg->commitIndex, pMsg->term);
        }
      } else {
        // No enqueue callback: nobody takes the payload, so release it instead of leaking it.
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  if (pMsg->term >= currentTerm &&
      (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    SRpcMsg rpcMsgLocalCmd = {0};
    TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
    rpcMsgLocalCmd.info.traceId = pRpcMsg->info.traceId;

    SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->currentTerm = pMsg->term;
    pSyncMsg->commitIndex = pMsg->commitIndex;

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue sync-local-cmd msg(cmd=step-down) error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue sync-local-cmd msg(cmd=step-down), new-term:%" PRId64, ths->vgId, pMsg->term);
      }
    } else {
      // No enqueue callback: nobody takes the payload, so release it instead of leaking it.
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    }
  }

  SRpcMsg rpcMsg = {0};
  TAOS_CHECK_RETURN(syncBuildHeartbeatReply(&rpcMsg, ths->vgId));
  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;
  rpcMsg.info.traceId = pRpcMsg->info.traceId;

  // reply
  int64_t tsMs3 = taosGetTimestampMs();

  int64_t processTime2 = tsMs3 - tsMs2;
  TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
  if (processTime2 > SYNC_HEARTBEAT_SLOW_MS) {
    sGError(&rpcMsg.info.traceId,
            "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
            ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
  } else {
    if (tsSyncLogHeartbeat) {
      sGInfo(&rpcMsg.info.traceId,
             "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
             ", processTime:%" PRId64,
             ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
    } else {
      sGDebug(&rpcMsg.info.traceId,
              "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
              ", processTime:%" PRId64,
              ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
    }
  }

  TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg));

  if (resetElect) syncNodeResetElectTimer(ths);
  return 0;
}
3869

3870
/**
 * Handle a heartbeat reply from a peer.
 *
 * Looks up the peer's log-replication manager, logs the reply, and records the
 * receive time and receive count in pMatchIndex. It then delegates to
 * syncLogReplProcessHeartbeatReply.
 *
 * @param ths     sync node receiving the reply
 * @param pRpcMsg rpc message whose pCont is a SyncHeartbeatReply
 * @return TSDB_CODE_SYN_RETURN_VALUE_NULL (or the pending terrno) when no
 *         replication manager exists for the sender, otherwise the result of
 *         syncLogReplProcessHeartbeatReply
 */
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  int32_t code = 0;

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // Zero-padded 16-digit hex address; the old "0x016%" form printed a literal "016".
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr);
    TAOS_RETURN(code);
  }

  int64_t tsMs = taosGetTimestampMs();
  int64_t lastRecvTime = syncIndexMgrGetRecvTime(ths->pMatchIndex, &pMsg->srcId);
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, &pRpcMsg->info.traceId, tsMs - lastRecvTime);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);
  syncIndexMgrIncRecvCount(ths->pMatchIndex, &(pMsg->srcId));

  return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
}
3891

3892
#ifdef BUILD_NO_CALL
/**
 * Legacy heartbeat-reply handler (compiled only with BUILD_NO_CALL).
 * Logs the reply and records when the peer last answered.
 *
 * @return always 0
 */
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;
  int64_t             nowMs = taosGetTimestampMs();
  int64_t             elapsedMs = nowMs - pReply->timeStamp;

  syncLogRecvHeartbeatReply(ths, pReply, elapsedMs, &pRpcMsg->info.traceId, elapsedMs);

  // Last-reply time feeds the decision of whether the peer is still alive.
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, nowMs);
  return 0;
}
#endif
3905

3906
/**
 * Execute a local command (SyncLocalCmd) on this sync node.
 *
 * - SYNC_LOCAL_CMD_STEP_DOWN: step down to the term carried in the command.
 * - SYNC_LOCAL_CMD_FOLLOWER_CMT / SYNC_LOCAL_CMD_LEARNER_CMT: when the command's
 *   term equals the log buffer's last match term, advance the commit index to
 *   the carried value. Then commit the log buffer up to ths->commitIndex,
 *   unless the FSM state is SYNC_FSM_STATE_INCOMPLETE.
 * - Any other command is logged as an error.
 *
 * @param ths     sync node
 * @param pRpcMsg rpc message whose pCont is a SyncLocalCmd
 * @return TSDB_CODE_SYN_INTERNAL_ERROR when the log buffer reports a negative
 *         last match term; 0 otherwise (commit failures are only logged)
 */
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pMsg = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pMsg, &pRpcMsg->info.traceId);

  if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    SRaftId id = EMPTY_RAFT_ID;
    syncNodeStepDown(ths, pMsg->currentTerm, id, "localCmd");

  } else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT || pMsg->cmd == SYNC_LOCAL_CMD_LEARNER_CMT) {
    if (syncLogBufferIsEmpty(ths->pLogBuf)) {
      sError("vgId:%d, sync log buffer is empty", ths->vgId);
      return 0;
    }
    SyncTerm matchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
    if (matchTerm < 0) {
      // Previously this internal error was returned without any log context.
      sError("vgId:%d, invalid last match term:%" PRId64 " in sync log buffer", ths->vgId, matchTerm);
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
    // Only trust the leader's commit index when our log matches up to its term.
    if (pMsg->currentTerm == matchTerm) {
      SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
      sTrace("vgId:%d, raft entry update commit, return index:%" PRId64, ths->vgId, returnIndex);
    }
    if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
        syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, &pRpcMsg->info.traceId, "heartbeat") < 0) {
      sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64, ths->vgId, terrstr(),
             ths->commitIndex);
    }
  } else {
    // Include vgId and the unexpected command so the log line is actionable.
    sError("vgId:%d, error local cmd:%d", ths->vgId, (int32_t)pMsg->cmd);
  }

  return 0;
}
3938

3939
// TLA+ Spec
3940
// ClientRequest(i, v) ==
3941
//     /\ state[i] = Leader
3942
//     /\ LET entry == [term  |-> currentTerm[i],
3943
//                      value |-> v]
3944
//            newLog == Append(log[i], entry)
3945
//        IN  log' = [log EXCEPT ![i] = newLog]
3946
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
3947
//                    leaderVars, commitIndex>>
3948
//
3949

3950
/**
 * Turn a client request into a raft entry and append it when this node leads.
 *
 * The entry is placed at the log buffer's end index with the current raft term.
 * TDMT_SYNC_CLIENT_REQUEST payloads are built via syncEntryBuildFromClientRequest;
 * all other messages via syncEntryBuildFromRpcMsg. Config-change messages get a
 * response stub registered in pSyncRespMgr, and its seqNum is stored on the entry.
 *
 * When leading, config-change entries are first validated by
 * syncNodeCheckChangeConfig:
 * - A negative result is returned as an error.
 * - A positive result answers the saved stub immediately and returns that
 *   result, without appending.
 *
 * @param ths       sync node
 * @param pMsg      incoming request
 * @param pRetIndex optional; receives the index assigned to the entry when leading
 * @return TSDB_CODE_SYN_INTERNAL_ERROR if the entry cannot be built or this node is
 *         not (assigned) leader; otherwise the config-check result or the result
 *         of syncNodeAppend
 */
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, process client request", ths->vgId, pMsg);
  int32_t code = 0;

  SyncIndex       index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = NULL;
  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index, &pMsg->info.traceId);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }

  if (pEntry == NULL) {
    sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process client request since %s", ths->vgId, pMsg,
            terrstr());
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // 1->2, config change is add in write thread, and will continue in sync thread
  // need save message for it
  if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) {
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
    uint64_t  seqNum = syncRespMgrAdd(ths->pSyncRespMgr, &stub);
    pEntry->seqNum = seqNum;
  }

  if (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (pRetIndex) {
    (*pRetIndex) = index;
  }

  if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
    // Reuse the function-level `code` (the old inner declaration shadowed it).
    code = syncNodeCheckChangeConfig(ths, pEntry);
    if (code < 0) {
      sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to check change config since %s", ths->vgId, pMsg,
              terrstr());
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      TAOS_RETURN(code);
    }

    if (code > 0) {
      // Nothing to append: answer the caller through the saved response stub.
      SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
      int32_t num = syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
      sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get response stub for config change, seqNum:%" PRIu64 " num:%d",
              ths->vgId, pMsg, pEntry->seqNum, num);
      if (rsp.info.handle != NULL) {
        tmsgSendRsp(&rsp);
      }
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      TAOS_RETURN(code);
    }
  }

  code = syncNodeAppend(ths, pEntry, pMsg);
  return code;
}
4014

4015
/**
 * Map a sync state to a short human-readable name for logging.
 *
 * @param state sync state value
 * @return static string; "unknown" for any value without a dedicated name
 */
const char* syncStr(ESyncState state) {
  const char* label = "unknown";

  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      label = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      label = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      label = "leader";
      break;
    case TAOS_SYNC_STATE_ERROR:
      label = "error";
      break;
    case TAOS_SYNC_STATE_OFFLINE:
      label = "offline";
      break;
    case TAOS_SYNC_STATE_LEARNER:
      label = "learner";
      break;
    case TAOS_SYNC_STATE_ASSIGNED_LEADER:
      label = "assigned leader";
      break;
    default:
      break;
  }

  return label;
}
4035

4036
/**
 * Locate this node inside a new sync config and record its position.
 *
 * Each entry of pNewCfg->nodeInfo is turned into a raft id (address + this
 * node's vgId) and compared with ths->myRaftId. The first match sets
 * pNewCfg->myIndex.
 *
 * @param ths     sync node
 * @param pNewCfg config to update in place
 * @return 0 when found, TSDB_CODE_SYN_INTERNAL_ERROR otherwise (myIndex untouched)
 */
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t pos = 0;
  while (pos < pNewCfg->totalReplicaNum) {
    SRaftId candidate = {.addr = SYNC_ADDR(&pNewCfg->nodeInfo[pos]), .vgId = ths->vgId};
    if (syncUtilSameId(&(ths->myRaftId), &candidate)) {
      pNewCfg->myIndex = pos;
      return 0;
    }
    ++pos;
  }

  return TSDB_CODE_SYN_INTERNAL_ERROR;
}
4051

4052
/**
 * Decide whether the single-replica fast path applies to a message.
 *
 * @return true when the node has exactly one replica, the message type is a
 *         user commit per syncUtilUserCommit, and the vgroup is not vgId 1
 */
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) {
    return false;
  }
  if (!syncUtilUserCommit(pMsg->msgType)) {
    return false;
  }
  return ths->vgId != 1;
}
4055

4056
/**
 * Check whether a raft id belongs to this node's replica set.
 *
 * @param ths     sync node
 * @param pRaftId id to look for among ths->replicasId[0 .. totalReplicaNum)
 * @return true if any replica id matches according to syncUtilSameId
 */
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool member = false;
  for (int32_t k = 0; !member && k < ths->totalReplicaNum; ++k) {
    member = syncUtilSameId(&ths->replicasId[k], pRaftId);
  }
  return member;
}
4064

4065
/**
 * Find the snapshot sender associated with a destination replica.
 *
 * @param ths     sync node
 * @param pDestId replica id to look up
 * @return ths->senders[i] for the highest index i whose replica id matches,
 *         or NULL if none match
 */
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  // Scanning backwards makes the first hit identical to the last hit of a forward scan.
  for (int32_t slot = ths->totalReplicaNum - 1; slot >= 0; --slot) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      return ths->senders[slot];
    }
  }
  return NULL;
}
4074

4075
/**
 * Find the per-peer heartbeat timer for a destination replica.
 *
 * @param ths     sync node
 * @param pDestId replica id to look up
 * @return &ths->peerHeartbeatTimerArr[i] for the highest index i whose replica
 *         id matches, or NULL if none match
 */
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  // Scanning backwards makes the first hit identical to the last hit of a forward scan.
  for (int32_t slot = ths->totalReplicaNum - 1; slot >= 0; --slot) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      return &ths->peerHeartbeatTimerArr[slot];
    }
  }
  return NULL;
}
4084

4085
/**
 * Find the peer-state record for a destination replica.
 *
 * @param ths     sync node
 * @param pDestId replica id to look up
 * @return &ths->peerStates[i] for the highest index i whose replica id matches,
 *         or NULL if none match
 */
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  // Scanning backwards makes the first hit identical to the last hit of a forward scan.
  for (int32_t slot = ths->totalReplicaNum - 1; slot >= 0; --slot) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      return &ths->peerStates[slot];
    }
  }
  return NULL;
}
4094

4095
#ifdef BUILD_NO_CALL
/**
 * Legacy throttle check (compiled only with BUILD_NO_CALL): should an
 * append-entries message be sent to pDestId?
 *
 * @return false if the peer state is missing, or if the same index was already
 *         sent less than SYNC_APPEND_ENTRIES_TIMEOUT_MS ago; true otherwise
 */
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextSendIndex = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();
  bool      sentRecently =
      (pPeer->lastSendIndex == nextSendIndex) && (nowMs - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);

  return !sentRecently;
}

/**
 * Legacy check (compiled only with BUILD_NO_CALL) for whether a membership
 * change may start.
 *
 * @return false when a change is already in progress, when the commit index is
 *         valid but lags the last log index, or when any peer's snapshot sender
 *         is running; true otherwise
 */
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  if (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN && pSyncNode->commitIndex != syncNodeGetLastIndex(pSyncNode)) {
    sError("sync cannot change2");
    return false;
  }

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncSnapshotSender* pSnapSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[peer]);
    if (pSnapSender != NULL && pSnapSender->start) {
      sError("sync cannot change3");
      return false;
    }
  }

  return true;
}
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc