• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4877

11 Dec 2025 02:43AM UTC coverage: 64.586% (-0.05%) from 64.632%
#4877

push

travis-ci

guanshengliang
feat(TS-7270): internal dependence

307 of 617 new or added lines in 24 files covered. (49.76%)

673 existing lines in 130 files now uncovered.

163673 of 253417 relevant lines covered (64.59%)

105540806.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.17
/source/libs/sync/src/syncMain.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "syncAppendEntries.h"
19
#include "syncAppendEntriesReply.h"
20
#include "syncCommit.h"
21
#include "syncElection.h"
22
#include "syncEnv.h"
23
#include "syncIndexMgr.h"
24
#include "syncInt.h"
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
27
#include "syncRaftCfg.h"
28
#include "syncRaftLog.h"
29
#include "syncRaftStore.h"
30
#include "syncReplication.h"
31
#include "syncRequestVote.h"
32
#include "syncRequestVoteReply.h"
33
#include "syncRespMgr.h"
34
#include "syncSnapshot.h"
35
#include "syncTimeout.h"
36
#include "syncUtil.h"
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
39
#include "tmisce.h"
40
#include "tref.h"
41

42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
43
static void    syncNodeEqElectTimer(void* param, void* tmrId);
44
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
45
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
48
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
49
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
50
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
51
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
52
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
53
static int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
54
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
55

56
static bool    syncNodeCanChange(SSyncNode* pSyncNode);
57
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
58
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
59
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
60

61
static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
62

63
/*
 * Open a sync node described by pSyncInfo and register it so that it can be
 * looked up by reference id.
 *
 * The ping / election / heartbeat baselines and the message callbacks of the
 * new node are copied from pSyncInfo after registration succeeds.
 *
 * Returns the reference id of the node, or -1 if the node could not be opened
 * or registered (the node is closed again in the latter case).
 */
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
  sInfo("vgId:%d, start to open sync", pSyncInfo->vgId);

  SSyncNode* pNode = syncNodeOpen(pSyncInfo, vnodeVersion, pSyncInfo->electMs, pSyncInfo->heartbeatMs);
  if (!pNode) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    // registration failed: nobody else can reach the node, so tear it down here
    syncNodeClose(pNode);
    return -1;
  }

  // ping timer
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;

  // election timer
  pNode->electBaseLine = pSyncInfo->electMs;

  // heartbeat timer
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;

  pNode->msgcb = pSyncInfo->msgcb;

  sInfo("vgId:%d, sync opened, electBaseLine:%d, hbBaseLine:%d", pSyncInfo->vgId, pNode->electBaseLine,
        pNode->hbBaseLine);
  return pNode->rid;
}
88

89
int32_t syncStart(int64_t rid) {
4,731,224✔
90
  int32_t    code = 0;
4,731,224✔
91
  int32_t    vgId = 0;
4,731,224✔
92
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,731,224✔
93
  if (pSyncNode == NULL) {
4,731,224✔
94
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
95
    if (terrno != 0) code = terrno;
×
96
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
97
    TAOS_RETURN(code);
×
98
  }
99
  vgId = pSyncNode->vgId;
4,731,224✔
100
  sInfo("vgId:%d, begin to start sync", pSyncNode->vgId);
4,731,224✔
101

102
  if ((code = syncNodeRestore(pSyncNode)) < 0) {
4,731,224✔
103
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, tstrerror(code));
×
104
    goto _err;
×
105
  }
106
  sInfo("vgId:%d, sync node restore is executed", pSyncNode->vgId);
4,731,224✔
107

108
  if ((code = syncNodeStart(pSyncNode)) < 0) {
4,731,224✔
109
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, tstrerror(code));
×
110
    goto _err;
×
111
  }
112
  sInfo("vgId:%d, sync node start is executed", pSyncNode->vgId);
4,731,224✔
113

114
  syncNodeRelease(pSyncNode);
4,731,224✔
115

116
  sInfo("vgId:%d, sync started", vgId);
4,731,224✔
117

118
  TAOS_RETURN(code);
4,731,224✔
119

120
_err:
×
121
  syncNodeRelease(pSyncNode);
×
122
  TAOS_RETURN(code);
×
123
}
124

125
/*
 * Copy the current raft configuration of the sync node identified by rid
 * into *cfg.
 *
 * Returns 0 on success. If rid cannot be acquired, returns terrno when it is
 * set, otherwise TSDB_CODE_SYN_RETURN_VALUE_NULL; *cfg is left untouched.
 */
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg) {
  SSyncNode* pNode = syncNodeAcquire(rid);

  if (pNode != NULL) {
    *cfg = pNode->raftCfg.cfg;
    syncNodeRelease(pNode);
    return 0;
  }

  int32_t code = (terrno != 0) ? terrno : TSDB_CODE_SYN_RETURN_VALUE_NULL;
  sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
  TAOS_RETURN(code);
}
142

143
void syncStop(int64_t rid) {
4,731,224✔
144
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,731,224✔
145
  if (pSyncNode != NULL) {
4,731,224✔
146
    pSyncNode->isStart = false;
4,731,224✔
147
    syncNodeRelease(pSyncNode);
4,731,224✔
148
    syncNodeRemove(rid);
4,731,224✔
149
  }
150
}
4,731,224✔
151

152
void syncPreStop(int64_t rid) {
4,730,818✔
153
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,730,818✔
154
  if (pSyncNode != NULL) {
4,731,224✔
155
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
4,731,224✔
156
      sInfo("vgId:%d, stop snapshot receiver", pSyncNode->vgId);
309✔
157
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
309✔
158
    }
159
    syncNodePreClose(pSyncNode);
4,731,224✔
160
    syncNodeRelease(pSyncNode);
4,728,906✔
161
  }
162
}
4,729,849✔
163

164
void syncPostStop(int64_t rid) {
4,226,918✔
165
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,226,918✔
166
  if (pSyncNode != NULL) {
4,226,918✔
167
    syncNodePostClose(pSyncNode);
4,226,918✔
168
    syncNodeRelease(pSyncNode);
4,226,918✔
169
  }
170
}
4,226,810✔
171

172
/*
 * Validate a proposed configuration: this node must be part of it and the
 * replica count may differ from the current one by at most one.
 */
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (!syncNodeInConfig(pSyncNode, pCfg)) {
    return false;
  }

  int32_t replicaDelta = pCfg->replicaNum - pSyncNode->replicaNum;
  return replicaDelta >= -1 && replicaDelta <= 1;
}
176

177
/*
 * Apply a new raft configuration to the sync node identified by rid.
 *
 * Steps:
 *   1. Skip (return 0) if the node already holds a configuration whose index
 *      is >= pNewCfg->lastIndex.
 *   2. Reject the configuration with TSDB_CODE_SYN_NEW_CONFIG_ERROR if
 *      syncNodeCheckNewConfig() refuses it.
 *   3. Update the new-config index and perform the configuration change.
 *   4. If this node is (assigned) leader, stop the heartbeat timer,
 *      re-initialise every per-peer heartbeat timer and start it again.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 * If rid cannot be acquired, returns terrno when it is set, otherwise
 * TSDB_CODE_SYN_RETURN_VALUE_NULL.
 *
 * The node reference acquired at the top is released on every exit path, and
 * the node is only dereferenced while that reference is still held.
 */
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  int32_t    code = 0;
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
    // log before releasing: the node must not be touched once the reference is dropped
    sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
          pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
    syncNodeRelease(pSyncNode);
    return 0;
  }

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    goto _exit;
  }

  code = syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  if (code != TSDB_CODE_SUCCESS) goto _exit;

  if (syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex) != 0) {
    code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since do change error", pSyncNode->vgId);
    goto _exit;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = syncNodeStopHeartbeatTimer(pSyncNode);
    if (code != TSDB_CODE_SUCCESS) goto _exit;

    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      code = syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
      if (code != TSDB_CODE_SUCCESS) goto _exit;
    }

    code = syncNodeStartHeartbeatTimer(pSyncNode);
    if (code != TSDB_CODE_SUCCESS) goto _exit;
    // syncNodeReplicate(pSyncNode);
  }

_exit:
  // single release point so no error path can leak the acquired reference
  syncNodeRelease(pSyncNode);
  TAOS_RETURN(code);
}
223

224
/*
 * Dispatch one sync message to the handler matching pMsg->msgType on the
 * sync node identified by rid.
 *
 * Returns the handler's result, or TSDB_CODE_MSG_NOT_PROCESSED for an unknown
 * message type. If the sync module is not initialised or rid cannot be
 * acquired, returns terrno when it is set, otherwise
 * TSDB_CODE_SYN_RETURN_VALUE_NULL.
 */
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  // remember the vgId now: the node must not be dereferenced after the release below
  int32_t vgId = pSyncNode->vgId;

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
    case TDMT_SYNC_TIMEOUT_ELECTION:
      // both timeout flavours share one handler
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_FORCE_FOLLOWER:
      code = syncForceBecomeFollower(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SET_ASSIGNED_LEADER:
      code = syncBecomeAssignedLeader(pSyncNode, pMsg);
      break;
    default:
      code = TSDB_CODE_MSG_NOT_PROCESSED;
      break;
  }

  syncNodeRelease(pSyncNode);
  if (code != 0) {
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since %s", vgId, pMsg, TMSG_INFO(pMsg->msgType),
           tstrerror(code));
  }
  TAOS_RETURN(code);
}
293

294
int32_t syncLeaderTransfer(int64_t rid) {
4,731,224✔
295
  int32_t    code = 0;
4,731,224✔
296
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
4,731,224✔
297
  if (pSyncNode == NULL) {
4,731,224✔
298
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
299
    if (terrno != 0) code = terrno;
×
300
    TAOS_RETURN(code);
×
301
  }
302

303
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
4,731,224✔
304
  syncNodeRelease(pSyncNode);
4,731,224✔
305
  return ret;
4,731,224✔
306
}
307

308
/*
 * Force this node into the follower role (with an all-zero leader id) and
 * send a success response built from the request's rpc info.
 *
 * Always returns 0.
 */
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SRaftId noLeader = {0};
  syncNodeBecomeFollower(ths, noLeader, "force election");

  SRpcMsg reply = {0};
  reply.code = 0;
  reply.pCont = pRpcMsg->info.rsp;
  reply.contLen = pRpcMsg->info.rspLen;
  reply.info = pRpcMsg->info;
  tmsgSendRsp(&reply);

  return 0;
}
322

323
/*
 * Handle a "set assigned leader" request.
 *
 * The request is rejected with TSDB_CODE_MND_ARB_TOKEN_MISMATCH when its arb
 * term is lower than the local one, or (unless req.force is set) when its
 * member token differs from the local arb token. Otherwise, if the node is
 * not already assigned leader, the term is advanced, the node becomes
 * assigned leader and a noop entry is appended (a failure to append is only
 * logged).
 *
 * A response is always sent back through tmsgSendRsp(); it carries the
 * serialized SVArbSetAssignedLeaderRsp on success and an empty payload on
 * any failure.
 *
 * Returns the same code that is placed in the response.
 */
int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
  int32_t code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  void*   pHead = NULL;
  int32_t contLen = 0;

  SVArbSetAssignedLeaderReq req = {0};
  // NOTE(review): the buffer is advanced past SMsgHead but the full contLen is passed as its length;
  // confirm whether the deserializer expects contLen - sizeof(SMsgHead) here.
  if (tDeserializeSVArbSetAssignedLeaderReq((char*)pRpcMsg->pCont + sizeof(SMsgHead), pRpcMsg->contLen, &req) != 0) {
    sError("vgId:%d, failed to deserialize SVArbSetAssignedLeaderReq", ths->vgId);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  if (ths->arbTerm > req.arbTerm) {
    sInfo("vgId:%d, skip to set assigned leader, msg with lower term, local:%" PRId64 ", msg:%" PRId64, ths->vgId,
          ths->arbTerm, req.arbTerm);
    goto _OVER;
  }

  ths->arbTerm = TMAX(req.arbTerm, ths->arbTerm);

  if (!req.force) {
    if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) {
      sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken,
            req.memberToken);
      goto _OVER;
    }
  }

  if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SUCCESS;
    raftStoreNextTerm(ths);
    if (terrno != TSDB_CODE_SUCCESS) {
      code = terrno;
      sError("vgId:%d, failed to set next term since:%s", ths->vgId, tstrerror(code));
      goto _OVER;
    }
    syncNodeBecomeAssignedLeader(ths);

    // a failed noop append is logged only; the response below still reports success
    if ((code = syncNodeAppendNoop(ths)) < 0) {
      sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, tstrerror(code));
    }
  }

  SVArbSetAssignedLeaderRsp rsp = {0};
  rsp.arbToken = req.arbToken;
  rsp.memberToken = req.memberToken;
  rsp.vgId = ths->vgId;

  // first pass with a NULL buffer only computes the encoded size
  contLen = tSerializeSVArbSetAssignedLeaderRsp(NULL, 0, &rsp);
  if (contLen <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    contLen = 0;  // no payload is attached to the error response
    goto _OVER;
  }
  pHead = rpcMallocCont(contLen);
  if (!pHead) {
    code = terrno;
    sError("vgId:%d, failed to malloc memory for SVArbSetAssignedLeaderRsp", ths->vgId);
    contLen = 0;  // keep length consistent with the NULL payload
    goto _OVER;
  }
  if (tSerializeSVArbSetAssignedLeaderRsp(pHead, contLen, &rsp) <= 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId);
    rpcFreeCont(pHead);
    // the buffer is gone: make sure the response below does not carry a dangling pointer
    pHead = NULL;
    contLen = 0;
    goto _OVER;
  }

  code = TSDB_CODE_SUCCESS;

_OVER:;
  SRpcMsg rspMsg = {
      .code = code,
      .pCont = pHead,
      .contLen = contLen,
      .info = pRpcMsg->info,
  };

  tmsgSendRsp(&rspMsg);

  tFreeSVArbSetAssignedLeaderReq(&req);
  TAOS_RETURN(code);
}
405

406
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
×
407
  int32_t    code = 0;
×
408
  SSyncNode* pNode = syncNodeAcquire(rid);
×
409
  if (pNode == NULL) {
×
410
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
411
    if (terrno != 0) code = terrno;
×
412
    TAOS_RETURN(code);
×
413
  }
414

415
  SRpcMsg rpcMsg = {0, .info.notFreeAhandle = 1};
×
416
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
×
417
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;
×
418

419
  syncNodeRelease(pNode);
×
420
  if (ret == 1) {
×
421
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
×
422
    code = rpcSendResponse(&rpcMsg);
×
423
    return code;
×
424
  } else {
425
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
×
426
    return TSDB_CODE_SYN_INTERNAL_ERROR;
×
427
  }
428
}
429

430
/*
 * Scan the match index of every peer and return the smallest one.
 *
 * While the running minimum is still SYNC_INDEX_INVALID it takes the peer's
 * value unconditionally; afterwards only strictly positive, smaller values
 * replace it. Returns SYNC_INDEX_INVALID when there are no peers.
 */
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex peerIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));

    bool unset = (lowest == SYNC_INDEX_INVALID);
    if (unset || (peerIndex > 0 && peerIndex < lowest)) {
      lowest = peerIndex;
    }
  }

  return lowest;
}
443

444
/* Forward to the log store's syncLogIndexRetention callback with the given byte budget. */
static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) {
  SSyncLogStore* pStore = pSyncNode->pLogStore;
  return pStore->syncLogIndexRetention(pStore, bytes);
}
447

448
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
6,298,618✔
449
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
6,298,618✔
450
  int32_t    code = 0;
6,298,171✔
451
  if (pSyncNode == NULL) {
6,298,171✔
452
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
7,487✔
453
    if (terrno != 0) code = terrno;
7,487✔
454
    sError("sync begin snapshot error");
7,487✔
455
    TAOS_RETURN(code);
7,487✔
456
  }
457

458
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
6,290,684✔
459
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
6,290,593✔
460
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
6,290,523✔
461

462
  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
6,290,684✔
463
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
20,068✔
464
    syncNodeRelease(pSyncNode);
20,068✔
465
    return 0;
20,068✔
466
  }
467

468
  int64_t logRetention = 0;
6,270,616✔
469

470
  if (syncNodeIsMnode(pSyncNode)) {
6,270,616✔
471
    // mnode
472
    logRetention = tsMndLogRetention;
929,595✔
473
  } else {
474
    // vnode
475
    if (pSyncNode->replicaNum > 1) {
5,340,800✔
476
      logRetention = SYNC_VNODE_LOG_RETENTION;
543,337✔
477
    }
478
  }
479

480
  if (pSyncNode->totalReplicaNum > 1) {
6,270,395✔
481
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER &&
686,798✔
482
        pSyncNode->state != TAOS_SYNC_STATE_LEARNER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
71,201✔
483
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
1,246✔
484
              lastApplyIndex);
485
      syncNodeRelease(pSyncNode);
1,246✔
486
      return 0;
1,246✔
487
    }
488
    SyncIndex retentionIndex =
685,552✔
489
        TMAX(pSyncNode->minMatchIndex, syncLogRetentionIndex(pSyncNode, SYNC_WAL_LOG_RETENTION_SIZE));
685,552✔
490
    logRetention += TMAX(0, lastApplyIndex - retentionIndex);
685,552✔
491
  }
492

493
_DEL_WAL:
5,584,265✔
494

495
  do {
496
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
6,269,817✔
497
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
6,269,817✔
498
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
6,268,307✔
499
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
6,269,107✔
500
    if (lastApplyIndex <= walCommitVer) {
6,267,452✔
501
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);
6,267,452✔
502

503
      if (snapshottingIndex == SYNC_INDEX_INVALID) {
6,269,177✔
504
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
6,269,177✔
505
        pSyncNode->snapshottingTime = taosGetTimestampMs();
6,268,957✔
506

507
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
6,268,957✔
508
        if (code == 0) {
6,269,817✔
509
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
6,269,064✔
510
                  pSyncNode->snapshottingIndex, lastApplyIndex);
511
        } else {
512
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
753✔
513
                  terrstr(), pSyncNode->snapshottingIndex, lastApplyIndex);
514
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
753✔
515
        }
516

517
      } else {
518
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
×
519
                snapshottingIndex, lastApplyIndex);
520
      }
521
    }
522
  } while (0);
523

524
  syncNodeRelease(pSyncNode);
6,269,817✔
525
  TAOS_RETURN(code);
6,269,817✔
526
}
527

528
int32_t syncEndSnapshot(int64_t rid, bool forceTrim) {
6,290,378✔
529
  int32_t    code = 0;
6,290,378✔
530
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
6,290,378✔
531
  if (pSyncNode == NULL) {
6,290,271✔
532
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
533
    if (terrno != 0) code = terrno;
×
534
    sError("sync end snapshot error");
×
535
    TAOS_RETURN(code);
×
536
  }
537

538
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
6,290,271✔
539
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
6,269,064✔
540
    code = walEndSnapshot(pData->pWal, forceTrim);
6,269,064✔
541
    if (code != 0) {
6,269,064✔
542
      sNError(pSyncNode, "wal snapshot end error since:%s", tstrerror(code));
×
543
      syncNodeRelease(pSyncNode);
×
544
      TAOS_RETURN(code);
×
545
    } else {
546
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
6,269,064✔
547
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
6,269,064✔
548
    }
549
  }
550

551
  syncNodeRelease(pSyncNode);
6,290,378✔
552
  TAOS_RETURN(code);
6,290,378✔
553
}
554

555
/*
 * Tell whether pSyncNode may serve reads: it must be leader or assigned
 * leader and must have finished restoring.
 *
 * When false is returned terrno explains why:
 *   TSDB_CODE_SYN_INTERNAL_ERROR - pSyncNode is NULL
 *   TSDB_CODE_SYN_NOT_LEADER     - node is neither leader nor assigned leader
 *   TSDB_CODE_SYN_RESTORING      - restore has not finished
 */
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  if (!(pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pSyncNode->restoreFinish) {
    return true;
  }

  terrno = TSDB_CODE_SYN_RESTORING;
  return false;
}
574

575
bool syncIsReadyForRead(int64_t rid) {
394,132,603✔
576
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
394,132,603✔
577
  if (pSyncNode == NULL) {
394,137,527✔
578
    sError("sync ready for read error");
×
579
    return false;
×
580
  }
581

582
  bool ready = syncNodeIsReadyForRead(pSyncNode);
394,137,527✔
583

584
  syncNodeRelease(pSyncNode);
394,158,273✔
585
  return ready;
394,123,280✔
586
}
587

588
#ifdef BUILD_NO_CALL
589
bool syncSnapshotSending(int64_t rid) {
590
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
591
  if (pSyncNode == NULL) {
592
    return false;
593
  }
594

595
  bool b = syncNodeSnapshotSending(pSyncNode);
596
  syncNodeRelease(pSyncNode);
597
  return b;
598
}
599

600
bool syncSnapshotRecving(int64_t rid) {
601
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
602
  if (pSyncNode == NULL) {
603
    return false;
604
  }
605

606
  bool b = syncNodeSnapshotRecving(pSyncNode);
607
  syncNodeRelease(pSyncNode);
608
  return b;
609
}
610
#endif
611

612
/*
 * Pick a peer and propose a leader transfer to it.
 *
 * Nothing happens (return 0) when there are no peers, or when this node is
 * not in TAOS_SYNC_STATE_LEADER with more than one replica. The first peer is
 * the default target; with exactly two peers the second one is chosen when
 * its match index is strictly greater.
 *
 * Returns the result of syncNodeLeaderTransferTo() when a transfer is
 * proposed.
 */
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return 0;
  }

  if (!(pSyncNode->state == TAOS_SYNC_STATE_LEADER && pSyncNode->replicaNum > 1)) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex firstMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
    SyncIndex secondMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[1]));
    if (secondMatch > firstMatch) {
      target = 1;
    }
  }

  SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[target];
  return syncNodeLeaderTransferTo(pSyncNode, newLeader);
}
633

634
/*
 * Build a leader-transfer message naming newLeader and propose it through
 * syncNodePropose(). The message buffer is freed here after proposing.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR when the node has a single replica,
 * the error of syncBuildLeaderTransfer() if the message cannot be built,
 * otherwise the result of syncNodePropose().
 */
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg transferMsg = {0};
  TAOS_CHECK_RETURN(syncBuildLeaderTransfer(&transferMsg, pSyncNode->vgId));

  SyncLeaderTransfer* pTransfer = transferMsg.pCont;
  pTransfer->newNodeInfo = newLeader;
  pTransfer->newLeaderId.vgId = pSyncNode->vgId;
  pTransfer->newLeaderId.addr = SYNC_ADDR(&newLeader);

  int32_t result = syncNodePropose(pSyncNode, &transferMsg, false, NULL);
  rpcFreeCont(transferMsg.pCont);
  return result;
}
654

655
int32_t syncResetTimer(int64_t rid, int32_t electInterval, int32_t heartbeatInterval) {
×
656
  int32_t code = 0;
×
657
  sInfo("sync Reset Timer, rid:%" PRId64, rid);
×
658
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
659
  if (pSyncNode == NULL) {
×
660
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
661
    if (terrno != 0) code = terrno;
×
662
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
663
    TAOS_RETURN(code);
×
664
  }
665
  pSyncNode->electBaseLine = electInterval;
×
666
  syncNodeResetElectTimer(pSyncNode);
×
667

668
  sInfo("vgId:%d, sync Reset Timer, rid:%" PRId64, pSyncNode->vgId, rid);
×
669
  code = syncNodeRestartHeartbeatTimer(pSyncNode, heartbeatInterval);
×
670

671
  syncNodeRelease(pSyncNode);
×
672
  return code;
×
673
}
674

675
SSyncState syncGetState(int64_t rid) {
783,144,601✔
676
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
783,144,601✔
677

678
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
783,144,601✔
679
  if (pSyncNode != NULL) {
783,158,195✔
680
    state.state = pSyncNode->state;
783,160,113✔
681
    state.roleTimeMs = pSyncNode->roleTimeMs;
783,160,113✔
682
    state.startTimeMs = pSyncNode->startTime;
783,151,537✔
683
    state.restored = pSyncNode->restoreFinish;
783,150,122✔
684
    if (pSyncNode->vgId != 1) {
783,149,138✔
685
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
120,997,060✔
686
    } else {
687
      state.canRead = state.restored;
662,148,275✔
688
    }
689
    /*
690
    double progress = 0;
691
    if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){
692
      progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex;
693
      state.progress = (int32_t)(progress * 100);
694
    }
695
    else{
696
      state.progress = -1;
697
    }
698
    sDebug("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 ", "
699
            "progress:%lf, progress:%d",
700
          pSyncNode->vgId,
701
         pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
702
    */
703
    state.term = raftStoreGetTerm(pSyncNode);
783,143,962✔
704
    syncNodeRelease(pSyncNode);
783,157,902✔
705
  }
706

707
  return state;
783,156,024✔
708
}
709

710
SSyncMetrics syncGetMetrics(int64_t rid) {
×
711
  SSyncMetrics metrics = {0};
×
712

713
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
714
  if (pSyncNode != NULL) {
×
715
    sDebug("vgId:%d, sync get metrics, wal_write_bytes:%" PRId64 ", wal_write_time:%" PRId64, pSyncNode->vgId,
×
716
           pSyncNode->wal_write_bytes, pSyncNode->wal_write_time);
717
    metrics.wal_write_bytes = atomic_load_64(&pSyncNode->wal_write_bytes);
×
718
    metrics.wal_write_time = atomic_load_64(&pSyncNode->wal_write_time);
×
719
    syncNodeRelease(pSyncNode);
×
720
  }
721
  return metrics;
×
722
}
723

724
void syncResetMetrics(int64_t rid, const SSyncMetrics* pOldMetrics) {
×
725
  if (pOldMetrics == NULL) return;
×
726

727
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
728
  if (pSyncNode != NULL) {
×
729
    // Atomically subtract the old metrics values from current metrics
730
    (void)atomic_sub_fetch_64(&pSyncNode->wal_write_bytes, pOldMetrics->wal_write_bytes);
×
731
    (void)atomic_sub_fetch_64(&pSyncNode->wal_write_time, pOldMetrics->wal_write_time);
×
732
    syncNodeRelease(pSyncNode);
×
733
  }
734
}
735

736
void syncGetCommitIndex(int64_t rid, int64_t* syncCommitIndex) {
116,730,637✔
737
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
116,730,637✔
738
  if (pSyncNode != NULL) {
116,730,637✔
739
    *syncCommitIndex = pSyncNode->commitIndex;
116,730,637✔
740
    syncNodeRelease(pSyncNode);
116,730,637✔
741
  }
742
}
116,730,637✔
743

744
int32_t syncGetArbToken(int64_t rid, char* outToken) {
20,058,630✔
745
  int32_t    code = 0;
20,058,630✔
746
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
20,058,630✔
747
  if (pSyncNode == NULL) {
20,058,630✔
748
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
749
    if (terrno != 0) code = terrno;
×
750
    TAOS_RETURN(code);
×
751
  }
752

753
  memset(outToken, 0, TSDB_ARB_TOKEN_SIZE);
20,058,630✔
754
  (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
20,058,630✔
755
  tstrncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE);
20,058,630✔
756
  (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
20,058,630✔
757

758
  syncNodeRelease(pSyncNode);
20,058,630✔
759
  TAOS_RETURN(code);
20,058,630✔
760
}
761

762
int32_t syncCheckSynced(int64_t rid) {
1,768✔
763
  int32_t    code = 0;
1,768✔
764
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
1,768✔
765
  if (pSyncNode == NULL) {
1,768✔
766
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
767
    if (terrno != 0) code = terrno;
×
768
    TAOS_RETURN(code);
×
769
  }
770

771
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
1,768✔
772
    code = TSDB_CODE_SYN_NOT_LEADER;
×
773
    syncNodeRelease(pSyncNode);
×
774
    TAOS_RETURN(code);
×
775
  }
776

777
  bool isSync = pSyncNode->commitIndex >= pSyncNode->assignedCommitIndex;
1,768✔
778
  code = (isSync ? TSDB_CODE_SUCCESS : TSDB_CODE_VND_ARB_NOT_SYNCED);
1,768✔
779
  if (!isSync) {
1,768✔
780
    sInfo("vgId:%d, not synced, assignedCommitIndex:%" PRId64 ", commitIndex:%" PRId64, pSyncNode->vgId,
×
781
          pSyncNode->assignedCommitIndex, pSyncNode->commitIndex);
782
  }
783

784
  syncNodeRelease(pSyncNode);
1,768✔
785
  TAOS_RETURN(code);
1,768✔
786
}
787

788
// Raise the arbitration term stored on the sync node referenced by rid to
// arbTerm; the stored term never decreases.
//
// Returns 0 on success, or terrno (falling back to
// TSDB_CODE_SYN_RETURN_VALUE_NULL) when the node cannot be acquired.
int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    int32_t errCode = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) {
      errCode = terrno;
    }
    TAOS_RETURN(errCode);
  }

  if (arbTerm > pNode->arbTerm) {
    pNode->arbTerm = arbTerm;
  }
  syncNodeRelease(pNode);

  int32_t code = 0;
  TAOS_RETURN(code);
}
801

802
// Pick the config index to associate with a snapshot taken at
// snapshotLastApplyIndex.
//
// Starts from configIndexArr[0] and moves to any later entry that is larger
// than the current pick while still <= snapshotLastApplyIndex. Returns -2 and
// sets terrno to TSDB_CODE_SYN_INTERNAL_ERROR when no config index is recorded.
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  if (pSyncNode->raftCfg.configIndexCount < 1) {
    sError("vgId:%d, failed get snapshot config index, configIndexCount:%d", pSyncNode->vgId,
           pSyncNode->raftCfg.configIndexCount);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -2;
  }

  const SyncIndex* cfgIndexes = pSyncNode->raftCfg.configIndexArr;
  SyncIndex        picked = cfgIndexes[0];

  // entry 0 is the starting value and can never be greater than itself, so the
  // scan begins at entry 1
  for (int32_t k = 1; k < pSyncNode->raftCfg.configIndexCount; ++k) {
    SyncIndex candidate = cfgIndexes[k];
    if (candidate <= picked) continue;
    if (candidate > snapshotLastApplyIndex) continue;
    picked = candidate;
  }

  sTrace("vgId:%d, index:%" PRId64 ", get last snapshot config index:%" PRId64, pSyncNode->vgId, snapshotLastApplyIndex,
         picked);

  return picked;
}
822

823
// Look up the raft id of the peer whose fqdn and port both equal those of pEp.
// Returns EMPTY_RAFT_ID when no peer matches (only peers are searched, so the
// local node's own endpoint is not found here).
static SRaftId syncGetRaftIdByEp(SSyncNode* pSyncNode, const SEp* pEp) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    const SNodeInfo* pPeerInfo = &pSyncNode->peersNodeInfo[peer];

    if (strcmp(pEp->fqdn, pPeerInfo->nodeFqdn) != 0) {
      continue;
    }
    if (pEp->port != pPeerInfo->nodePort) {
      continue;
    }
    return pSyncNode->peersId[peer];
  }

  return EMPTY_RAFT_ID;
}
832

833
static void epsetToString(const SEpSet* pEpSet, char* buffer, size_t bufferSize) {
41,765,323✔
834
  if (pEpSet == NULL || buffer == NULL) {
41,765,323✔
835
    snprintf(buffer, bufferSize, "EpSet is NULL");
×
836
    return;
×
837
  }
838

839
  size_t offset = 0;
41,768,572✔
840
  offset += snprintf(buffer + offset, bufferSize - offset, "EpSet: [");
41,768,572✔
841

842
  for (int i = 0; i < pEpSet->numOfEps; ++i) {
118,097,043✔
843
    if (offset >= bufferSize) break;
76,324,787✔
844
    offset += snprintf(buffer + offset, bufferSize - offset, "%s:%d%s", pEpSet->eps[i].fqdn, pEpSet->eps[i].port,
76,326,262✔
845
                       (i + 1 < pEpSet->numOfEps) ? ", " : "");
76,324,787✔
846
  }
847

848
  if (offset < bufferSize) {
41,779,986✔
849
    snprintf(buffer + offset, bufferSize - offset, "]");
41,774,701✔
850
  }
851
}
852

853
// Fill pEpSet with the endpoints of all non-learner replicas of the sync node
// referenced by rid and choose which entry a client should try first.
//
// pEpSet->numOfEps is reset to 0 up front and stays 0 when the node cannot be
// acquired. The entry whose fqdn and port equal the cached leader endpoint is
// preferred; otherwise the choice is derived from the local node's index.
// The set is passed through epsetSort() before returning.
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    return;
  }

  sDebug("vgId:%d, sync get retry epset, leaderCache:%" PRIx64 ", leaderCacheEp.fqdn:%s, leaderCacheEp.port:%d",
         pNode->vgId, pNode->leaderCache.addr, pNode->leaderCacheEp.fqdn, pNode->leaderCacheEp.port);

  int leaderSlot = -1;  // position in pEpSet of the cached leader, -1 if not found
  int slot = 0;         // next free position in pEpSet->eps
  for (int32_t i = 0; i < pNode->raftCfg.cfg.totalReplicaNum; ++i) {
    SNodeInfo* pInfo = &pNode->raftCfg.cfg.nodeInfo[i];
    if (pInfo->nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      continue;  // learners are not offered as retry targets
    }

    SEp* pEp = &pEpSet->eps[slot];
    tstrncpy(pEp->fqdn, pInfo->nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = pInfo->nodePort;
    pEpSet->numOfEps++;

    // the raft id is only used for logging; the leader match is by endpoint
    SRaftId id = syncGetRaftIdByEp(pNode, pEp);
    sDebug("vgId:%d, sync get retry epset, index:%d id:%" PRIx64 " %s:%d", pNode->vgId, i, id.addr, pEp->fqdn,
           pEp->port);

    bool samePort = (pEp->port == pNode->leaderCacheEp.port);
    if (samePort && strncmp(pEp->fqdn, pNode->leaderCacheEp.fqdn, TSDB_FQDN_LEN) == 0) {
      leaderSlot = slot;
    }
    slot++;
  }

  if (pEpSet->numOfEps > 0) {
    if (leaderSlot != -1) {
      pEpSet->inUse = leaderSlot;
    } else if (pNode->myRaftId.addr == pNode->leaderCache.addr && pNode->myRaftId.vgId == pNode->leaderCache.vgId) {
      // NOTE(review): myIndex indexes the full replica list (learners included)
      // while pEpSet holds only non-learners -- confirm it cannot exceed numOfEps
      pEpSet->inUse = pNode->raftCfg.cfg.myIndex;
    } else {
      pEpSet->inUse = (pNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps;
    }
  }
  epsetSort(pEpSet);

  char epsetText[1024];
  epsetToString(pEpSet, epsetText, sizeof(epsetText));
  sDebug("vgId:%d, sync get retry epset numOfEps:%d %s inUse:%d", pNode->vgId, pEpSet->numOfEps, epsetText,
         pEpSet->inUse);

  syncNodeRelease(pNode);
}
901

902
// Public entry point for proposing a message: resolves rid to a sync node and
// forwards to syncNodePropose().
//
// Returns whatever syncNodePropose() returns, or terrno (falling back to
// TSDB_CODE_SYN_RETURN_VALUE_NULL) when the node cannot be acquired.
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    int32_t errCode = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) {
      errCode = terrno;
    }
    sError("sync propose error");
    TAOS_RETURN(errCode);
  }

  int32_t result = syncNodePropose(pNode, pMsg, isWeak, seq);
  syncNodeRelease(pNode);
  return result;
}
916

917
int32_t syncCheckMember(int64_t rid) {
×
918
  int32_t    code = 0;
×
919
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
×
920
  if (pSyncNode == NULL) {
×
921
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
922
    if (terrno != 0) code = terrno;
×
923
    sError("sync propose error");
×
924
    TAOS_RETURN(code);
×
925
  }
926

927
  if (pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
×
928
    syncNodeRelease(pSyncNode);
×
929
    return TSDB_CODE_SYN_WRONG_ROLE;
×
930
  }
931

932
  syncNodeRelease(pSyncNode);
×
933
  return 0;
×
934
}
935

936
int32_t syncIsCatchUp(int64_t rid) {
2,254,312✔
937
  int32_t    code = 0;
2,254,312✔
938
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
2,254,312✔
939
  if (pSyncNode == NULL) {
2,254,312✔
940
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
941
    if (terrno != 0) code = terrno;
×
942
    sError("sync Node Acquire error since %d", ERRNO);
×
943
    TAOS_RETURN(code);
×
944
  }
945

946
  int32_t isCatchUp = 0;
2,254,312✔
947
  if (pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
2,254,312✔
948
      pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
464,403✔
949
      pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP) {
463,554✔
950
    sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
2,129,865✔
951
          pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
952
          pSyncNode->pLogBuf->matchIndex);
953
    isCatchUp = 0;
2,129,865✔
954
  } else {
955
    sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, pSyncNode->vgId,
124,447✔
956
          pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->matchIndex);
957
    isCatchUp = 1;
124,447✔
958
  }
959

960
  syncNodeRelease(pSyncNode);
2,254,312✔
961
  return isCatchUp;
2,254,312✔
962
}
963

964
ESyncRole syncGetRole(int64_t rid) {
2,254,312✔
965
  int32_t    code = 0;
2,254,312✔
966
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
2,254,312✔
967
  if (pSyncNode == NULL) {
2,254,312✔
968
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
969
    if (terrno != 0) code = terrno;
×
970
    sError("sync Node Acquire error since %d", ERRNO);
×
971
    TAOS_RETURN(code);
×
972
  }
973

974
  ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole;
2,254,312✔
975

976
  syncNodeRelease(pSyncNode);
2,254,312✔
977
  return role;
2,254,312✔
978
}
979

980
int64_t syncGetTerm(int64_t rid) {
8,009,911✔
981
  int32_t    code = 0;
8,009,911✔
982
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
8,009,911✔
983
  if (pSyncNode == NULL) {
8,009,911✔
984
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
985
    if (terrno != 0) code = terrno;
×
986
    sError("sync Node Acquire error since %d", ERRNO);
×
987
    TAOS_RETURN(code);
×
988
  }
989

990
  int64_t term = raftStoreGetTerm(pSyncNode);
8,009,911✔
991

992
  syncNodeRelease(pSyncNode);
8,009,911✔
993
  return term;
8,009,911✔
994
}
995

996
// Propose a client message on an already-acquired sync node.
//
// pSyncNode  node to propose on; must be leader or assigned leader
// pMsg       message to propose; on the optimized path its info.conn.applyIndex
//            and info.conn.applyTerm are filled in
// isWeak     forwarded to syncBuildClientRequest()
// seq        optional out parameter receiving the response-manager sequence
//            number on the regular (non-optimized) path
//
// Returns:
//   TSDB_CODE_SYN_NOT_LEADER         node is neither leader nor assigned leader
//   TSDB_CODE_SYN_RESTORING          restore has not finished yet
//   TSDB_CODE_SYN_PROPOSE_NOT_READY  leader has timed out on heartbeat replies
//   optimized path: 1 when replicaNum is 1, 0 when the replica count has
//     changed, TSDB_CODE_SYN_INTERNAL_ERROR on failure
//   regular path: 0 when the request was enqueued, otherwise the serialize or
//     enqueue error code
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  int32_t code = 0;
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    code = TSDB_CODE_SYN_NOT_LEADER;
    sNWarn(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    TAOS_RETURN(code);
  }

  if (!pSyncNode->restoreFinish) {
    code = TSDB_CODE_SYN_RESTORING;
    sNWarn(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
           TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // heartbeat timeout (not checked for an assigned leader)
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER && syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    code = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since heartbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    TAOS_RETURN(code);
  }

  // optimized one replica: handle the request inline instead of enqueueing it
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // initialized so the failure log below never prints an indeterminate value
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code < 0) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      TAOS_RETURN(code);
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);

    // after raft member change, need to handle 1->2 switching point
    // at this point, need to switch entry handling thread
    if (pSyncNode->replicaNum == 1) {
      sGDebug(&pMsg->info.traceId, "vgId:%d, index:%" PRId64 ", propose optimized msg type:%s", pSyncNode->vgId,
              retIndex, TMSG_INFO(pMsg->msgType));
      return 1;
    }

    sGDebug(&pMsg->info.traceId,
            "vgId:%d, index:%" PRId64 ", propose optimized msg, return to normal, type:%s, handle:%p",
            pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle);
    return 0;
  }

  // regular path: register a response stub, then enqueue the client request
  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {.info.traceId = pMsg->info.traceId};

  code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    // Drop the stub but keep reporting the serialize error. Returning the
    // result of the delete would turn this failure into success whenever the
    // delete itself succeeds.
    int32_t delCode = syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    if (delCode != 0) {
      sError("vgId:%d, failed to remove resp stub after serialize failure, seq:%" PRIu64, pSyncNode->vgId, seqNum);
    }
    TAOS_RETURN(code);
  }

  sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, propose msg, type:%s", pSyncNode->vgId, pMsg,
          TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    TAOS_CHECK_RETURN(syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum));
  }

  if (seq != NULL) *seq = seqNum;
  TAOS_RETURN(code);
}
1068

1069
// Reset a per-peer heartbeat timer to its initial state: no timer handle, zero
// counter and logic clock, interval taken from pSyncNode->hbBaseLine, and the
// given destination raft id. Always returns 0.
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  pSyncTimer->pTimer = NULL;
  pSyncTimer->counter = 0;

  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;

  pSyncTimer->destId = destId;
  pSyncTimer->timeStamp = taosGetTimestampMs();

  atomic_store_64(&pSyncTimer->logicClock, 0);

  sInfo("vgId:%d, HbTimer init, timerMs:%d for addr:0x%" PRIx64, pSyncNode->vgId, pSyncTimer->timerMS, destId.addr);
  return 0;
}
1080

1081
// Set the interval, in milliseconds, stored on a heartbeat timer.
static void syncHBSetTimerMS(SSyncTimer* pSyncTimer, int32_t ms) {
  pSyncTimer->timerMS = ms;
}
×
1082

1083
// Arm (or re-arm) the heartbeat timer pSyncTimer for pSyncNode.
//
// Reuses the SSyncHbTimerData registered under pSyncTimer->hbDataRid when one
// can be acquired, otherwise allocates and registers a new one. The data's rid
// is what gets passed to the timer callback as its parameter.
//
// Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR when the sync environment
// is not initialized, or TSDB_CODE_OUT_OF_MEMORY when the timer data cannot be
// allocated.
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int32_t code = 0;
  int64_t tsNow = taosGetTimestampMs();

  if (!syncIsInit()) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return code;
  }

  SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
  if (pData == NULL) {
    pData = taosMemoryMalloc(sizeof(*pData));
    if (pData == NULL) {
      // the allocation result used to be dereferenced unchecked
      code = TSDB_CODE_OUT_OF_MEMORY;
      sError("vgId:%d, failed to start hb timer since out of memory", pSyncNode->vgId);
      return code;
    }
    // NOTE(review): the rid returned by syncHbTimerDataAdd() is not validated -- confirm its failure value
    pData->rid = syncHbTimerDataAdd(pData);
  }
  pSyncTimer->hbDataRid = pData->rid;
  pSyncTimer->timeStamp = tsNow;

  pData->syncNodeRid = pSyncNode->rid;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;
  pData->execTime = tsNow + pSyncTimer->timerMS;

  sInfo("vgId:%d, start hb timer, rid:%" PRId64 " addr:0x%" PRIx64 " at %d", pSyncNode->vgId, pData->rid,
        pData->destId.addr, pSyncTimer->timerMS);

  bool stopped = taosTmrResetPriority(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid),
                                      syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
  if (stopped) {
    sWarn("vgId:%d, reset hb timer stopped:%d", pSyncNode->vgId, stopped);
  }

  return code;
}
1115

1116
// Stop a heartbeat timer: bump its logic clock, stop the underlying timer,
// clear the handle and drop the registered timer data. Always returns 0.
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  // advance the logic clock before stopping the timer
  (void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  bool wasStopped = taosTmrStop(pSyncTimer->pTimer);
  sDebug("vgId:%d, stop hb timer stop:%d", pSyncNode->vgId, wasStopped);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}
1126

1127
// Restore the log store from the FSM snapshot when the two have diverged.
//
// A restore is triggered when the snapshot's lastApplyIndex falls outside the
// range covered by the log (lastVer < commitIndex, or firstVer > commitIndex + 1)
// or when the FSM state is SYNC_FSM_STATE_INCOMPLETE.
//
// Returns 0 when nothing had to be done or the restore succeeded,
// TSDB_CODE_SYN_INTERNAL_ERROR when the log store, FSM or FpGetSnapshotInfo is
// missing, or the error from syncLogRestoreFromSnapshot().
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  if (pNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm == NULL) {
    sError("vgId:%d, pFsm not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
    sError("vgId:%d, FpGetSnapshotInfo not registered", pNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // TODO check return value
  SSnapshot snapshot = {0};
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);

  SyncIndex commitIndex = snapshot.lastApplyIndex;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  int32_t code = 0;
  bool    logOutOfRange = (lastVer < commitIndex) || (firstVer > commitIndex + 1);
  if (!logOutOfRange && pNode->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    TAOS_RETURN(code);
  }

  sInfo("vgId:%d, restore log store from snapshot, firstVer:%" PRId64 ", lastVer:%" PRId64 ", commitIndex:%" PRId64,
        pNode->vgId, firstVer, lastVer, commitIndex);

  code = pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, commitIndex);
  if (code != 0) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
           pNode->vgId, terrstr(), lastVer, commitIndex);
  }
  TAOS_RETURN(code);
}
1159

1160
// open/close --------------
1161
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion, int32_t electInterval, int32_t heartbeatInterval) {
4,730,567✔
1162
  int32_t    code = 0;
4,730,567✔
1163
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
4,730,567✔
1164
  if (pSyncNode == NULL) {
4,731,371✔
1165
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1166
    goto _error;
×
1167
  }
1168

1169
  if (!taosDirExist((char*)(pSyncInfo->path))) {
4,731,371✔
1170
    if (taosMkDir(pSyncInfo->path) != 0) {
3,399,609✔
1171
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1172
      sError("vgId:%d, failed to create dir:%s since %s", pSyncInfo->vgId, pSyncInfo->path, terrstr());
×
1173
      goto _error;
×
1174
    }
1175
  }
1176

1177
  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
4,732,043✔
1178
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
4,731,370✔
1179
           TD_DIRSEP);
1180
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
4,730,799✔
1181

1182
  if (!taosCheckExistFile(pSyncNode->configPath)) {
4,729,517✔
1183
    // create a new raft config file
1184
    sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId);
3,399,609✔
1185
    pSyncNode->vgId = pSyncInfo->vgId;
3,400,517✔
1186
    pSyncNode->mountVgId = pSyncInfo->mountVgId;
3,399,609✔
1187
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
3,399,609✔
1188
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
3,399,609✔
1189
    pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;
3,399,609✔
1190
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
3,399,609✔
1191
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
3,399,609✔
1192
    pSyncNode->raftCfg.configIndexCount = 1;
3,399,609✔
1193
    pSyncNode->raftCfg.configIndexArr[0] = -1;
3,399,609✔
1194

1195
    if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
3,399,609✔
1196
      terrno = code;
×
1197
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
×
1198
      goto _error;
×
1199
    }
1200
  } else {
1201
    // update syncCfg by raft_config.json
1202
    if ((code = syncReadCfgFile(pSyncNode)) != 0) {
1,332,434✔
1203
      terrno = code;
×
1204
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
×
1205
      goto _error;
×
1206
    }
1207

1208
    if (vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion) {
1,332,434✔
1209
      if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
902,157✔
1210
        sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
787,798✔
1211
        pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
787,798✔
1212
        if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
787,798✔
1213
          terrno = code;
×
1214
          sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
×
1215
          goto _error;
×
1216
        }
1217
      } else {
1218
        sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
114,359✔
1219
        pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
114,359✔
1220
      }
1221
    } else {
1222
      sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d", pSyncNode->vgId, vnodeVersion,
430,277✔
1223
            pSyncInfo->syncCfg.changeVersion);
1224
    }
1225
  }
1226

1227
  // init by SSyncInfo
1228
  pSyncNode->vgId = pSyncInfo->vgId;
4,732,043✔
1229
  pSyncNode->mountVgId = pSyncInfo->mountVgId;
4,732,043✔
1230
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
4,731,850✔
1231
  bool      updated = false;
4,732,043✔
1232
  sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
4,732,043✔
1233
        pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
1234
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
12,769,996✔
1235
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
8,037,953✔
1236
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
8,037,953✔
1237
      updated = true;
×
1238
    }
1239
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
8,037,953✔
1240
          pNode->nodeId, pNode->clusterId);
1241
  }
1242

1243
  if (vnodeVersion > pSyncInfo->syncCfg.changeVersion) {
4,730,995✔
1244
    if (updated) {
505,125✔
1245
      sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
×
1246
      if ((code = syncWriteCfgFile(pSyncNode)) != 0) {
×
1247
        terrno = code;
×
1248
        sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
×
1249
        goto _error;
×
1250
      }
1251
    }
1252
  }
1253

1254
  pSyncNode->pWal = pSyncInfo->pWal;
4,732,043✔
1255
  pSyncNode->msgcb = pSyncInfo->msgcb;
4,732,043✔
1256
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
4,732,043✔
1257
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
4,732,043✔
1258
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;
4,732,043✔
1259

1260
  // create raft log ring buffer
1261
  code = syncLogBufferCreate(&pSyncNode->pLogBuf);
4,732,043✔
1262
  if (pSyncNode->pLogBuf == NULL) {
4,732,043✔
1263
    sError("failed to init sync log buffer since %s. vgId:%d", tstrerror(code), pSyncNode->vgId);
×
1264
    goto _error;
×
1265
  }
1266

1267
  // init replicaNum, replicasId
1268
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
4,732,043✔
1269
  pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
4,732,043✔
1270
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
12,769,996✔
1271
    if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
8,037,953✔
1272
        false) {
1273
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
×
1274
      goto _error;
×
1275
    }
1276
  }
1277

1278
  // init internal
1279
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
4,732,043✔
1280
  pSyncNode->myRaftId = pSyncNode->replicasId[pSyncNode->raftCfg.cfg.myIndex];
4,732,043✔
1281

1282
  // init peersNum, peers, peersId
1283
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
4,732,043✔
1284
  int32_t j = 0;
4,732,043✔
1285
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
12,769,996✔
1286
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
8,037,953✔
1287
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
3,306,176✔
1288
      pSyncNode->peersId[j] = pSyncNode->replicasId[i];
3,306,176✔
1289
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
3,306,176✔
1290
      j++;
3,306,176✔
1291
    }
1292
  }
1293

1294
  pSyncNode->arbTerm = -1;
4,732,043✔
1295
  (void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
4,732,043✔
1296
  syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
4,732,043✔
1297
  sInfo("vgId:%d, generate arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);
4,732,043✔
1298

1299
  // init raft algorithm
1300
  pSyncNode->pFsm = pSyncInfo->pFsm;
4,732,043✔
1301
  pSyncInfo->pFsm = NULL;
4,732,043✔
1302
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
4,732,043✔
1303
  pSyncNode->leaderCache = EMPTY_RAFT_ID;
4,732,043✔
1304
  pSyncNode->leaderCacheEp.port = 0;
4,732,043✔
1305
  pSyncNode->leaderCacheEp.fqdn[0] = '\0';
4,732,043✔
1306

1307
  // init life cycle outside
1308

1309
  // TLA+ Spec
1310
  // InitHistoryVars == /\ elections = {}
1311
  //                    /\ allLogs   = {}
1312
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
1313
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
1314
  //                   /\ state       = [i \in Server |-> Follower]
1315
  //                   /\ votedFor    = [i \in Server |-> Nil]
1316
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
1317
  //                      /\ votesGranted   = [i \in Server |-> {}]
1318
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
1319
  // \* leader does not send itself messages. It's still easier to include these
1320
  // \* in the functions.
1321
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
1322
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
1323
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
1324
  //                /\ commitIndex  = [i \in Server |-> 0]
1325
  // Init == /\ messages = [m \in {} |-> 0]
1326
  //         /\ InitHistoryVars
1327
  //         /\ InitServerVars
1328
  //         /\ InitCandidateVars
1329
  //         /\ InitLeaderVars
1330
  //         /\ InitLogVars
1331
  //
1332

1333
  // init TLA+ server vars
1334
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
4,732,043✔
1335
  pSyncNode->roleTimeMs = taosGetTimestampMs();
4,732,043✔
1336
  if ((code = raftStoreOpen(pSyncNode)) != 0) {
4,732,043✔
1337
    terrno = code;
×
1338
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
×
1339
    goto _error;
×
1340
  }
1341

1342
  // init TLA+ candidate vars
1343
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
4,731,659✔
1344
  if (pSyncNode->pVotesGranted == NULL) {
4,731,659✔
1345
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
×
1346
    goto _error;
×
1347
  }
1348
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
4,731,659✔
1349
  if (pSyncNode->pVotesRespond == NULL) {
4,732,043✔
1350
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
×
1351
    goto _error;
×
1352
  }
1353

1354
  // init TLA+ leader vars
1355
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
4,729,941✔
1356
  if (pSyncNode->pNextIndex == NULL) {
4,730,984✔
1357
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
×
1358
    goto _error;
×
1359
  }
1360
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
4,729,073✔
1361
  if (pSyncNode->pMatchIndex == NULL) {
4,731,407✔
1362
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
×
1363
    goto _error;
×
1364
  }
1365

1366
  // init TLA+ log vars
1367
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
4,730,224✔
1368
  if (pSyncNode->pLogStore == NULL) {
4,730,984✔
1369
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
×
1370
    goto _error;
×
1371
  }
1372

1373
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
4,729,913✔
1374
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
4,729,913✔
1375
    SSnapshot snapshot = {0};
4,730,985✔
1376
    // TODO check return value
1377
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
4,731,333✔
1378
    if (snapshot.lastApplyIndex > commitIndex) {
4,731,368✔
1379
      commitIndex = snapshot.lastApplyIndex;
748,415✔
1380
      sNTrace(pSyncNode, "reset commit index by snapshot");
748,415✔
1381
    }
1382
    pSyncNode->fsmState = snapshot.state;
4,731,368✔
1383
    if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
4,730,636✔
1384
      sError("vgId:%d, fsm state is incomplete.", pSyncNode->vgId);
×
1385
      if (pSyncNode->replicaNum == 1) {
×
1386
        terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1387
        goto _error;
×
1388
      }
1389
    }
1390
  }
1391
  pSyncNode->commitIndex = commitIndex;
4,728,416✔
1392
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);
4,729,984✔
1393

1394
  // restore log store on need
1395
  if ((code = syncNodeLogStoreRestoreOnNeed(pSyncNode)) < 0) {
4,731,013✔
1396
    terrno = code;
×
1397
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
×
1398
    goto _error;
×
1399
  }
1400

1401
  // timer ms init
1402
  pSyncNode->pingBaseLine = PING_TIMER_MS;
4,731,294✔
1403
  pSyncNode->electBaseLine = electInterval;
4,731,294✔
1404
  pSyncNode->hbBaseLine = heartbeatInterval;
4,731,294✔
1405

1406
  // init ping timer
1407
  pSyncNode->pPingTimer = NULL;
4,731,294✔
1408
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
4,731,294✔
1409
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
4,731,294✔
1410
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
4,732,043✔
1411
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
4,732,043✔
1412
  pSyncNode->pingTimerCounter = 0;
4,732,043✔
1413

1414
  // init elect timer
1415
  pSyncNode->pElectTimer = NULL;
4,732,043✔
1416
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
4,731,294✔
1417
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
4,732,043✔
1418
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
4,732,043✔
1419
  pSyncNode->electTimerCounter = 0;
4,732,043✔
1420

1421
  // init heartbeat timer
1422
  pSyncNode->pHeartbeatTimer = NULL;
4,732,043✔
1423
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
4,732,043✔
1424
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
4,732,043✔
1425
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
4,732,043✔
1426
#ifdef BUILD_NO_CALL
1427
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
1428
#endif
1429
  pSyncNode->heartbeatTimerCounter = 0;
4,732,043✔
1430

1431
  // init peer heartbeat timer
1432
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
75,712,327✔
1433
    if ((code = syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i])) != 0) {
70,980,284✔
1434
      terrno = code;
×
1435
      goto _error;
×
1436
    }
1437
  }
1438

1439
  // tools
1440
  if ((code = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr)) != 0) {
4,732,043✔
1441
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
×
1442
    goto _error;
×
1443
  }
1444
  if (pSyncNode->pSyncRespMgr == NULL) {
4,732,043✔
1445
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
×
1446
    goto _error;
×
1447
  }
1448

1449
  // restore state
1450
  pSyncNode->restoreFinish = false;
4,732,043✔
1451

1452
  // snapshot senders
1453
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
75,705,456✔
1454
    SSyncSnapshotSender* pSender = NULL;
70,973,413✔
1455
    code = snapshotSenderCreate(pSyncNode, i, &pSender);
70,973,917✔
1456
    if (pSender == NULL) return NULL;
70,975,230✔
1457

1458
    pSyncNode->senders[i] = pSender;
70,975,230✔
1459
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
70,975,905✔
1460
  }
1461

1462
  // snapshot receivers
1463
  code = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID, &pSyncNode->pNewNodeReceiver);
4,732,043✔
1464
  if (pSyncNode->pNewNodeReceiver == NULL) return NULL;
4,731,492✔
1465
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
4,731,492✔
1466
          pSyncNode->pNewNodeReceiver);
1467

1468
  // is config changing
1469
  pSyncNode->changing = false;
4,731,492✔
1470

1471
  // replication mgr
1472
  if ((code = syncNodeLogReplInit(pSyncNode)) < 0) {
4,731,492✔
1473
    terrno = code;
×
1474
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
×
1475
    goto _error;
×
1476
  }
1477

1478
  // peer state
1479
  if ((code = syncNodePeerStateInit(pSyncNode)) < 0) {
4,732,043✔
1480
    terrno = code;
×
1481
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
×
1482
    goto _error;
×
1483
  }
1484

1485
  //
1486
  // min match index
1487
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
4,732,043✔
1488

1489
  // start in syncNodeStart
1490
  // start raft
1491

1492
  int64_t timeNow = taosGetTimestampMs();
4,732,043✔
1493
  pSyncNode->startTime = timeNow;
4,732,043✔
1494
  pSyncNode->lastReplicateTime = timeNow;
4,732,043✔
1495

1496
  // snapshotting
1497
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
4,732,043✔
1498

1499
  // init log buffer
1500
  if ((code = syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode)) < 0) {
4,732,043✔
1501
    terrno = code;
×
1502
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
×
1503
    goto _error;
×
1504
  }
1505

1506
  pSyncNode->isStart = true;
4,732,043✔
1507
  pSyncNode->electNum = 0;
4,732,043✔
1508
  pSyncNode->becomeLeaderNum = 0;
4,732,043✔
1509
  pSyncNode->becomeAssignedLeaderNum = 0;
4,732,043✔
1510
  pSyncNode->configChangeNum = 0;
4,732,043✔
1511
  pSyncNode->hbSlowNum = 0;
4,732,043✔
1512
  pSyncNode->hbrSlowNum = 0;
4,732,043✔
1513
  pSyncNode->tmrRoutineNum = 0;
4,732,043✔
1514

1515
  sNInfo(pSyncNode, "sync node opened, node:%p electBaseLine:%d hbBaseLine:%d heartbeatTimeout:%d", pSyncNode,
4,732,043✔
1516
         pSyncNode->electBaseLine, pSyncNode->hbBaseLine, tsHeartbeatTimeout);
1517
  return pSyncNode;
4,732,043✔
1518

1519
_error:
×
1520
  if (pSyncInfo->pFsm) {
×
1521
    taosMemoryFree(pSyncInfo->pFsm);
×
1522
    pSyncInfo->pFsm = NULL;
×
1523
  }
1524
  syncNodeClose(pSyncNode);
×
1525
  pSyncNode = NULL;
×
1526
  return NULL;
×
1527
}
1528

1529
#ifdef BUILD_NO_CALL
1530
// Raise the in-memory commit index to the FSM snapshot's lastApplyIndex when the snapshot is ahead of it.
// Nothing happens when the FSM or its FpGetSnapshotInfo callback is not set.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snap = {0};
  // the callback's return value is deliberately ignored
  (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);

  if (pSyncNode->commitIndex < snap.lastApplyIndex) {
    pSyncNode->commitIndex = snap.lastApplyIndex;
  }
}
1539
#endif
1540

1541
/*
 * Restore the commit state of a sync node.
 *
 * Reads the last and committed indexes from the log store and the end index of the log buffer
 * (under the log buffer mutex), raises pSyncNode->commitIndex to at least the stored commit index,
 * and, unless the FSM state is SYNC_FSM_STATE_INCOMPLETE, commits the log buffer up to that index.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR when the log store or the log buffer is missing; otherwise
 * returns (and stores in terrno via TAOS_RETURN) the resulting code.
 */
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, log store not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pSyncNode->pLogBuf == NULL) {
    sError("vgId:%d, ring log buffer not created", pSyncNode->vgId);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  (void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex);
  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;
  (void)taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex);

  if (lastVer != -1 && endIndex != lastVer + 1) {
    // The buffer end does not line up with the log store: warn only, restore still proceeds.
    code = TSDB_CODE_WAL_LOG_INCOMPLETE;
    // Report the local code: terrno has not been set to it here, so terrstr() would print an unrelated error.
    sWarn("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64,
          pSyncNode->vgId, tstrerror(code), endIndex - 1, lastVer);
  }

  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
  sInfo("vgId:%d, restore began, and keep syncing until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  // NOTE(review): when the FSM state is incomplete the commit is skipped, so a TSDB_CODE_WAL_LOG_INCOMPLETE
  // set above is what gets returned; otherwise it is overwritten by the commit result. Confirm this is intended.
  if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex, NULL, "restore");
  }

  TAOS_RETURN(code);
}
1576

1577
/*
 * Put the node into its initial raft role and start the ping timer.
 *
 *  - a learner (per its own entry in the raft config) becomes learner;
 *  - a single-replica node moves to the next term, becomes leader and appends a no-op entry
 *    (a failure of that append is returned immediately, before the ping timer is started);
 *  - any other node becomes follower with an empty leader id.
 *
 * Returns the result of starting the ping timer.
 */
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  sInfo("vgId:%d, begin to start sync node", pSyncNode->vgId);

  bool isLearner =
      pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER;

  if (isLearner) {
    syncNodeBecomeLearner(pSyncNode, "first start");
  } else if (pSyncNode->replicaNum == 1) {
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    TAOS_CHECK_RETURN(syncNodeAppendNoop(pSyncNode));
  } else {
    SRaftId emptyId = {0};
    syncNodeBecomeFollower(pSyncNode, emptyId, "first start");
  }

  int32_t ret = syncNodeStartPingTimer(pSyncNode);
  if (ret != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(ret));
  }

  sInfo("vgId:%d, sync node started", pSyncNode->vgId);
  return ret;
}
}
1603

1604
#ifdef BUILD_NO_CALL
1605
/*
 * Start the node as a stand-by follower: mark it follower, stop the heartbeat timers,
 * arm the elect timer with TIMER_MAX_MS and start the ping timer.
 *
 * Returns -1 when restarting the elect timer or starting the ping timer reports a negative code.
 */
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  int32_t code = 0;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // TODO check return value
  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));

  // reset elect timer, long enough
  if ((code = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS)) < 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  if ((code = syncNodeStartPingTimer(pSyncNode)) < 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  return code;
}
1628
#endif
1629

1630
/*
 * First phase of closing a sync node: stop the elect, heartbeat and ping timers (in that order)
 * and clean the pending responses held by the response manager.
 *
 * Logs and returns early when the node, its FSM or the FSM's FpApplyQueueItems callback is NULL,
 * or as soon as one of the timers fails to stop.
 */
void syncNodePreClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    sError("failed to pre close sync node since sync node is null");
    return;
  }
  if (pSyncNode->pFsm == NULL) {
    sError("failed to pre close sync node since fsm is null");
    return;
  }
  if (pSyncNode->pFsm->FpApplyQueueItems == NULL) {
    sError("failed to pre close sync node since FpApplyQueueItems is null");
    return;
  }

  int32_t rc = syncNodeStopElectTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  rc = syncNodeStopHeartbeatTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  rc = syncNodeStopPingTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  // clean rsp
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
1666

1667
/*
 * Post-close phase of a sync node: stop (if started) and destroy the new-node snapshot receiver.
 *
 * A NULL node is ignored, matching the guards in syncNodePreClose and syncNodeClose.
 */
void syncNodePostClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    sError("failed to post close sync node since sync node is null");
    return;
  }
  if (pSyncNode->pNewNodeReceiver == NULL) {
    return;
  }

  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // The message used to say "preclose", which pointed readers at the wrong close phase.
  sDebug("vgId:%d, snapshot receiver destroy while postclose sync node, data:%p", pSyncNode->vgId,
         pSyncNode->pNewNodeReceiver);
  snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
  pSyncNode->pNewNodeReceiver = NULL;
}
4,226,918✔
1679

1680
// Release a heartbeat timer data object through taosMemoryFree.
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
1,182,097✔
1681

1682
/*
 * Tear down a sync node and free it.
 *
 * Order: clean pending responses, stop ping/elect/heartbeat timers, destroy the replication managers,
 * response manager, vote and index managers, log store and log buffer, destroy the arb token mutex,
 * destroy every snapshot sender and the new-node receiver, free the FSM, close the raft store and
 * finally free the node itself.
 *
 * A NULL node is ignored. If stopping any timer reports an error the function logs it and returns
 * without releasing anything further.
 */
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return;
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);

  // timers
  int32_t rc = syncNodeStopPingTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }
  rc = syncNodeStopElectTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }
  rc = syncNodeStopHeartbeatTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  // managers and stores; each pointer is cleared right after its object is destroyed
  syncNodeLogReplDestroy(pSyncNode);

  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;

  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;

  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;

  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;

  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  (void)taosThreadMutexDestroy(&pSyncNode->arbTokenMutex);

  // snapshot senders
  for (int32_t idx = 0; idx < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++idx) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[idx];
    if (pSender == NULL) continue;

    sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSender);

    if (snapshotSenderIsStart(pSender)) {
      snapshotSenderStop(pSender, false);
    }

    snapshotSenderDestroy(pSender);
    pSyncNode->senders[idx] = NULL;
  }

  // snapshot receiver
  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  raftStoreClose(pSyncNode);

  taosMemoryFree(pSyncNode);
}
1751

1752
// Return the snapshot strategy stored in the node's raft configuration.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->raftCfg.snapshotStrategy;
}
×
1753

1754
// timer control --------------
1755
/*
 * (Re)arm the ping timer with pingTimerMS and publish the user logic clock as the timer logic clock.
 * The node's rid (not the pointer) is passed as the timer parameter.
 *
 * Only logs when the sync environment is not initialized or when the timer reset reports "stopped";
 * always returns 0.
 */
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  bool stopped = taosTmrResetPriority(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
                                      syncEnv()->pTimerManager, &pSyncNode->pPingTimer, 2);
  if (stopped) {
    sError("vgId:%d, failed to reset ping timer, ms:%d, stopped:%d", pSyncNode->vgId, pSyncNode->pingTimerMS,
           stopped);
  }

  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}
1770

1771
/*
 * Stop the ping timer: bump the user logic clock, stop the timer handle and clear it.
 * Always returns 0.
 */
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  bool wasStopped = taosTmrStop(pSyncNode->pPingTimer);
  sDebug("vgId:%d, stop ping timer, stop:%d", pSyncNode->vgId, wasStopped);

  pSyncNode->pPingTimer = NULL;
  return 0;
}
1779

1780
/*
 * Arm the elect timer to fire after `ms` milliseconds.
 *
 * Records the timeout, the absolute execute time and the current elect logic clock in electTimerParam
 * before resetting the timer. Only logs when the sync environment is not initialized or when the
 * reset reports "stopped"; always returns 0.
 */
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  int64_t fireAt = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), fireAt);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  bool stopped = taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid),
                              syncEnv()->pTimerManager, &pSyncNode->pElectTimer);
  if (stopped) {
    sWarn("vgId:%d, failed to reset elect timer, ms:%d", pSyncNode->vgId, ms);
  }

  return 0;
}
1799

1800
/*
 * Stop the elect timer: bump the elect logic clock, stop the timer handle and clear it.
 * Always returns 0.
 */
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  bool wasStopped = taosTmrStop(pSyncNode->pElectTimer);
  sTrace("vgId:%d, stop elect timer, stop:%d", pSyncNode->vgId, wasStopped);

  pSyncNode->pElectTimer = NULL;
  return 0;
}
1809

1810
// Stop the elect timer and start it again with a timeout of `ms` milliseconds.
// Returns the first non-zero code reported by either step, otherwise 0.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  TAOS_CHECK_RETURN(syncNodeStopElectTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartElectTimer(pSyncNode, ms));

  return 0;
}
1816

1817
/*
 * Restart the elect timer with a fresh timeout: a random value in [electBaseLine, 2 * electBaseLine]
 * as produced by syncUtilElectRandomMS, or TIMER_MAX_MS when the node is in stand-by.
 * A restart failure is logged as a warning.
 */
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS;

  if (!pSyncNode->raftCfg.isStandBy) {
    electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  } else {
    electMS = TIMER_MAX_MS;
  }

  int32_t rc = syncNodeRestartElectTimer(pSyncNode, electMS);
  if (rc != 0) {
    sWarn("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
}
1835

1836
#ifdef BUILD_NO_CALL
1837
/*
 * (Re)arm the node-level heartbeat timer with heartbeatTimerMS and publish the user logic clock
 * as the timer logic clock.
 *
 * taosTmrReset's result is handled as the "stopped" flag, the same way the ping and elect timers
 * in this file handle it; feeding it to TAOS_CHECK_RETURN would turn `true` into error code 1 and
 * skip the logic clock update. Always returns 0.
 */
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t code = 0;
  if (syncIsInit()) {
    bool stopped = taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
                                syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer);
    if (stopped) {
      sWarn("vgId:%d, failed to reset heartbeat timer, ms:%d", pSyncNode->vgId, pSyncNode->heartbeatTimerMS);
    }
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  } else {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return code;
}
1850
#endif
1851

1852
/*
 * Start the per-peer heartbeat timer of every peer that has one.
 * Returns the first non-zero code from syncHbTimerStart, otherwise 0.
 */
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
#if 0
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pTimer == NULL) continue;

    TAOS_CHECK_RETURN(syncHbTimerStart(pSyncNode, pTimer));
  }

  return 0;
}
1869

1870
// Apply `ms` as the interval of every existing per-peer heartbeat timer. Always returns 0.
int32_t syncNodeSetHeartbeatTimerMs(SSyncNode* pSyncNode, int32_t ms) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pTimer == NULL) continue;

    syncHBSetTimerMS(pTimer, ms);
  }

  return 0;
}
1882

1883
/*
 * Stop the per-peer heartbeat timer of every peer that has one.
 * Returns the first non-zero code from syncHbTimerStop, otherwise 0.
 */
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
#if 0
  TAOS_CHECK_RETURN(atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1));
  bool stop = taosTmrStop(pSyncNode->pHeartbeatTimer);
  sDebug("vgId:%d, stop heartbeat timer, stop:%d", pSyncNode->vgId, stop);
  pSyncNode->pHeartbeatTimer = NULL;
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pTimer == NULL) continue;

    TAOS_CHECK_RETURN(syncHbTimerStop(pSyncNode, pTimer));
  }

  return 0;
}
1902

1903
/*
 * Apply a new heartbeat interval to all peer heartbeat timers and, when this node is the leader,
 * stop and start those timers again.
 * Returns the first non-zero code reported by a step, otherwise 0.
 */
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode, int32_t heartbeatInterval) {
  sInfo("vgId:%d, sync Node Restart HeartbeatTimer, state=%d", pSyncNode->vgId, pSyncNode->state);

  TAOS_CHECK_RETURN(syncNodeSetHeartbeatTimerMs(pSyncNode, heartbeatInterval));

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    return 0;
  }

  TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
  TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
  return 0;
}
1914

1915
/*
 * Send a sync message to the peer whose raft address equals destRaftId->addr.
 *
 * The message content is converted with syncUtilMsgHtoN and marked noResp before it is handed to
 * pNode->syncSendMSg. When the peer is unknown or no send callback is set the code stays -1.
 * On a negative code the error is logged and pMsg->pCont is freed and cleared.
 *
 * Returns the send code (also stored in terrno via TAOS_RETURN).
 */
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  // look up the endpoint set of the destination peer
  SEpSet* pEpSet = NULL;
  for (int32_t peer = 0; peer < pNode->peersNum; ++peer) {
    if (pNode->peersId[peer].addr == destRaftId->addr) {
      pEpSet = &pNode->peersEpset[peer];
      break;
    }
  }

  int32_t code = -1;
  if (pEpSet != NULL && pNode->syncSendMSg != NULL) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    code = pNode->syncSendMSg(pEpSet, pMsg);
  }

  if (code >= 0) {
    TAOS_RETURN(code);
  }

  sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:0x%" PRIx64, pNode->vgId, tstrerror(code),
         pEpSet, DID(destRaftId), destRaftId->addr);
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;

  TAOS_RETURN(code);
}
1940

1941
/*
 * Tell whether this node appears in the given configuration.
 *
 * Membership is checked twice: by fqdn + port against pNode->myNodeInfo, and by raft id against
 * pNode->myRaftId. If the two checks disagree, terrno is set to TSDB_CODE_SYN_INTERNAL_ERROR and
 * false is returned; otherwise the common answer is returned.
 */
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  bool foundByEp = false;
  bool foundById = false;

  // match by endpoint (fqdn and port)
  for (int32_t i = 0; i < pCfg->totalReplicaNum && !foundByEp; ++i) {
    const SNodeInfo* pInfo = &pCfg->nodeInfo[i];
    foundByEp = (strcmp(pInfo->nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0) &&
                (pInfo->nodePort == pNode->myNodeInfo.nodePort);
  }

  // match by raft id
  for (int32_t i = 0; i < pCfg->totalReplicaNum && !foundById; ++i) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[i]),
        .vgId = pNode->vgId,
    };
    foundById = syncUtilSameId(&candidate, &pNode->myRaftId);
  }

  if (foundByEp == foundById) {
    return foundByEp;
  }

  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  return false;
}
1971

1972
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
1,240,227✔
1973
  if (pOldCfg->totalReplicaNum != pNewCfg->totalReplicaNum) return true;
1,240,227✔
1974
  if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
706,878✔
1975
  for (int32_t i = 0; i < pOldCfg->totalReplicaNum; ++i) {
1,840,395✔
1976
    const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
1,479,255✔
1977
    const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
1,479,255✔
1978
    if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
1,479,255✔
1979
    if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
1,479,255✔
1980
    if (pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
1,479,255✔
1981
  }
1982

1983
  return false;
361,140✔
1984
}
1985

1986
/*
 * Apply a new replica configuration to the sync node.
 *
 * If the new configuration does not differ from the current one (see syncIsConfigChanged) nothing
 * is changed and 0 is returned. Otherwise the raft config is replaced, lastConfigChangeIndex is
 * recorded, and, when this node is a member of the new configuration, the node's own id, peers,
 * replica ids, quorum, index/vote managers and snapshot senders are rebuilt. Snapshot senders of
 * replicas present in both configurations are carried over to their new replica index; the rest are
 * recreated and the unmatched old ones destroyed. The configuration is persisted with
 * syncWriteCfgFile in both the member and non-member case.
 *
 * pSyncNode              node to reconfigure
 * pNewConfig             configuration to install
 * lastConfigChangeIndex  log index of the config change, appended to raftCfg.configIndexArr
 *
 * Returns 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_RANGE when the config index array is
 * full; terrno when a node info cannot be converted to a raft id; otherwise the failing code of
 * snapshotSenderCreate or syncWriteCfgFile.
 */
int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  int32_t  code = 0;
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    // log the real vgroup id (it used to be hard-coded as 1)
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  // membership is evaluated against myNodeInfo/myRaftId as they were before this change
  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change
  // Arguments follow the specifiers one to one; sNInfo already receives the node, so vgId is not
  // passed again (it used to shift every argument by one position against the format string).
  sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
         oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, (int64_t)oldConfig.lastIndex,
         (int64_t)pNewConfig->lastIndex, pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  SRaftCfg* pCfg = &pSyncNode->raftCfg;
  if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
    sNError(pSyncNode, "failed to add cfg index:%d since out of range", pCfg->configIndexCount);
    terrno = TSDB_CODE_OUT_OF_RANGE;
    return -1;
  }

  pCfg->configIndexArr[pCfg->configIndexCount] = lastConfigChangeIndex;
  pCfg->configIndexCount++;

  if (IamInNew) {
    // save snapshot senders together with the replica ids they belonged to
    SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    if (syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId) == false) return terrno;

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
    int32_t j = 0;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
        j++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]) == false)
        return terrno;
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
          false)
        return terrno;
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // reset new: carry over the sender of every replica that also existed in the old configuration
    for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
      bool reset = false;
      for (int32_t k = 0; k < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++k) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[k]) && oldSenders[k] != NULL) {
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[k]);

          pSyncNode->senders[i] = oldSenders[k];
          oldSenders[k] = NULL;  // ownership moved, keep it out of the "free old" pass
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
          pSyncNode->senders[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], reset);

          break;
        }
      }
    }

    // create new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (pSyncNode->senders[i] == NULL) {
        code = snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]);
        if (code != 0) {
          // The old senders that were not carried over are no longer referenced by the node;
          // destroy them here, otherwise they leak on this error path.
          for (int32_t k = 0; k < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++k) {
            if (oldSenders[k] != NULL) {
              snapshotSenderDestroy(oldSenders[k]);
              oldSenders[k] = NULL;
            }
          }
          TAOS_RETURN(code);
        }
        if (pSyncNode->senders[i] == NULL) {
          // will be created later while send snapshot
          // senders[i] is NULL here, so log through the node instead of through the sender
          sError("vgId:%d, snapshot sender create failed while reconfig", pSyncNode->vgId);
        } else {
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
        }
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
      }
    }

    // free old
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
  } else {
    // persist cfg
    TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  return 0;
}
2158

2159
// raft state change --------------
2160
// Raise the term held by the raft store to `term` when `term` is strictly greater than the stored one.
// The node's role (state) is not touched here.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= raftStoreGetTerm(pSyncNode)) {
    return;  // stored term is already at least as new
  }
  raftStoreSetTerm(pSyncNode, term);
}
174,013✔
2165

2166
// Step down to follower in reaction to a message that carries `newTerm`.
//
// pSyncNode: node that may step down.
// newTerm:   term observed in the incoming message.
// id:        raft id handed to syncNodeBecomeFollower() as the leader to remember.
// strFrom:   short text naming the caller/reason; it is embedded in the follower log line.
//
// Behaviour:
//   - newTerm older than the stored term: trace and return, nothing changes.
//   - node is an assigned leader: a fresh arb token is generated under arbTokenMutex.
//   - newTerm newer than the stored term: store it, become follower, clear the vote.
//   - same term and not yet follower: become follower (vote is kept).
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm, SRaftId id, char* strFrom) {
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
  if (currentTerm > newTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);

  if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
    syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
    sInfo("vgId:%d, generate arb token, will step down from assigned leader, new arbToken:%s", pSyncNode->vgId,
          pSyncNode->arbToken);
    (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
  }

  // Large enough for the fixed text, two 64-bit decimal terms and a reasonably long `strFrom`;
  // the previous 64-byte buffer cut the reason short. snprintf still bounds the write.
  char reason[128];

  if (currentTerm < newTerm) {
    raftStoreSetTerm(pSyncNode, newTerm);
    snprintf(reason, sizeof(reason), "step down, update term to %" PRId64 " from %" PRId64 ", since %s", newTerm,
             currentTerm, strFrom);
    syncNodeBecomeFollower(pSyncNode, id, reason);
    raftStoreClearVote(pSyncNode);
  } else if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    snprintf(reason, sizeof(reason), "step down, with same term to %" PRId64 " from %" PRId64 ", since %s", newTerm,
             currentTerm, strFrom);
    syncNodeBecomeFollower(pSyncNode, id, reason);
  }
}
2201

2202
// Hand the node's response manager to syncRespCleanRsp(); used when the leader role changes.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}
2,476,466✔
2203

2204
// Switch this node to the follower role and remember `leaderId` as the known leader.
//
// pSyncNode: node whose role changes.
// leaderId:  raft id stored into leaderCache; when it matches one of the configured replicas, that replica's
//            fqdn/port are copied into leaderCacheEp as well.
// debugStr:  free-form reason appended to the log lines.
//
// The function logs and returns early when stopping the heartbeat timer or resetting the log buffer fails;
// the steps that follow the failing call are then skipped.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, SRaftId leaderId, const char* debugStr) {
  int32_t code = 0;

  // maybe clear leader cache: a leader that steps down drops its own cached endpoint first
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
    pSyncNode->leaderCacheEp.port = 0;
    pSyncNode->leaderCacheEp.fqdn[0] = '\0';
  }

  pSyncNode->hbSlowNum = 0;

  // state change
  pSyncNode->leaderCache = leaderId;

  for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
    if (syncUtilSameId(&pSyncNode->replicasId[i], &leaderId)) {
      pSyncNode->leaderCacheEp.port = pSyncNode->raftCfg.cfg.nodeInfo[i].nodePort;
      // Bounded copy helper already used elsewhere in this file; unlike strncpy it is expected to leave the
      // destination NUL-terminated even when the source fills the buffer (NOTE(review): confirm helper contract).
      tstrncpy(pSyncNode->leaderCacheEp.fqdn, pSyncNode->raftCfg.cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
      break;
    }
  }
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();
  if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNTrace(pSyncNode, "become follower %s", debugStr);

  // send rsp to client
  syncNodeLeaderChangeRsp(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // reset elect timer
  syncNodeResetElectTimer(pSyncNode);

  sInfo("vgId:%d, become follower. %s", pSyncNode->vgId, debugStr);
}
2255

2256
// Switch this node to the learner role.
//
// Resets the slow-heartbeat counter, records the role change time, notifies the FSM through
// FpBecomeLearnerCb (when set), invalidates minMatchIndex and resets the log buffer.
// A log buffer reset failure is only logged.
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->hbSlowNum = 0;

  // role switch
  pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  sNTrace(pSyncNode, "become learner %s", debugStr);

  // notify the state machine, if it registered a callback
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLearnerCb != NULL) {
    pSyncNode->pFsm->FpBecomeLearnerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  int32_t resetCode = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (resetCode != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(resetCode));
  }
}
2281

2282
// TLA+ Spec
2283
// \* Candidate i transitions to leader.
2284
// BecomeLeader(i) ==
2285
//     /\ state[i] = Candidate
2286
//     /\ votesGranted[i] \in Quorum
2287
//     /\ state'      = [state EXCEPT ![i] = Leader]
2288
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
2289
//                          [j \in Server |-> Len(log[i]) + 1]]
2290
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
2291
//                          [j \in Server |-> 0]]
2292
//     /\ elections'  = elections \cup
2293
//                          {[eterm     |-> currentTerm[i],
2294
//                            eleader   |-> i,
2295
//                            elog      |-> log[i],
2296
//                            evotes    |-> votesGranted[i],
2297
//                            evoterLog |-> voterLog[i]]}
2298
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
2299
//
2300
// Switch this node to the leader role (see the TLA+ BecomeLeader action above).
//
// pSyncNode: node that becomes leader.
// debugStr:  free-form reason appended to the final log line.
//
// Steps, in order: bump counters, set state and leader cache to self, set every nextIndex to lastIndex + 1,
// invalidate every matchIndex, re-init peer states, stop a running snapshot receiver, stop the elect timer,
// start the heartbeat timer, send heartbeats, invoke FpBecomeLeaderCb, invalidate minMatchIndex and reset the
// log buffer. Any failing step is logged and aborts the remaining steps; the state assigned so far is kept.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  int32_t code = 0;
  pSyncNode->becomeLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;
  // Bounded copy helper already used elsewhere in this file; unlike strncpy it is expected to leave the
  // destination NUL-terminated even when the source fills the buffer (NOTE(review): confirm helper contract).
  tstrncpy(pSyncNode->leaderCacheEp.fqdn, pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeFqdn,
           TSDB_FQDN_LEN);
  pSyncNode->leaderCacheEp.port = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodePort;

  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    SyncIndex lastIndex;
    SyncTerm  lastTerm;
    // reuse the function-level `code` instead of shadowing it with a second declaration
    code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
    if (code != 0) {
      sError("vgId:%d, failed to become leader since %s", pSyncNode->vgId, tstrerror(code));
      return;
    }
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    // just do it!
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

#if 0
  // update sender private term
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
  if (pMySender != NULL) {
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
      if (pSyncNode->senders[i]->privateTerm > pMySender->privateTerm) {
        pMySender->privateTerm = pSyncNode->senders[i]->privateTerm;
      }
    }
    (pMySender->privateTerm) += 100;
  }
#endif

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // start heartbeat timer
  if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // send heartbeat right now
  if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
    return;
  }

  // trace log
  sNInfo(pSyncNode, "node become leader, %s", debugStr);
}
2394

2395
// Switch this node to the assigned-leader role.
//
// Same sequence as becoming a regular leader, except that restoreFinish is left untouched and only
// leaderCache (not leaderCacheEp) is pointed at this node. Any failing step is logged and aborts the
// remaining steps.
void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
  int32_t rc = 0;

  pSyncNode->becomeAssignedLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // restoreFinish is intentionally not reset here
  // pSyncNode->restoreFinish = false;

  // role switch
  pSyncNode->state = TAOS_SYNC_STATE_ASSIGNED_LEADER;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  // this node is now the cached leader
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // every peer's next index starts right after our last entry
  for (int32_t peer = 0; peer < pSyncNode->pNextIndex->replicaNum; ++peer) {
    SyncIndex tailIndex;
    SyncTerm  tailTerm;
    rc = syncNodeGetLastIndexTerm(pSyncNode, &tailIndex, &tailTerm);
    if (rc != 0) {
      sError("vgId:%d, failed to become assigned leader since %s", pSyncNode->vgId, tstrerror(rc));
      return;
    }
    pSyncNode->pNextIndex->index[peer] = tailIndex + 1;
  }

  // match indexes are unknown again (our own slot may be overwritten too, which is harmless)
  for (int32_t peer = 0; peer < pSyncNode->pMatchIndex->replicaNum; ++peer) {
    pSyncNode->pMatchIndex->index[peer] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  rc = syncNodePeerStateInit(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  rc = syncNodeStopElectTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  // start heartbeat timer
  rc = syncNodeStartHeartbeatTimer(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  // send heartbeat right now
  rc = syncNodeHeartbeatPeers(pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeAssignedLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  rc = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
  if (rc != 0) {
    sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(rc));
    return;
  }

  // trace log
  sNInfo(pSyncNode, "become assigned leader");
}
2473

2474
// Promote a candidate that holds a majority of granted votes to leader.
//
// Nothing happens (besides an error log) when the node is not a candidate or the majority is missing.
// After the promotion a noop entry is appended; a failure to append is logged but does not undo the promotion.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
    sError("vgId:%d, failed leader from candidate since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }

  if (!voteGrantedMajority(pSyncNode->pVotesGranted)) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }

  syncNodeBecomeLeader(pSyncNode, "from candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  if (syncNodeAppendNoop(pSyncNode) < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
  }

  SyncIndex logTail = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, pSyncNode->vgId,
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logTail);
}
2498

2499
// True when the node's vgId equals 1 (the id this file treats as the mnode).
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  return 1 == pSyncNode->vgId;
}
61,280,721✔
2500

2501
int32_t syncSetElectBaseline(int64_t rid, int32_t ms){
24,000✔
2502
  int32_t code = 0;
24,000✔
2503
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
24,000✔
2504
  if (pSyncNode == NULL) {
24,000✔
2505
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
×
2506
    if (terrno != 0) code = terrno;
×
2507
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
×
2508
    TAOS_RETURN(code);
×
2509
  }
2510
  pSyncNode->electBaseLine = ms;
24,000✔
2511
  syncNodeResetElectTimer(pSyncNode);
24,000✔
2512
  return code;
24,000✔
2513
}
2514

2515
// Reset the send bookkeeping of every peer slot: lastSendIndex becomes SYNC_INDEX_INVALID and
// lastSendTime becomes 0. Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  const int32_t slotCount = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;

  for (int32_t slot = 0; slot < slotCount; ++slot) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
  }

  return 0;
}
2523

2524
// Turn a follower into a candidate and stamp the role change time.
// Any other starting state is rejected with an error log.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    sError("vgId:%d, failed candidate from follower since node state is wrong:%d", pSyncNode->vgId, pSyncNode->state);
    return;
  }

  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
  pSyncNode->roleTimeMs = taosGetTimestampMs();

  SyncIndex logTail = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logTail);

  sNTrace(pSyncNode, "follower to candidate");
}
2537

2538
// Promote an assigned leader to a regular leader and append a noop entry.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the node is not an assigned leader, 0 otherwise.
// A failure to append the noop entry is logged but still yields 0.
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) return TSDB_CODE_SYN_INTERNAL_ERROR;
  syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");

  sNTrace(pSyncNode, "assigned leader to leader");

  int32_t ret = syncNodeAppendNoop(pSyncNode);
  if (ret < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret));
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  // the ", " before "assigned commit index" was missing, which glued the two fields together in the log
  sInfo("vgId:%d, become leader from assigned leader. term:%" PRId64 ", commit index:%" PRId64
        ", assigned commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, pSyncNode->assignedCommitIndex,
        lastIndex);
  return 0;
}
2556

2557
// just called by syncNodeVoteForSelf
2558
// Record a vote for `pRaftId` in the raft store.
//
// term:    must equal the term currently held by the raft store, otherwise the vote is refused.
// pRaftId: id to vote for.
//
// The vote is also refused when the store reports that a vote was already cast. Refusals are only logged;
// the caller gets no status back.
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
  SyncTerm storeTerm = raftStoreGetTerm(pSyncNode);
  if (term != storeTerm) {
    sError("vgId:%d, failed to vote for term, term:%" PRId64 ", storeTerm:%" PRId64, pSyncNode->vgId, term, storeTerm);
    return;
  }
  sTrace("vgId:%d, begin hasVoted", pSyncNode->vgId);
  bool voted = raftStoreHasVoted(pSyncNode);
  if (voted) {
    // this branch runs when a vote already exists, so say so (the old text claimed the opposite)
    sError("vgId:%d, failed to vote for term since already voted", pSyncNode->vgId);
    return;
  }

  raftStoreVote(pSyncNode, pRaftId);
}
2573

2574
// simulate get vote from outside
2575
// Vote for this node itself in `currentTerm` and feed a locally built, granted RequestVoteReply
// (source and destination both set to this node) into the vote bookkeeping.
// Returns silently when the reply message cannot be built.
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);

  SRpcMsg selfReply = {0};
  if (syncBuildRequestVoteReply(&selfReply, pSyncNode->vgId) != 0) {
    return;
  }

  SyncRequestVoteReply* pReply = selfReply.pCont;
  pReply->srcId = pSyncNode->myRaftId;
  pReply->destId = pSyncNode->myRaftId;
  pReply->term = currentTerm;
  pReply->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pReply);
  votesRespondAdd(pSyncNode->pVotesRespond, pReply);

  // the reply never leaves this function, so its buffer is released here
  rpcFreeCont(selfReply.pCont);
}
2592

2593
// return if has a snapshot
2594
// True when the FSM exposes FpGetSnapshotInfo and the snapshot it reports has
// lastApplyIndex >= SYNC_INDEX_BEGIN. The callback's return value is ignored.
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  SSnapshot snap = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};

  if (pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return false;
  }

  // TODO check return value
  (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);
  return snap.lastApplyIndex >= SYNC_INDEX_BEGIN;
}
2606

2607
// return max(logLastIndex, snapshotLastIndex)
2608
// if no snapshot and log, return -1
2609
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
7,244,384✔
2610
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
7,244,384✔
2611
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
7,244,384✔
2612
    // TODO check return value
2613
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
7,243,833✔
2614
  }
2615
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
7,244,384✔
2616

2617
  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
7,244,384✔
2618
  return lastIndex;
7,244,384✔
2619
}
2620

2621
// return the last term of snapshot and log
2622
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
2623
// Term that belongs to the node's last index.
//
// Without a snapshot the log store's last term is returned. With a snapshot, the log store's last term is
// used only when the log extends past the snapshot's lastApplyIndex; otherwise the snapshot's lastApplyTerm.
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  if (!syncNodeHasSnapshot(pSyncNode)) {
    // no snapshot
    return pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
  }

  // has snapshot
  SSnapshot snap = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    // TODO check return value
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);
  }

  SyncIndex logTail = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncTerm  tailTerm = 0;
  if (logTail > snap.lastApplyIndex) {
    tailTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
  } else {
    tailTerm = snap.lastApplyTerm;
  }
  return tailTerm;
}
2647

2648
// get last index and term along with snapshot
2649
// Fill *pLastIndex and *pLastTerm from syncNodeGetLastIndex() / syncNodeGetLastTerm(). Always returns 0.
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  SyncIndex tailIndex = syncNodeGetLastIndex(pSyncNode);
  *pLastIndex = tailIndex;

  SyncTerm tailTerm = syncNodeGetLastTerm(pSyncNode);
  *pLastTerm = tailTerm;

  return 0;
}
2654

2655
#ifdef BUILD_NO_CALL
2656
// return append-entries first try index
2657
// Index right after the node's last index (syncNodeGetLastIndex() + 1).
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  return syncNodeGetLastIndex(pSyncNode) + 1;
}
2661

2662
// if index > 0, return index - 1
2663
// else, return -1
2664
// index - 1, clamped so the result is never below SYNC_INDEX_INVALID. pSyncNode is not used.
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex prev = index - 1;
  return (prev < SYNC_INDEX_INVALID) ? SYNC_INDEX_INVALID : prev;
}
2672

2673
// if index < 0, return SYNC_TERM_INVALID
2674
// if index == 0, return 0
2675
// if index > 0, return preTerm
2676
// if error, return SYNC_TERM_INVALID
2677
// Term of the entry that precedes `index`.
//
//   index <  SYNC_INDEX_BEGIN -> SYNC_TERM_INVALID
//   index == SYNC_INDEX_BEGIN -> 0
//   otherwise the entry at index - 1 is looked up in the LRU cache, then in the log store; when the log
//   store lookup fails, the snapshot's lastApplyTerm is used if its lastApplyIndex equals index - 1.
// Returns SYNC_TERM_INVALID (after an error log) when none of these sources has the entry, and -1 when the
// lookup reports success but yields no entry.
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) return SYNC_TERM_INVALID;
  if (index == SYNC_INDEX_BEGIN) return 0;

  SyncIndex       prevIdx = index - 1;
  SSyncRaftEntry* pPrev = NULL;
  SLRUCache*      pLru = pSyncNode->pLogStore->pCache;
  LRUHandle*      hCache = taosLRUCacheLookup(pLru, &prevIdx, sizeof(prevIdx));
  int32_t         rc = 0;

  if (hCache != NULL) {
    pPrev = (SSyncRaftEntry*)taosLRUCacheValue(pLru, hCache);

    pSyncNode->pLogStore->cacheHit++;
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", prevIdx, pPrev->bytes, pPrev);
  } else {
    pSyncNode->pLogStore->cacheMiss++;
    sNTrace(pSyncNode, "miss cache index:%" PRId64, prevIdx);

    rc = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, prevIdx, &pPrev);
  }

  SSnapshot snap = {.data = NULL,
                    .lastApplyIndex = SYNC_INDEX_INVALID,
                    .lastApplyTerm = SYNC_TERM_INVALID,
                    .lastConfigIndex = SYNC_INDEX_INVALID};

  if (rc == 0) {
    if (pPrev == NULL) return -1;
    SyncTerm prevTerm = pPrev->term;

    // a cache hit holds a handle that must be released; a log-store read owns the entry instead
    if (hCache != NULL) {
      taosLRUCacheRelease(pLru, hCache, false);
    } else {
      syncEntryDestroy(pPrev);
    }
    return prevTerm;
  }

  // entry not readable from the log: fall back to the snapshot boundary
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    // TODO check return value
    (void)pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);
    if (snap.lastApplyIndex == prevIdx) {
      return snap.lastApplyTerm;
    }
  }

  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
          index, snap.lastApplyIndex, snap.lastApplyTerm);
  return SYNC_TERM_INVALID;
}
2737

2738
// get pre index and term of "index"
2739
// Fill *pPreIndex and *pPreTerm for the entry that precedes `index`. Always returns 0.
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  SyncIndex prevIdx = syncNodeGetPreIndex(pSyncNode, index);
  *pPreIndex = prevIdx;

  SyncTerm prevTerm = syncNodeGetPreTerm(pSyncNode, index);
  *pPreTerm = prevTerm;

  return 0;
}
2744
#endif
2745

2746
static void syncNodeEqPingTimer(void* param, void* tmrId) {
27,501,134✔
2747
  if (!syncIsInit()) return;
27,501,134✔
2748

2749
  int64_t    rid = (int64_t)param;
27,501,134✔
2750
  SSyncNode* pNode = syncNodeAcquire(rid);
27,501,134✔
2751

2752
  if (pNode == NULL) return;
27,501,134✔
2753

2754
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
27,501,134✔
2755
    SRpcMsg rpcMsg = {0};
27,501,134✔
2756
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
27,501,134✔
2757
                                    pNode->pingTimerMS, pNode);
2758
    if (code != 0) {
27,500,490✔
2759
      sError("failed to build ping msg");
×
2760
      rpcFreeCont(rpcMsg.pCont);
×
2761
      goto _out;
×
2762
    }
2763

2764
    // sTrace("enqueue ping msg");
2765
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
27,500,490✔
2766
    if (code != 0) {
27,501,134✔
2767
      sError("failed to sync enqueue ping msg since %s", terrstr());
6,447✔
2768
      rpcFreeCont(rpcMsg.pCont);
6,447✔
2769
      goto _out;
6,447✔
2770
    }
2771

2772
  _out:
27,501,134✔
2773
    if (taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
27,500,773✔
2774
                     &pNode->pPingTimer))
2775
      sError("failed to reset ping timer");
×
2776
  }
2777
  syncNodeRelease(pNode);
27,501,134✔
2778
}
2779

2780
static void syncNodeEqElectTimer(void* param, void* tmrId) {
747,810✔
2781
  if (!syncIsInit()) return;
747,810✔
2782

2783
  int64_t    rid = (int64_t)param;
747,810✔
2784
  SSyncNode* pNode = syncNodeAcquire(rid);
747,810✔
2785

2786
  if (pNode == NULL) return;
747,810✔
2787

2788
  if (pNode->syncEqMsg == NULL) {
746,204✔
2789
    syncNodeRelease(pNode);
×
2790
    return;
×
2791
  }
2792

2793
  int64_t tsNow = taosGetTimestampMs();
746,204✔
2794
  if (tsNow < pNode->electTimerParam.executeTime) {
746,204✔
2795
    syncNodeRelease(pNode);
5,578✔
2796
    return;
5,578✔
2797
  }
2798

2799
  SRpcMsg rpcMsg = {0};
740,626✔
2800
  int32_t code =
2801
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
740,626✔
2802

2803
  if (code != 0) {
740,626✔
2804
    sError("failed to build elect msg");
×
2805
    syncNodeRelease(pNode);
×
2806
    return;
×
2807
  }
2808

2809
  SyncTimeout* pTimeout = rpcMsg.pCont;
740,626✔
2810
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
740,626✔
2811

2812
  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
740,626✔
2813
  if (code != 0) {
740,626✔
2814
    sError("failed to sync enqueue elect msg since %s", terrstr());
×
2815
    rpcFreeCont(rpcMsg.pCont);
×
2816
    syncNodeRelease(pNode);
×
2817
    return;
×
2818
  }
2819

2820
  syncNodeRelease(pNode);
740,626✔
2821
}
2822

2823
#ifdef BUILD_NO_CALL
2824
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
2825
  if (!syncIsInit()) return;
2826

2827
  int64_t    rid = (int64_t)param;
2828
  SSyncNode* pNode = syncNodeAcquire(rid);
2829

2830
  if (pNode == NULL) return;
2831

2832
  if (pNode->totalReplicaNum > 1) {
2833
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
2834
      SRpcMsg rpcMsg = {0};
2835
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
2836
                                      pNode->heartbeatTimerMS, pNode);
2837

2838
      if (code != 0) {
2839
        sError("failed to build heartbeat msg");
2840
        goto _out;
2841
      }
2842

2843
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
2844
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
2845
      if (code != 0) {
2846
        sError("failed to enqueue heartbeat msg since %s", terrstr());
2847
        rpcFreeCont(rpcMsg.pCont);
2848
        goto _out;
2849
      }
2850

2851
    _out:
2852
      if (taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
2853
                       &pNode->pHeartbeatTimer) != 0)
2854
        return;
2855

2856
    } else {
2857
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
2858
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2859
    }
2860
  }
2861
}
2862
#endif
2863

2864
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
34,070,746✔
2865
  if (tsSyncLogHeartbeat) {
34,070,746✔
2866
    sInfo("heartbeat timer start");
×
2867
  }
2868
  int32_t code = 0;
34,070,746✔
2869
  int64_t hbDataRid = (int64_t)param;
34,070,746✔
2870
  int64_t tsNow = taosGetTimestampMs();
34,070,746✔
2871

2872
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
34,070,746✔
2873
  if (pData == NULL) {
34,070,746✔
2874
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
×
2875
    return;
×
2876
  }
2877

2878
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
34,070,746✔
2879
  if (pSyncNode == NULL) {
34,070,746✔
2880
    syncHbTimerDataRelease(pData);
328✔
2881
    sError("hb timer get pSyncNode NULL");
328✔
2882
    return;
328✔
2883
  }
2884

2885
  SSyncTimer* pSyncTimer = pData->pTimer;
34,070,418✔
2886

2887
  if (!pSyncNode->isStart) {
34,070,418✔
2888
    syncNodeRelease(pSyncNode);
×
2889
    syncHbTimerDataRelease(pData);
×
2890
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
×
2891
    return;
×
2892
  }
2893

2894
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
34,070,418✔
2895
    syncNodeRelease(pSyncNode);
×
2896
    syncHbTimerDataRelease(pData);
×
2897
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
×
2898
    return;
×
2899
  }
2900

2901
  if (tsSyncLogHeartbeat) {
34,070,418✔
2902
    sInfo("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:0x%" PRIx64, pSyncNode->vgId, hbDataRid,
×
2903
          pData->destId.addr);
2904
  } else {
2905
    sTrace("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:0x%" PRIx64, pSyncNode->vgId, hbDataRid,
34,070,418✔
2906
           pData->destId.addr);
2907
  }
2908

2909
  if (pSyncNode->totalReplicaNum > 1) {
34,070,418✔
2910
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
34,069,716✔
2911
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);
34,069,716✔
2912

2913
    if (timerLogicClock == msgLogicClock) {
34,069,716✔
2914
      if (tsNow > pData->execTime) {
34,068,731✔
2915
        pData->execTime += pSyncTimer->timerMS;
34,055,730✔
2916

2917
        SRpcMsg rpcMsg = {0};
34,055,730✔
2918
        if ((code = syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId)) != 0) {
34,055,730✔
2919
          sError("vgId:%d, failed to build heartbeat msg since %s", pSyncNode->vgId, tstrerror(code));
×
2920
          syncNodeRelease(pSyncNode);
×
2921
          syncHbTimerDataRelease(pData);
×
2922
          return;
×
2923
        }
2924

2925
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
34,055,730✔
2926

2927
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
34,055,730✔
2928
        pSyncMsg->srcId = pSyncNode->myRaftId;
34,055,730✔
2929
        pSyncMsg->destId = pData->destId;
34,055,730✔
2930
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
34,055,730✔
2931
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
34,055,730✔
2932
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
34,055,730✔
2933
        pSyncMsg->privateTerm = 0;
34,055,730✔
2934
        pSyncMsg->timeStamp = tsNow;
34,055,730✔
2935

2936
        // update reset time
2937
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
34,055,730✔
2938
        pSyncTimer->timeStamp = tsNow;
34,055,730✔
2939

2940
        // send msg
2941
        TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
34,055,730✔
2942
        TRACE_SET_ROOTID(&(rpcMsg.info.traceId), tGenIdPI64());
34,055,730✔
2943
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime, &(rpcMsg.info.traceId));
34,055,730✔
2944
        int ret = syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
34,055,730✔
2945
        if (ret != 0) {
34,055,730✔
2946
          sError("vgId:%d, failed to send heartbeat since %s", pSyncNode->vgId, tstrerror(ret));
32,301✔
2947
        }
2948
      }
2949

2950
      if (syncIsInit()) {
34,068,731✔
2951
        if (tsSyncLogHeartbeat) {
34,068,731✔
2952
          sInfo("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
×
2953
        } else {
2954
          sTrace("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
34,068,731✔
2955
        }
2956
        bool stopped = taosTmrResetPriority(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid,
34,068,731✔
2957
                                            syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
34,068,731✔
2958
        if (stopped) sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
34,068,731✔
2959

2960
      } else {
2961
        sError("sync env is stop, reset peer hb timer error");
×
2962
      }
2963

2964
    } else {
2965
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64, pSyncNode->vgId,
985✔
2966
             timerLogicClock, msgLogicClock);
2967
    }
2968

2969
    if (tsSyncLogHeartbeat) {
34,069,716✔
2970
      sInfo("vgId:%d, finish send sync-heartbeat", pSyncNode->vgId);
×
2971
    }
2972
  }
2973

2974
  syncHbTimerDataRelease(pData);
34,070,418✔
2975
  syncNodeRelease(pSyncNode);
34,070,418✔
2976
  if (tsSyncLogHeartbeat) {
34,070,418✔
2977
    sInfo("heartbeat timer stop");
×
2978
  }
2979
}
2980

2981
#ifdef BUILD_NO_CALL
2982
/*
 * Deleter callback passed to taosLRUCacheInsert() by syncCacheEntry().
 * Frees the cached value with taosMemoryFree(); the key, its length and the
 * user-data pointer are not used.
 */
static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) {
  (void)key;
  (void)keyLen;
  (void)ud;

  taosMemoryFree(value);
}
2986

2987
/*
 * Insert a raft entry into the log store's LRU cache, keyed by the entry index.
 *
 * pLogStore: log store whose pCache receives the entry
 * pEntry:    entry to insert; the size handed to the cache is
 *            sizeof(*pEntry) + pEntry->dataLen
 * h:         forwarded unchanged to taosLRUCacheInsert()
 *
 * Returns 0 when the insert reports TAOS_LRU_STATUS_OK, otherwise -1.
 */
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  sNTrace(pStoreData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   entrySize = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus insertStatus = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry,
                                              entrySize, deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW, NULL);

  return (insertStatus == TAOS_LRU_STATUS_OK) ? 0 : -1;
}
3001
#endif
3002

3003
/*
 * Fill a sync configuration from an alter-vnode-replica request.
 *
 * Layout written into cfg->nodeInfo:
 *   [0, pReq->replica)                                  voters   (from pReq->replicas)
 *   [pReq->replica, pReq->replica + learnerReplica)     learners (from pReq->learnerReplicas)
 *
 * Each node's id/port/fqdn is copied from the request and then passed through
 * tmsgUpdateDnodeInfo(), which may rewrite them (its result is only logged).
 *
 * cfg->myIndex is set only when selfIndex or learnerSelfIndex is not -1; the
 * learner index wins when both are set. cfg->changeVersion is copied from the request.
 *
 * pReq: decoded request (read only here)
 * cfg:  output configuration; replicaNum/totalReplicaNum are reset first
 */
void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) {  // TODO SAlterVnodeReplicaReq name is proper?
  cfg->replicaNum = 0;
  cfg->totalReplicaNum = 0;

  // voters
  for (int i = 0; i < pReq->replica; ++i) {
    SNodeInfo* pNode = &cfg->nodeInfo[i];
    pNode->nodeId = pReq->replicas[i].id;
    pNode->nodePort = pReq->replicas[i].port;
    tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
    cfg->replicaNum++;
  }
  if (pReq->selfIndex != -1) {
    cfg->myIndex = pReq->selfIndex;
  }

  // learners: until the loop ends, cfg->totalReplicaNum counts learners only and
  // doubles as the read cursor into pReq->learnerReplicas
  for (int i = cfg->replicaNum; i < pReq->replica + pReq->learnerReplica; ++i) {
    SNodeInfo* pNode = &cfg->nodeInfo[i];
    pNode->nodeId = pReq->learnerReplicas[cfg->totalReplicaNum].id;
    pNode->nodePort = pReq->learnerReplicas[cfg->totalReplicaNum].port;
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[cfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn));
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
    cfg->totalReplicaNum++;
  }

  // total = learners counted above + voters
  cfg->totalReplicaNum += pReq->replica;
  if (pReq->learnerSelfIndex != -1) {
    cfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
  }
  cfg->changeVersion = pReq->changeVersion;
}
×
3039

3040
/*
 * Inspect a config-change entry and step down if this node is a leader that is
 * not part of the new configuration.
 *
 * ths:    sync node
 * pEntry: raft entry; must carry TDMT_SYNC_CONFIG_CHANGE
 *
 * Returns:
 *   TSDB_CODE_SYN_INTERNAL_ERROR  entry is not a config-change entry
 *   TSDB_CODE_INVALID_MSG         request body cannot be deserialized
 *   1                             node was leader / assigned leader, is absent
 *                                 from the new config, and syncNodeStepDown() was called
 *   0                             otherwise
 */
int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // entry payload = SMsgHead followed by the serialized request
  SMsgHead* pHead = (SMsgHead*)pEntry->data;
  void*     pBody = POINTER_SHIFT(pHead, sizeof(SMsgHead));

  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pBody, pHead->contLen, &req) != 0) {
    int32_t code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg newCfg = {0};
  syncBuildConfigFromReq(&req, &newCfg);

  bool leading = (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
  if (newCfg.totalReplicaNum < 1 || !leading) {
    return 0;
  }

  // still a member (same fqdn and port) -> nothing to do
  for (int32_t k = 0; k < newCfg.totalReplicaNum; ++k) {
    if (strcmp(ths->myNodeInfo.nodeFqdn, newCfg.nodeInfo[k].nodeFqdn) == 0 &&
        ths->myNodeInfo.nodePort == newCfg.nodeInfo[k].nodePort) {
      return 0;
    }
  }

  // leading but removed from the new config
  SyncTerm term = raftStoreGetTerm(ths);
  SRaftId  emptyId = EMPTY_RAFT_ID;
  syncNodeStepDown(ths, term, emptyId, "changeConfig");
  return 1;
}
3078

3079
/*
 * Dump the node's current membership view to the info log.
 *
 * Logged, in order: summary counters, this node's info, every peer's node
 * info, every peer's endpoint set, every peer's raft id, every node of
 * ths->raftCfg.cfg, and every replica raft id.
 *
 * ths: sync node to describe
 * cfg: not read by this function (kept for interface compatibility)
 * str: label inserted into every line (e.g. "before config change")
 */
void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg* cfg, char* str) {
  sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64
        ", changeVersion:%d, "
        "restoreFinish:%d",
        ths->vgId, str, ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion,
        ths->restoreFinish);

  sInfo("vgId:%d, %s, myNodeInfo, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
        ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn, ths->myNodeInfo.nodePort,
        ths->myNodeInfo.nodeRole);

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
          i, ths->peersNodeInfo[i].clusterId, ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn,
          ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole);
  }

  // One line per peer with that peer's own endpoint set. The endpoint loop uses
  // its own index (j) and reads peersEpset[i]; shadowing the peer index and
  // dereferencing peersEpset directly would print peer 0's endpoints every time.
  for (int32_t i = 0; i < ths->peersNum; ++i) {
    char    buf[256];
    int32_t len = (int32_t)sizeof(buf);
    int32_t n = 0;
    n += tsnprintf(buf + n, len - n, "%s", "{");
    for (int32_t j = 0; j < ths->peersEpset[i].numOfEps; j++) {
      n += tsnprintf(buf + n, len - n, "%s:%d%s", ths->peersEpset[i].eps[j].fqdn, ths->peersEpset[i].eps[j].port,
                     (j + 1 < ths->peersEpset[i].numOfEps ? ", " : ""));
    }
    n += tsnprintf(buf + n, len - n, "%s", "}");

    sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d", ths->vgId, str, i, buf, ths->peersEpset[i].inUse);
  }

  for (int32_t i = 0; i < ths->peersNum; ++i) {
    sInfo("vgId:%d, %s, peersId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->peersId[i].addr);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, nodeInfo%d, clusterId:0x%" PRIx64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str, i,
          ths->raftCfg.cfg.nodeInfo[i].clusterId, ths->raftCfg.cfg.nodeInfo[i].nodeId,
          ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, ths->raftCfg.cfg.nodeInfo[i].nodePort,
          ths->raftCfg.cfg.nodeInfo[i].nodeRole);
  }

  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    sInfo("vgId:%d, %s, replicasId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->replicasId[i].addr);
  }
}
×
3125

3126
/*
 * Rewrite the peer tables and ths->raftCfg.cfg node list from a new configuration.
 *
 * Pass 1: every node of cfg that is not this node (fqdn + port differ) becomes
 *         a peer: node info fields are copied, then the endpoint set and raft id
 *         are derived from them. ths->peersNum is set to the number of peers.
 * Pass 2: every node of cfg is copied into ths->raftCfg.cfg.nodeInfo; voters
 *         are counted into replicaNum, and myIndex is set at the slot matching
 *         this node. totalReplicaNum is set to the number of nodes copied.
 *
 * Returns 0 on success, or terrno when a peer's raft id cannot be determined
 * (the peer tables are left partially updated in that case).
 */
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg* cfg) {
  // pass 1: peers
  int32_t peerCnt = 0;
  for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
    const SNodeInfo* pSrc = &cfg->nodeInfo[j];

    bool isSelf = (strcmp(ths->myNodeInfo.nodeFqdn, pSrc->nodeFqdn) == 0 && ths->myNodeInfo.nodePort == pSrc->nodePort);
    if (isSelf) {
      continue;
    }

    SNodeInfo* pPeer = &ths->peersNodeInfo[peerCnt];
    pPeer->nodeRole = pSrc->nodeRole;
    pPeer->clusterId = pSrc->clusterId;
    tstrncpy(pPeer->nodeFqdn, pSrc->nodeFqdn, TSDB_FQDN_LEN);
    pPeer->nodeId = pSrc->nodeId;
    pPeer->nodePort = pSrc->nodePort;

    syncUtilNodeInfo2EpSet(pPeer, &ths->peersEpset[peerCnt]);

    if (syncUtilNodeInfo2RaftId(pPeer, ths->vgId, &ths->peersId[peerCnt]) == false) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", ths->vgId, peerCnt);
      return terrno;
    }

    peerCnt++;
  }
  ths->peersNum = peerCnt;

  // pass 2: full node list
  int32_t nodeCnt = 0;
  ths->raftCfg.cfg.replicaNum = 0;
  for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
    const SNodeInfo* pSrc = &cfg->nodeInfo[j];
    SNodeInfo*       pDst = &ths->raftCfg.cfg.nodeInfo[nodeCnt];

    if (pSrc->nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->raftCfg.cfg.replicaNum++;
    }

    pDst->nodeRole = pSrc->nodeRole;
    pDst->clusterId = pSrc->clusterId;
    tstrncpy(pDst->nodeFqdn, pSrc->nodeFqdn, TSDB_FQDN_LEN);
    pDst->nodeId = pSrc->nodeId;
    pDst->nodePort = pSrc->nodePort;

    if (strcmp(ths->myNodeInfo.nodeFqdn, pSrc->nodeFqdn) == 0 && ths->myNodeInfo.nodePort == pSrc->nodePort) {
      ths->raftCfg.cfg.myIndex = nodeCnt;
    }
    nodeCnt++;
  }
  ths->raftCfg.cfg.totalReplicaNum = nodeCnt;

  return 0;
}
3174

3175
/*
 * Promote nodes to voter according to a new configuration.
 *
 * A peer (ths->peersNodeInfo) or configured node (ths->raftCfg.cfg.nodeInfo)
 * is set to TAOS_SYNC_ROLE_VOTER when cfg contains a node with the same fqdn
 * and port whose role is voter. Nodes are never demoted here.
 * ths->raftCfg.cfg.replicaNum is recomputed as the number of such matches in
 * the configured node list.
 */
void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg* cfg) {
  // peers
  for (int32_t p = 0; p < ths->peersNum; ++p) {
    for (int32_t c = 0; c < cfg->totalReplicaNum; ++c) {
      if (cfg->nodeInfo[c].nodeRole != TAOS_SYNC_ROLE_VOTER) continue;
      if (strcmp(ths->peersNodeInfo[p].nodeFqdn, cfg->nodeInfo[c].nodeFqdn) != 0) continue;
      if (ths->peersNodeInfo[p].nodePort != cfg->nodeInfo[c].nodePort) continue;

      ths->peersNodeInfo[p].nodeRole = TAOS_SYNC_ROLE_VOTER;
    }
  }

  // configured node list; voter count starts from zero
  ths->raftCfg.cfg.replicaNum = 0;
  for (int32_t n = 0; n < ths->raftCfg.cfg.totalReplicaNum; ++n) {
    for (int32_t c = 0; c < cfg->totalReplicaNum; ++c) {
      if (cfg->nodeInfo[c].nodeRole != TAOS_SYNC_ROLE_VOTER) continue;
      if (strcmp(ths->raftCfg.cfg.nodeInfo[n].nodeFqdn, cfg->nodeInfo[c].nodeFqdn) != 0) continue;
      if (ths->raftCfg.cfg.nodeInfo[n].nodePort != cfg->nodeInfo[c].nodePort) continue;

      ths->raftCfg.cfg.nodeInfo[n].nodeRole = TAOS_SYNC_ROLE_VOTER;
      ths->raftCfg.cfg.replicaNum++;
    }
  }
}
×
3202

3203
/*
 * Rebuild the per-replica runtime state of a sync node after
 * ths->raftCfg.cfg has been changed, carrying over the state of replicas
 * whose raft id is present both before and after the change.
 *
 * Steps (in order):
 *   1. replicasId     - regenerated from ths->raftCfg.cfg.nodeInfo
 *   2. pMatchIndex    - new manager, filled via syncIndexMgrCopyIfExist()
 *   3. pNextIndex     - new manager, filled via syncIndexMgrCopyIfExist()
 *   4. vote managers  - voteGrantedUpdate() / votesRespondUpdate()
 *   5. logReplMgrs    - destroyed and re-initialised; old contents are copied
 *                       back by raft id and peerId is set to the new slot
 *   6. senders        - all snapshot senders destroyed and recreated
 *   7. heartbeat      - timers stopped, re-initialised for every slot, restarted
 *   8. peerStates     - remapped by raft id
 *
 * ths:                 sync node; raftCfg.cfg must already hold the new config
 * oldtotalReplicaNum:  replica count before the change; bounds every access to
 *                      the old arrays
 *
 * Returns 0 on success, otherwise terrno or the error code of the failing
 * step. On failure the node is left partially rebuilt.
 */
int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum) {
  int32_t code = 0;

  // 1.rebuild replicasId, remove deleted one
  SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
  memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId));

  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
    if (syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]) == false) return terrno;
  }

  // 2.rebuild MatchIndex, remove deleted one
  SSyncIndexMgr* oldIndex = ths->pMatchIndex;

  ths->pMatchIndex = syncIndexMgrCreate(ths);
  if (ths->pMatchIndex == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  syncIndexMgrCopyIfExist(ths->pMatchIndex, oldIndex, oldReplicasId);

  syncIndexMgrDestroy(oldIndex);

  // 3.rebuild NextIndex, remove deleted one
  SSyncIndexMgr* oldNextIndex = ths->pNextIndex;

  ths->pNextIndex = syncIndexMgrCreate(ths);
  if (ths->pNextIndex == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  syncIndexMgrCopyIfExist(ths->pNextIndex, oldNextIndex, oldReplicasId);

  syncIndexMgrDestroy(oldNextIndex);

  // 4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);

  // 5.rebuild logReplMgr
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // heap snapshot of the old managers; it only lives until the copy-back loop below
  SSyncLogReplMgr* oldLogReplMgrs = NULL;
  int64_t          length = sizeof(SSyncLogReplMgr) * (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA);
  oldLogReplMgrs = taosMemoryMalloc(length);
  if (NULL == oldLogReplMgrs) return terrno;
  memset(oldLogReplMgrs, 0, length);

  for (int i = 0; i < oldtotalReplicaNum; i++) {
    oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
  }

  syncNodeLogReplDestroy(ths);
  if ((code = syncNodeLogReplInit(ths)) != 0) {
    taosMemoryFree(oldLogReplMgrs);
    TAOS_RETURN(code);
  }

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        *(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
        ths->logReplMgrs[i]->peerId = i;
      }
    }
  }

  // Last use of the snapshot: release it here so that none of the early
  // returns in the steps below can leak it.
  taosMemoryFree(oldLogReplMgrs);
  oldLogReplMgrs = NULL;

  for (int i = 0; i < ths->totalReplicaNum; ++i) {
    sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
           i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
           ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
  }

  // 6.rebuild sender
  for (int i = 0; i < oldtotalReplicaNum; ++i) {
    sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if (ths->senders[i] != NULL) {
      sDebug("vgId:%d, snapshot sender destroy while close, data:%p", ths->vgId, ths->senders[i]);

      if (snapshotSenderIsStart(ths->senders[i])) {
        snapshotSenderStop(ths->senders[i], false);
      }

      snapshotSenderDestroy(ths->senders[i]);
      ths->senders[i] = NULL;
    }
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = NULL;
    code = snapshotSenderCreate(ths, i, &pSender);
    if (pSender == NULL) return terrno = code;

    ths->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
           ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime);
  }

  // 7.rebuild synctimer
  if ((code = syncNodeStopHeartbeatTimer(ths)) != 0) {
    TAOS_RETURN(code);
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    if ((code = syncHbTimerInit(ths, &ths->peerHeartbeatTimerArr[i], ths->replicasId[i])) != 0) {
      TAOS_RETURN(code);
    }
  }

  if ((code = syncNodeStartHeartbeatTimer(ths)) != 0) {
    TAOS_RETURN(code);
  }

  // 8.rebuild peerStates
  SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
  for (int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) {
    oldState[i] = ths->peerStates[i];
  }

  for (int i = 0; i < ths->totalReplicaNum; i++) {
    for (int j = 0; j < oldtotalReplicaNum; j++) {
      if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
        ths->peerStates[i] = oldState[j];
      }
    }
  }

  return 0;
}
3354

3355
/*
 * Apply a changed voter count to the node's runtime structures.
 *
 * Copies ths->raftCfg.cfg.replicaNum into ths->replicaNum and into the
 * replicaNum of both index managers, then refreshes the two vote managers.
 * replicasId and logReplMgrs are left untouched; the current replica ids and
 * match indexes are only written to the debug log.
 */
void syncNodeChangeToVoter(SSyncNode* ths) {
  const int32_t total = ths->totalReplicaNum;

  // node-level voter count
  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum);
  for (int32_t r = 0; r < total; ++r) {
    sDebug("vgId:%d, i:%d, replicaId.addr:0x%" PRIx64, ths->vgId, r, ths->replicasId[r].addr);
  }

  // index managers follow the same voter count
  ths->pMatchIndex->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->pNextIndex->replicaNum = ths->raftCfg.cfg.replicaNum;

  sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, ths->pMatchIndex->totalReplicaNum);
  for (int32_t r = 0; r < ths->pMatchIndex->totalReplicaNum; ++r) {
    sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, r, ths->pMatchIndex->index[r]);
  }

  // vote bookkeeping
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);
}
×
3379

3380
/*
 * Zero the node-info slots currently in use: the first ths->peersNum entries
 * of peersNodeInfo and the first raftCfg.cfg.totalReplicaNum entries of
 * raftCfg.cfg.nodeInfo. The counters themselves are not modified.
 */
void syncNodeResetPeerAndCfg(SSyncNode* ths) {
  for (int32_t p = 0; p < ths->peersNum; ++p) {
    memset(&ths->peersNodeInfo[p], 0, sizeof(SNodeInfo));
  }

  for (int32_t n = 0; n < ths->raftCfg.cfg.totalReplicaNum; ++n) {
    memset(&ths->raftCfg.cfg.nodeInfo[n], 0, sizeof(SNodeInfo));
  }
}
×
3390

3391
/*
 * Apply a config-change raft entry to this node.
 *
 * The request carried by the entry is decoded into a new SSyncCfg. Entries
 * whose changeVersion is not greater than the current one are skipped.
 * Otherwise one of three paths is taken:
 *   - new total is 1 or 2 ("remove replica"): if this node is still listed,
 *     peers/cfg are rebuilt from the new config; if not, the node reduces
 *     itself to a single learner. restoreFinish is cleared.
 *   - current total is 3 ("change replica type"): matching nodes are promoted
 *     to voter; a learner that became a voter turns follower. restoreFinish
 *     is cleared.
 *   - otherwise ("add replica"): peers/cfg are rebuilt from the new config;
 *     restoreFinish is cleared only when this node is a learner.
 * Finally quorum, lastConfigIndex, cfg.lastIndex and changeVersion are
 * updated and the config file is written.
 *
 * ths:    sync node
 * pEntry: raft entry; must carry TDMT_SYNC_CONFIG_CHANGE
 * str:    origin label; "Commit" selects the first log format, anything else
 *         the second
 *
 * Returns 0 on success or when the entry is skipped,
 * TSDB_CODE_SYN_INTERNAL_ERROR for a wrong entry type, TSDB_CODE_INVALID_MSG
 * when decoding fails, or the error of the failing rebuild / file write.
 */
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) {
  int32_t code = 0;
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // entry payload = SMsgHead followed by the serialized request
  SMsgHead* pHead = (SMsgHead*)pEntry->data;
  void*     pBody = POINTER_SHIFT(pHead, sizeof(SMsgHead));

  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pBody, pHead->contLen, &req) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  SSyncCfg cfg = {0};
  syncBuildConfigFromReq(&req, &cfg);

  // stale or duplicate change
  if (cfg.changeVersion <= ths->raftCfg.cfg.changeVersion) {
    sInfo(
        "vgId:%d, skip conf change entry since lower version. "
        "this entry, index:%" PRId64 ", term:%" PRId64
        ", totalReplicaNum:%d, changeVersion:%d; "
        "current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d",
        ths->vgId, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
    return 0;
  }

  if (strcmp(str, "Commit") == 0) {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
        ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
        ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex,
        ths->pLogBuf->endIndex, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
  } else {
    sInfo(
        "vgId:%d, change config from %s. "
        "this, i:%" PRId64 ", t:%" PRId64
        ", trNum:%d, vers:%d; "
        "node, rNum:%d, pNum:%d, trNum:%d, "
        "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        "), "
        "cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
        ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum,
        ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
        ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index - 1, ths->commitIndex,
        ths->pLogBuf->commitIndex);
  }

  syncNodeLogConfigInfo(ths, &cfg, "before config change");

  const int32_t prevTotal = ths->totalReplicaNum;

  if (cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2) {
    // ---- remove replica ----
    bool stillMember = false;
    for (int32_t k = 0; k < cfg.totalReplicaNum; ++k) {
      if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[k].nodeFqdn) == 0 &&
          ths->myNodeInfo.nodePort == cfg.nodeInfo[k].nodePort) {
        stillMember = true;
        break;
      }
    }

    // both sub-cases start from cleared peer / cfg node slots
    syncNodeResetPeerAndCfg(ths);

    if (stillMember) {
      // another node was removed; myNodeInfo stays as it is
      if ((code = syncNodeRebuildPeerAndCfg(ths, &cfg)) != 0) {
        TAOS_RETURN(code);
      }

      if ((code = syncNodeRebuildAndCopyIfExist(ths, prevTotal)) != 0) {
        TAOS_RETURN(code);
      }
    } else {
      // this node was removed: shrink to a single learner holding only itself
      ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER;

      ths->peersNum = 0;
      memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo));
      ths->raftCfg.cfg.replicaNum = 0;
      ths->raftCfg.cfg.totalReplicaNum = 1;

      if ((code = syncNodeRebuildAndCopyIfExist(ths, prevTotal)) != 0) {
        TAOS_RETURN(code);
      }

      ths->state = TAOS_SYNC_STATE_LEARNER;
    }

    ths->restoreFinish = false;
  } else if (ths->totalReplicaNum == 3) {
    // ---- change replica type ----
    sInfo("vgId:%d, begin change replica type", ths->vgId);

    // promote myself when the new config lists me as voter
    for (int32_t k = 0; k < cfg.totalReplicaNum; ++k) {
      bool isMe = (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[k].nodeFqdn) == 0 &&
                   ths->myNodeInfo.nodePort == cfg.nodeInfo[k].nodePort);
      if (isMe && cfg.nodeInfo[k].nodeRole == TAOS_SYNC_ROLE_VOTER) {
        ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
      }
    }

    // peers and cfg, then the runtime structures
    syncNodeChangePeerAndCfgToVoter(ths, &cfg);
    syncNodeChangeToVoter(ths);

    // a learner that is now a voter becomes a follower
    if (ths->state == TAOS_SYNC_STATE_LEARNER && ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->state = TAOS_SYNC_STATE_FOLLOWER;
    }

    ths->restoreFinish = false;
  } else {
    // ---- add replica ----
    sInfo("vgId:%d, begin add replica", ths->vgId);

    // myNodeInfo and state are left unchanged
    if ((code = syncNodeRebuildPeerAndCfg(ths, &cfg)) != 0) {
      TAOS_RETURN(code);
    }

    if ((code = syncNodeRebuildAndCopyIfExist(ths, prevTotal)) != 0) {
      TAOS_RETURN(code);
    }

    if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      ths->restoreFinish = false;
    }
  }

  ths->quorum = syncUtilQuorum(ths->replicaNum);

  ths->raftCfg.lastConfigIndex = pEntry->index;
  ths->raftCfg.cfg.lastIndex = pEntry->index;
  ths->raftCfg.cfg.changeVersion = cfg.changeVersion;

  syncNodeLogConfigInfo(ths, &cfg, "after config change");

  code = syncWriteCfgFile(ths);
  if (code != 0) {
    sError("vgId:%d, failed to create sync cfg file", ths->vgId);
  }

  TAOS_RETURN(code);
}
3564

3565
/*
 * Append a raft entry to the log buffer and advance match / commit state.
 *
 * Ownership: when the entry is rejected (payload shorter than SMsgHead, or
 * syncLogBufferAppend() fails) it is destroyed here. In the second case
 * syncFsmExecute() is called first with terrno as the result code.
 *
 * After the append attempt, successful or not:
 *   - syncLogBufferProceed() is always called;
 *   - an assigned leader (only after a successful append) updates its assigned
 *     commit index and commits up to it;
 *   - with more than one replica the function returns here;
 *   - with a single replica the commit index is updated and committed.
 *
 * ths:    sync node
 * pEntry: entry to append
 * pMsg:   originating rpc message; only logged and forwarded (may be NULL, as
 *         passed by syncNodeAppendNoop())
 *
 * Returns the resulting code via TAOS_RETURN().
 */
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry, SRpcMsg* pMsg) {
  int32_t code = 0;

  if (pEntry->dataLen < sizeof(SMsgHead)) {
    // payload too short to hold a message head
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, msg:%p, cannot append an invalid client request with no msg head, type:%s dataLen:%d", ths->vgId,
           pMsg, TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  } else if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) {
    // log buffer refused the entry: report through the fsm, then drop it
    sError("vgId:%d, index:%" PRId64 ", failed to enqueue sync log buffer", ths->vgId, pEntry->index);
    int32_t fsmRet = syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false);
    if (fsmRet != 0) {
      sError("vgId:%d, index:%" PRId64 ", failed to execute fsm since %s", ths->vgId, pEntry->index, tstrerror(fsmRet));
    }
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  } else {
    code = 0;
  }

  // proceed match index, with replicating on needed
  SyncIndex       matched = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append", pMsg);
  const STraceId* pTrace = (pEntry != NULL) ? &pEntry->originRpcTraceId : NULL;

  if (pEntry != NULL) {
    sGDebug(pTrace,
            "vgId:%d, index:%" PRId64 ", raft entry appended, msg:%p term:%" PRId64 " buf:[%" PRId64 " %" PRId64
            " %" PRId64 ", %" PRId64 ")",
            ths->vgId, pEntry->index, pMsg, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
            ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
  }

  if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    int64_t assignedIdx = syncNodeUpdateAssignedCommitIndex(ths, matched);
    sGTrace(pTrace, "vgId:%d, index:%" PRId64 ", update assigned commit, msg:%p", ths->vgId, assignedIdx, pMsg);

    if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
        syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex, pTrace, "append-entry") < 0) {
      sGError(pTrace, "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64, ths->vgId,
              assignedIdx, pMsg, ths->commitIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // multi replica
  if (ths->replicaNum > 1) {
    TAOS_RETURN(code);
  }

  // single replica
  SyncIndex committed = syncNodeUpdateCommitIndex(ths, matched);
  sGTrace(pTrace, "vgId:%d, index:%" PRId64 ", raft entry update commit, msg:%p return index:%" PRId64, ths->vgId,
          matched, pMsg, committed);

  // NOTE(review): the commit result replaces whatever code the append attempt
  // produced, so an append failure followed by a successful commit returns 0.
  // Confirm with callers whether that is intended.
  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    code = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, pTrace, "append-entry");
    if (code < 0) {
      sGError(pTrace,
              "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64 " return index:%" PRId64,
              ths->vgId, matched, pMsg, ths->commitIndex, committed);
    }
  }

  TAOS_RETURN(code);
}
3633

3634
/*
 * Tell whether heartbeat replies have timed out for at least a quorum of peers.
 *
 * A peer counts as timed out when its last receive time (read from
 * pMatchIndex) is older than tsHeartbeatTimeout milliseconds. Learner peers
 * and peers whose receive time is 0 or -1 are ignored. A node with a single
 * replica in total never times out.
 *
 * Returns true when the number of timed-out peers is >= pSyncNode->quorum.
 */
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->totalReplicaNum == 1) {
    return false;
  }

  int32_t timedOut = 0;
  int64_t nowMs = taosGetTimestampMs();

  for (int32_t p = 0; p < pSyncNode->peersNum; ++p) {
    if (pSyncNode->peersNodeInfo[p].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
      continue;
    }

    int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[p]));
    bool    hasRecvTime = (lastRecv != 0 && lastRecv != -1);
    if (hasRecvTime && nowMs - lastRecv > tsHeartbeatTimeout) {
      timedOut++;
    }
  }

  return timedOut >= pSyncNode->quorum;
}
3659

3660
/*
 * Return true when any of the first totalReplicaNum snapshot senders exists
 * and has its start flag set. A NULL node yields false.
 */
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return false;

  for (int32_t r = 0; r < pSyncNode->totalReplicaNum; ++r) {
    if (pSyncNode->senders[r] == NULL) continue;
    if (pSyncNode->senders[r]->start) return true;
  }

  return false;
}
3671

3672
/*
 * Return true when the node has a new-node snapshot receiver whose start flag
 * is set. A NULL node or a missing receiver yields false.
 */
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL || pSyncNode->pNewNodeReceiver == NULL) {
    return false;
  }

  return pSyncNode->pNewNodeReceiver->start ? true : false;
}
3678

3679
/*
 * Build a no-op raft entry at the log buffer's end index with the current
 * term and hand it to syncNodeAppend() (with no rpc message).
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when the entry cannot be built, otherwise
 * the result of syncNodeAppend().
 */
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex endIndex = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  curTerm = raftStoreGetTerm(ths);

  SSyncRaftEntry* pNoop = syncEntryBuildNoop(curTerm, endIndex, ths->vgId);
  if (pNoop == NULL) {
    int32_t oom = TSDB_CODE_OUT_OF_MEMORY;
    TAOS_RETURN(oom);
  }

  int32_t code = syncNodeAppend(ths, pNoop, NULL);
  TAOS_RETURN(code);
}
3693

3694
#ifdef BUILD_NO_CALL
3695
/*
 * Legacy no-op append (compiled only under BUILD_NO_CALL).
 *
 * Builds a no-op entry at the log store's write index with the current term.
 * When this node is leader, the entry is appended to the log store and
 * offered to the LRU cache. If the cache returned a handle, the handle is
 * released; otherwise the entry is destroyed here.
 *
 * Returns 0 on success, or -1 when the entry cannot be built or the log store
 * append fails. On append failure the entry is destroyed before returning so
 * that it is not leaked.
 */
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) return -1;

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
    if (code != 0) {
      sError("append noop error");
      // the entry was never cached, so it is still owned by this function
      syncEntryDestroy(pEntry);
      return -1;
    }

    // best effort: a failed insert leaves h == NULL and the entry is destroyed below
    (void)syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return 0;
}
3723
#endif
3724

3725
// Handle a sync-heartbeat message received from a peer.
//
// Steps, in order:
//   1. Record the receive time and bump the receive counter of the sender in
//      ths->pNextIndex, then log the heartbeat.
//   2. Drop the message (return 0) when the sender is not in this raft group.
//   3. A learner adopts the message term when it is newer than its own.
//   4. Same term and this node is not a leader / assigned leader: mark the
//      election timer for reset, copy minMatchIndex, and (follower or learner)
//      enqueue a local "commit" command carrying the sender's commit index.
//   5. Message term >= current term while this node is a leader / assigned
//      leader: enqueue a local "step-down" command.
//   6. Build and send a heartbeat reply, then reset the election timer if
//      step 4 asked for it.
//
// Returns 0 on success, or the error produced by building a message or by
// sending the reply (TAOS_CHECK_RETURN returns early on a non-zero code).
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
  bool           resetElect = false;

  int64_t tsMs = taosGetTimestampMs();

  // Read the previous receive time before overwriting it; it feeds the log line below.
  int64_t lastRecvTime = syncIndexMgrGetRecvTime(ths->pNextIndex, &(pMsg->srcId));
  syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
  syncIndexMgrIncRecvCount(ths->pNextIndex, &(pMsg->srcId));

  int64_t netElapsed = tsMs - pMsg->timeStamp;
  int64_t timeDiff = tsMs - lastRecvTime;
  syncLogRecvHeartbeat(ths, pMsg, netElapsed, &pRpcMsg->info.traceId, timeDiff, pRpcMsg);

  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
    sWarn(
        "vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
        "cluster:%d",
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
    return 0;
  }

  SyncTerm currentTerm = raftStoreGetTerm(ths);

  if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
    raftStoreSetTerm(ths, pMsg->term);
    currentTerm = pMsg->term;
  }

  int64_t tsMs2 = taosGetTimestampMs();

  // Same message at two levels: error when the first phase was slow, debug otherwise.
  int64_t processTime = tsMs2 - tsMs;
  if (processTime > SYNC_HEARTBEAT_SLOW_MS) {
    sGError(&pRpcMsg->info.traceId,
            "vgId:%d, process sync-heartbeat msg from dnode:%d, commit-index:%" PRId64 ", cluster:%d msgTerm:%" PRId64
            " currentTerm:%" PRId64 ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsg->srcId)), pMsg->commitIndex, CID(&(pMsg->srcId)), pMsg->term, currentTerm,
            processTime);
  } else {
    sGDebug(&pRpcMsg->info.traceId,
            "vgId:%d, process sync-heartbeat msg from dnode:%d, commit-index:%" PRId64 ", cluster:%d msgTerm:%" PRId64
            " currentTerm:%" PRId64 ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsg->srcId)), pMsg->commitIndex, CID(&(pMsg->srcId)), pMsg->term, currentTerm,
            processTime);
  }

  if (pMsg->term == currentTerm &&
      (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    resetElect = true;

    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
      rpcMsgLocalCmd.info.traceId = pRpcMsg->info.traceId;

      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd =
          (ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT;
      pSyncMsg->commitIndex = pMsg->commitIndex;
      pSyncMsg->currentTerm = pMsg->term;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, failed to enqueue sync-local-cmd msg(cmd=commit) from heartbeat since %s",
                 ths->vgId, tstrerror(code));
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sGTrace(&pRpcMsg->info.traceId,
                  "vgId:%d, enqueue sync-local-cmd msg(cmd=commit) from heartbeat, commit-index:%" PRId64
                  ", term:%" PRId64,
                  ths->vgId, pMsg->commitIndex, pMsg->term);
        }
      } else {
        // No queue callback to hand the message to: release it here,
        // otherwise the freshly built content is leaked.
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  if (pMsg->term >= currentTerm &&
      (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
    SRpcMsg rpcMsgLocalCmd = {0};
    TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
    rpcMsgLocalCmd.info.traceId = pRpcMsg->info.traceId;

    SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->currentTerm = pMsg->term;
    pSyncMsg->commitIndex = pMsg->commitIndex;

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue sync-local-cmd msg(cmd=step-down) error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue sync-local-cmd msg(cmd=step-down), new-term:%" PRId64, ths->vgId, pMsg->term);
      }
    } else {
      // Same as above: nobody took ownership of the message content.
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    }
  }

  SRpcMsg rpcMsg = {0};
  TAOS_CHECK_RETURN(syncBuildHeartbeatReply(&rpcMsg, ths->vgId));
  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;
  rpcMsg.info.traceId = pRpcMsg->info.traceId;

  // reply
  int64_t tsMs3 = taosGetTimestampMs();

  int64_t processTime2 = tsMs3 - tsMs2;
  TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
  if (processTime2 > SYNC_HEARTBEAT_SLOW_MS) {
    sGError(&rpcMsg.info.traceId,
            "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
            ", processTime:%" PRId64,
            ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
  } else {
    // tsSyncLogHeartbeat raises the normal-path message from debug to info level.
    if (tsSyncLogHeartbeat) {
      sGInfo(&rpcMsg.info.traceId,
             "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
             ", processTime:%" PRId64,
             ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
    } else {
      sGDebug(&rpcMsg.info.traceId,
              "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64
              ", processTime:%" PRId64,
              ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp, processTime2);
    }
  }

  TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg));

  if (resetElect) syncNodeResetElectTimer(ths);
  return 0;
}
3867

3868
// Handle a sync-heartbeat-reply message from a peer.
//
// Looks up the log replication manager of the replying peer, logs the reply
// together with the network delay and the gap since the previous reply,
// records the receive time / count in ths->pMatchIndex, and hands the reply
// to syncLogReplProcessHeartbeatReply().
//
// Returns terrno (or TSDB_CODE_SYN_RETURN_VALUE_NULL when terrno is 0) when
// no replication manager is found for the peer; otherwise the result of
// syncLogReplProcessHeartbeatReply().
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  int32_t code = 0;

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    code = TSDB_CODE_SYN_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // "%016" PRIx64 zero-pads the address to 16 hex digits after the "0x" prefix.
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr);
    TAOS_RETURN(code);
  }

  int64_t tsMs = taosGetTimestampMs();
  // Fetch the previous receive time before it is overwritten below.
  int64_t lastRecvTime = syncIndexMgrGetRecvTime(ths->pMatchIndex, &pMsg->srcId);
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, &pRpcMsg->info.traceId, tsMs - lastRecvTime);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);
  syncIndexMgrIncRecvCount(ths->pMatchIndex, &(pMsg->srcId));

  return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
}
3889

3890
#ifdef BUILD_NO_CALL
3891
// Legacy heartbeat-reply handler (only compiled with BUILD_NO_CALL).
// Logs the reply and stores the receive time of the peer in ths->pMatchIndex,
// which is what later decides whether the peer is considered alive.
// Always returns 0.
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;

  int64_t nowMs = taosGetTimestampMs();
  int64_t elapsedMs = nowMs - pReply->timeStamp;

  // The same elapsed value is passed for both time arguments of the logger.
  syncLogRecvHeartbeatReply(ths, pReply, elapsedMs, &pRpcMsg->info.traceId, elapsedMs);
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, nowMs);

  return 0;
}
3902
#endif
3903

3904
// Execute a sync-local-cmd message that this node enqueued for itself.
//
//   SYNC_LOCAL_CMD_STEP_DOWN                 -> step down to the term in the message.
//   SYNC_LOCAL_CMD_FOLLOWER_CMT / LEARNER_CMT -> advance the commit index (only when
//       the message term equals the last matched term of the log buffer) and
//       commit the log buffer up to ths->commitIndex.
//   anything else                            -> logged as an error.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR when the last match term is negative,
// otherwise 0 (including when the log buffer is empty or the commit fails).
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, &pRpcMsg->info.traceId);

  if (pCmd->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    SRaftId emptyId = EMPTY_RAFT_ID;
    syncNodeStepDown(ths, pCmd->currentTerm, emptyId, "localCmd");
    return 0;
  }

  if (pCmd->cmd != SYNC_LOCAL_CMD_FOLLOWER_CMT && pCmd->cmd != SYNC_LOCAL_CMD_LEARNER_CMT) {
    sError("error local cmd");
    return 0;
  }

  // Commit command from a heartbeat.
  if (syncLogBufferIsEmpty(ths->pLogBuf)) {
    sError("vgId:%d, sync log buffer is empty", ths->vgId);
    return 0;
  }

  SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
  if (lastMatchTerm < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // Only move the commit index when the message term matches the last matched term.
  if (pCmd->currentTerm == lastMatchTerm) {
    SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, pCmd->commitIndex);
    sTrace("vgId:%d, raft entry update commit, return index:%" PRId64, ths->vgId, returnIndex);
  }

  // The commit is skipped entirely while the FSM state is incomplete.
  if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE) {
    int32_t commitCode = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, &pRpcMsg->info.traceId, "heartbeat");
    if (commitCode < 0) {
      sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64, ths->vgId, terrstr(),
             ths->commitIndex);
    }
  }

  return 0;
}
3936

3937
// TLA+ Spec
3938
// ClientRequest(i, v) ==
3939
//     /\ state[i] = Leader
3940
//     /\ LET entry == [term  |-> currentTerm[i],
3941
//                      value |-> v]
3942
//            newLog == Append(log[i], entry)
3943
//        IN  log' = [log EXCEPT ![i] = newLog]
3944
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
3945
//                    leaderVars, commitIndex>>
3946
//
3947

3948
// Turn a client request into a raft entry and append it to the log.
//
// ths       - sync node; must be a leader or assigned leader for the request
//             to be accepted.
// pMsg      - incoming message; TDMT_SYNC_CLIENT_REQUEST carries a pre-built
//             request in pCont, any other type is wrapped as-is.
// pRetIndex - optional; receives the log index chosen for the entry.
//
// Returns:
//   TSDB_CODE_SYN_INTERNAL_ERROR when this node is not a leader / assigned
//     leader, or the entry cannot be built;
//   the (negative or positive) result of syncNodeCheckChangeConfig() when it
//     is non-zero for a config-change entry;
//   otherwise the result of syncNodeAppend().
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, process client request", ths->vgId, pMsg);
  int32_t code = 0;

  // Reject early on a non-leader: nothing has been allocated yet and no
  // response stub has been registered, so there is nothing to clean up.
  if (ths->state != TAOS_SYNC_STATE_LEADER && ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SyncIndex       index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = NULL;
  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index, &pMsg->info.traceId);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }

  if (pEntry == NULL) {
    sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process client request since %s", ths->vgId, pMsg,
            terrstr());
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // 1->2, config change is add in write thread, and will continue in sync thread
  // need save message for it
  if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) {
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
    uint64_t  seqNum = syncRespMgrAdd(ths->pSyncRespMgr, &stub);
    pEntry->seqNum = seqNum;
  }

  if (pRetIndex) {
    (*pRetIndex) = index;
  }

  if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
    code = syncNodeCheckChangeConfig(ths, pEntry);
    if (code < 0) {
      sGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to check change config since %s", ths->vgId, pMsg,
              terrstr());
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      TAOS_RETURN(code);
    }

    if (code > 0) {
      // The entry is not appended in this case: take the saved stub back out,
      // answer the requester directly, and return the positive code.
      SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
      int32_t num = syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
      sGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get response stub for config change, seqNum:%" PRIu64 " num:%d",
              ths->vgId, pMsg, pEntry->seqNum, num);
      if (rsp.info.handle != NULL) {
        tmsgSendRsp(&rsp);
      }
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      TAOS_RETURN(code);
    }
  }

  code = syncNodeAppend(ths, pEntry, pMsg);
  return code;
}
4012

4013
// Map a sync state to its lowercase display name.
// Any value without a dedicated name yields "unknown".
const char* syncStr(ESyncState state) {
  if (state == TAOS_SYNC_STATE_FOLLOWER) return "follower";
  if (state == TAOS_SYNC_STATE_CANDIDATE) return "candidate";
  if (state == TAOS_SYNC_STATE_LEADER) return "leader";
  if (state == TAOS_SYNC_STATE_ERROR) return "error";
  if (state == TAOS_SYNC_STATE_OFFLINE) return "offline";
  if (state == TAOS_SYNC_STATE_LEARNER) return "learner";
  if (state == TAOS_SYNC_STATE_ASSIGNED_LEADER) return "assigned leader";
  return "unknown";
}
4033

4034
// Locate this node inside pNewCfg and store its position in pNewCfg->myIndex.
// A config entry matches when its address, paired with this node's vgId,
// equals ths->myRaftId.
// Returns 0 when found, TSDB_CODE_SYN_INTERNAL_ERROR when no entry matches.
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t pos = 0;
  while (pos < pNewCfg->totalReplicaNum) {
    SRaftId candidate = {0};
    candidate.addr = SYNC_ADDR(&pNewCfg->nodeInfo[pos]);
    candidate.vgId = ths->vgId;

    if (syncUtilSameId(&(ths->myRaftId), &candidate)) {
      pNewCfg->myIndex = pos;
      return 0;
    }
    ++pos;
  }

  return TSDB_CODE_SYN_INTERNAL_ERROR;
}
4049

4050
// True when all three hold: the node has exactly one replica, the message
// type passes syncUtilUserCommit(), and the vgroup id is not 1.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) return false;
  if (!syncUtilUserCommit(pMsg->msgType)) return false;
  return ths->vgId != 1;
}
4053

4054
// Check whether pRaftId equals one of the first totalReplicaNum entries of
// ths->replicasId. The scan stops at the first match.
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool found = false;
  for (int32_t pos = 0; !found && pos < ths->totalReplicaNum; ++pos) {
    found = syncUtilSameId(&ths->replicasId[pos], pRaftId);
  }
  return found;
}
4062

4063
// Return the snapshot sender stored at the same position as pDestId in
// ths->replicasId, or NULL when pDestId is not one of the replicas.
// The scan returns on the first match instead of walking the rest of the
// array; replica ids are expected to be unique within a group
// (NOTE(review): confirm - with duplicates the first match now wins).
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
      return (ths->senders)[i];
    }
  }
  return NULL;
}
4072

4073
// Return a pointer to the heartbeat timer stored at the same position as
// pDestId in ths->replicasId, or NULL when pDestId is not one of the replicas.
// The scan returns on the first match instead of walking the rest of the
// array; replica ids are expected to be unique within a group
// (NOTE(review): confirm - with duplicates the first match now wins).
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
      return &((ths->peerHeartbeatTimerArr)[i]);
    }
  }
  return NULL;
}
4082

4083
// Return a pointer to the peer state stored at the same position as pDestId
// in ths->replicasId, or NULL when pDestId is not one of the replicas.
// The scan returns on the first match instead of walking the rest of the
// array; replica ids are expected to be unique within a group
// (NOTE(review): confirm - with duplicates the first match now wins).
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
      return &((ths->peerStates)[i]);
    }
  }
  return NULL;
}
4092

4093
#ifdef BUILD_NO_CALL
4094
// Decide whether an append-entries message should be sent to pDestId.
// Returns false when no peer state exists for pDestId, or when the same index
// (prevLogIndex + 1) was already sent to that peer less than
// SYNC_APPEND_ENTRIES_TIMEOUT_MS ago; returns true otherwise.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextIndex = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();

  bool sameIndex = (pPeer->lastSendIndex == nextIndex);
  bool sentRecently = (nowMs - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);
  return !(sameIndex && sentRecently);
}
4110

4111
// Report whether the node may currently change. The answer is false when:
//   - pSyncNode->changing is already set;
//   - the commit index is at or past SYNC_INDEX_BEGIN but differs from the
//     last log index;
//   - any peer has a snapshot sender that has been started.
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  // The last index is only looked up once the commit index has reached SYNC_INDEX_BEGIN.
  if (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN && pSyncNode->commitIndex != syncNodeGetLastIndex(pSyncNode)) {
    sError("sync cannot change2");
    return false;
  }

  int32_t peer = 0;
  while (peer < pSyncNode->peersNum) {
    SSyncSnapshotSender* pPeerSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[peer]);
    if (pPeerSender != NULL && pPeerSender->start) {
      sError("sync cannot change3");
      return false;
    }
    ++peer;
  }

  return true;
}
4135
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc